Vince's CSV Parser
Loading...
Searching...
No Matches
csv_reader.cpp
Go to the documentation of this file.
1
5#include "csv_reader.hpp"
6
7namespace csv {
8#ifdef _MSC_VER
9#pragma region Reading helpers
10#endif
11 CSV_INLINE bool CSVReader::check_for_rows() {
12 if (!this->records->empty()) return true;
13
14 if (this->read_scheduler_.wait_if_active(
15 [this] { return this->records->is_waitable(); },
16 [this] { this->records->wait(); }
17 )) {
18 return true;
19 }
20
21 this->read_scheduler_.join();
22 this->read_scheduler_.rethrow_exception_if_any();
23
24 if (this->parser->eof()) return false;
25
26 if (this->_read_requested && this->records->empty()) {
27 internals::throw_row_too_large_for_chunk(this->_chunk_size);
28 }
29
30 this->read_scheduler_.run(
31 [this] { this->read_csv(this->_chunk_size); },
32 [this] { this->records->notify_all(); }
33 );
34 this->read_scheduler_.rethrow_exception_if_any();
35 this->_read_requested = true;
36 return true;
37 }
38
39#ifdef _MSC_VER
40#pragma endregion Reading helpers
41#endif
42
43#ifdef _MSC_VER
44#pragma region Format and header helpers
45#endif
46 CSV_INLINE void CSVReader::init_parser(
47 std::unique_ptr<internals::parser::CSVParserDriverBase> parser_impl
48 ) {
49 auto resolved = parser_impl->get_resolved_format();
50 this->_format = resolved.format;
51 this->_chunk_size = this->_format.get_chunk_size();
52 this->n_cols = resolved.n_cols;
53
54 if (!this->_format.col_names.empty()) {
55 this->set_col_names(this->_format.col_names);
56 }
57
58 this->parser = std::move(parser_impl);
59 this->initial_read();
60 }
61
63 CSVFormat new_format = this->_format;
64
65 // Since users are normally not allowed to set
66 // column names and header row simulatenously,
67 // we will set the backing variables directly here
68 new_format.col_names = this->col_names->get_col_names();
69 new_format.header = this->_format.header;
70
71 return new_format;
72 }
73
74 CSV_INLINE void CSVReader::trim_header() {
75 if (!this->header_trimmed) {
76 for (int i = 0; i <= this->_format.header && !this->records->empty(); i++) {
77 if (i == this->_format.header && this->col_names->empty()) {
78 this->set_col_names(this->records->pop_front());
79 }
80 else {
81 this->records->pop_front();
82 }
83 }
84
85 this->header_trimmed = true;
86 }
87 }
88
90 CSV_INLINE void CSVReader::set_col_names(const std::vector<std::string>& names)
91 {
92 this->col_names->set_policy(this->_format.get_column_name_policy());
93 this->col_names->set_col_names(names);
94 this->n_cols = names.size();
95 }
96#ifdef _MSC_VER
97#pragma endregion Format and header helpers
98#endif
99
100#ifdef _MSC_VER
101#pragma region Reading helpers
102#endif
103 CSV_INLINE bool CSVReader::accept_row(CSVRow&& candidate, CSVRow* single_row, std::vector<CSVRow>* batch_rows) {
104 const auto policy = this->_format.variable_column_policy;
105 const size_t next_row_size = candidate.size();
106
107 if (policy == VariableColumnPolicy::KEEP_NON_EMPTY && next_row_size == 0) {
108 return false;
109 }
110
111 if (next_row_size != this->n_cols &&
112 (policy == VariableColumnPolicy::THROW || policy == VariableColumnPolicy::IGNORE_ROW)) {
113 if (policy == VariableColumnPolicy::THROW) {
114 if (candidate.size() < this->n_cols) {
115 internals::throw_line_too_short(candidate.raw_str());
116 }
117
118 internals::throw_line_too_long(candidate.raw_str());
119 }
120
121 return false;
122 }
123
124 if (single_row != nullptr) {
125 *single_row = std::move(candidate);
126 } else if (batch_rows != nullptr) {
127 batch_rows->push_back(std::move(candidate));
128 } else {
129 return false;
130 }
131
132 this->_n_rows++;
133 this->_read_requested = false;
134 return true;
135 }
136
137 CSV_INLINE void CSVReader::drain_rows_into_chunk(std::vector<CSVRow>& out, size_t max_rows) {
138 std::vector<CSVRow> drained;
139 drained.reserve(max_rows - out.size());
140 this->records->drain_front(drained, max_rows - out.size());
141
142 for (size_t i = 0; i < drained.size(); ++i) {
143 this->accept_row(std::move(drained[i]), nullptr, &out);
144 }
145 }
146#ifdef _MSC_VER
147#pragma endregion Reading helpers
148#endif
149
150#ifdef _MSC_VER
151#pragma region Worker reading methods
152#endif
153
164 CSV_INLINE bool CSVReader::read_csv(size_t bytes) {
165 // SCHEDULED READ FUNCTION: Runs asynchronously when runtime threading
166 // is enabled, or synchronously when CSVFormat::threading(false) is active.
167 //
168 // Threading model:
169 // 1. notify_all() - signals read_row() that worker is active
170 // 2. parser->next() - reads and parses bytes (10MB chunks)
171 // 3. kill_all() - signals read_row() that worker is done
172 //
173 // Exception handling: CSVReadScheduler catches exceptions and rethrows
174 // them on the consumer thread. Bug #282 fixed cases where worker
175 // exceptions were swallowed, causing std::terminate().
176
177 // Tell read_row() to listen for CSV rows
178 this->records->notify_all();
179
180 try {
181 this->parser->set_output(*this->records);
182 this->parser->next(bytes);
183
184 if (!this->header_trimmed) {
185 this->trim_header();
186 }
187 }
188 catch (...) {
189 this->records->kill_all();
190 throw;
191 }
192
193 // Tell read_row() to stop waiting
194 this->records->kill_all();
195
196 return true;
197 }
198
200 while (this->check_for_rows()) {
201 if (this->records->empty())
202 continue;
203
204 if (this->accept_row(this->records->pop_front(), &row, nullptr))
205 return true;
206 }
207
208 return false;
209 }
210
211 CSV_INLINE bool CSVReader::read_chunk(std::vector<CSVRow>& out, size_t max_rows) {
212 out.clear();
213
214 if (max_rows == 0) {
215 return false;
216 }
217
218 while (out.size() < max_rows) {
219 if (check_for_rows()) {
220 if (this->records->empty()) {
221 continue;
222 }
223
224 const size_t before_size = out.size();
225 this->drain_rows_into_chunk(out, max_rows);
226
227 if (out.size() == before_size) {
228 continue;
229 }
230 }
231 else return !out.empty();
232 }
233
234 return true;
235 }
236#ifdef _MSC_VER
237#pragma endregion Worker reading methods
238#endif
239}
Stores information about how to parse a CSV file.
bool read_chunk(std::vector< CSVRow > &out, size_t max_rows)
Read up to max_rows rows into a caller-owned batch buffer.
CSVFormat get_format() const
Return the resolved parsing format for this CSV source.
bool read_row(CSVRow &row)
Retrieve the next CSV row, returning true while more rows are available.
Data structure for representing CSV rows.
Definition csv_row.hpp:544
#define CSV_INLINE
Helper macro which should be #defined as "inline" in the single header version.
Definition common.hpp:31
Defines functionality needed for basic CSV parsing.
std::unique_ptr< RowCollection > records
Queue of parsed CSV rows.
size_t _n_rows
How many rows (minus header) have been read so far.
bool read_csv(size_t bytes=internals::CSV_CHUNK_SIZE_DEFAULT)
Read a chunk of CSV data.
internals::ColNamesPtr col_names
Pointer to an object containing column information.
void set_col_names(const std::vector< std::string > &)
Sets this reader's column names and associated data.
std::unique_ptr< internals::parser::CSVParserDriverBase > parser
Helper class which actually does the parsing.
size_t n_cols
The number of columns in this CSV.
The all encompassing namespace.