Vince's CSV Parser
Loading...
Searching...
No Matches
csv_reader.cpp
Go to the documentation of this file.
1
5#include "csv_reader.hpp"
6
7namespace csv {
8 CSV_INLINE void CSVReader::init_parser(
9 std::unique_ptr<internals::IBasicCSVParser> parser_impl
10 ) {
11 auto resolved = parser_impl->get_resolved_format();
12 this->_format = resolved.format;
13 this->_chunk_size = this->_format.get_chunk_size();
14 this->n_cols = resolved.n_cols;
15
16 if (!this->_format.col_names.empty()) {
17 this->set_col_names(this->_format.col_names);
18 }
19
20 this->parser = std::move(parser_impl);
21 this->initial_read();
22 }
23
26 CSVFormat new_format = this->_format;
27
28 // Since users are normally not allowed to set
29 // column names and header row simultaneously,
30 // we will set the backing variables directly here
31 new_format.col_names = this->col_names->get_col_names();
32 new_format.header = this->_format.header;
33
34 return new_format;
35 }
36
38 CSV_INLINE std::vector<std::string> CSVReader::get_col_names() const {
39 return (this->col_names) ? this->col_names->get_col_names() :
40 std::vector<std::string>();
41 }
42
47 return this->col_names->index_of(col_name);
48 }
49
50 CSV_INLINE void CSVReader::trim_header() {
51 if (!this->header_trimmed) {
52 for (int i = 0; i <= this->_format.header && !this->records->empty(); i++) {
53 if (i == this->_format.header && this->col_names->empty()) {
54 this->set_col_names(this->records->pop_front());
55 }
56 else {
57 this->records->pop_front();
58 }
59 }
60
61 this->header_trimmed = true;
62 }
63 }
64
66 CSV_INLINE void CSVReader::set_col_names(const std::vector<std::string>& names)
67 {
68 this->col_names->set_policy(this->_format.get_column_name_policy());
69 this->col_names->set_col_names(names);
70 this->n_cols = names.size();
71 }
72
82 CSV_INLINE bool CSVReader::read_csv(size_t bytes) {
83 // WORKER THREAD FUNCTION: Runs asynchronously to read CSV chunks
84 //
85 // Threading model:
86 // 1. notify_all() - signals read_row() that worker is active
87 // 2. parser->next() - reads and parses bytes (10MB chunks)
88 // 3. kill_all() - signals read_row() that worker is done
89 //
90 // Exception handling: Exceptions thrown here MUST propagate to the calling
91 // thread via std::exception_ptr. Bug #282 fixed cases where exceptions were
92 // swallowed, causing std::terminate() instead of proper error handling.
93
94 // Tell read_row() to listen for CSV rows
95 this->records->notify_all();
96
97 try {
98 this->parser->set_output(*this->records);
99 this->parser->next(bytes);
100
101 if (!this->header_trimmed) {
102 this->trim_header();
103 }
104 }
105 catch (...) {
106 // Never allow exceptions to escape the worker thread, or std::terminate will be invoked.
107 // Store the exception and rethrow from the consumer thread (read_row / iterator).
108 this->set_read_csv_exception(std::current_exception());
109 }
110
111 // Tell read_row() to stop waiting
112 this->records->kill_all();
113
114 return true;
115 }
116
131 while (true) {
132 if (this->records->empty()) {
133#if CSV_ENABLE_THREADS
134 if (this->records->is_waitable()) {
135 // Reading thread is currently active => wait for it to populate records
136 this->records->wait();
137 continue;
138 }
139#endif
140
141 // Reading thread is not active
142 JOIN_WORKER(this->read_csv_worker);
143
144 // If the worker thread failed, rethrow the error here
145 this->rethrow_read_csv_exception_if_any();
146
147 if (this->parser->eof())
148 // End of file and no more records
149 return false;
150
151 // Detect infinite loop: a previous read was requested but records are still empty.
152 // This fires when a single row spans more than 2 × _chunk_size bytes:
153 // - chunk N fills without finding '\n' → _read_requested set to true
154 // - chunk N+1 also fills without '\n' → guard fires here
155 // Default _chunk_size is CSV_CHUNK_SIZE_DEFAULT (10 MB), so the threshold is
156 // rows > 20 MB. Use CSVFormat::chunk_size() to raise the limit.
157 if (this->_read_requested && this->records->empty()) {
158 throw std::runtime_error(
159 "End of file not reached and no more records parsed. "
160 "This likely indicates a CSV row larger than the chunk size of " +
161 std::to_string(this->_chunk_size) + " bytes. "
162 "Use CSVFormat::chunk_size() to increase the chunk size."
163 );
164 }
165
166#if CSV_ENABLE_THREADS
167 // Start another reading thread.
168 // Mark as waitable before starting the thread to avoid a race where
169 // read_row() observes is_waitable()==false immediately after thread creation.
170 this->records->notify_all();
171 this->read_csv_worker = std::thread(&CSVReader::read_csv, this, this->_chunk_size);
172#else
173 // Single-threaded mode parses synchronously on the caller thread.
174 this->read_csv(this->_chunk_size);
175 this->rethrow_read_csv_exception_if_any();
176#endif
177 this->_read_requested = true;
178 continue;
179 }
180 else {
181 const auto policy = this->_format.variable_column_policy;
182 const size_t next_row_size = this->records->front().size();
183
184 if (policy == VariableColumnPolicy::KEEP_NON_EMPTY && next_row_size == 0) {
185 this->records->pop_front();
186 continue;
187 }
188
189 if (next_row_size != this->n_cols &&
190 (policy == VariableColumnPolicy::THROW || policy == VariableColumnPolicy::IGNORE_ROW)) {
191 auto errored_row = this->records->pop_front();
192
193 if (policy == VariableColumnPolicy::THROW) {
194 if (errored_row.size() < this->n_cols)
195 throw std::runtime_error("Line too short " + std::string(errored_row.raw_str()));
196
197 throw std::runtime_error("Line too long " + std::string(errored_row.raw_str()));
198 }
199
200 continue;
201 }
202
203 row = this->records->pop_front();
204 this->_n_rows++;
205 this->_read_requested = false; // Reset flag on successful read
206 return true;
207 }
208 }
209
210 return false;
211 }
212}
Stores information about how to parse a CSV file.
CSVFormat get_format() const
Return the format of the original raw CSV.
int index_of(csv::string_view col_name) const
Return the index of the column name if found or csv::CSV_NOT_FOUND otherwise.
bool read_row(CSVRow &row)
Retrieve rows as CSVRow objects, returning true if more rows are available.
std::vector< std::string > get_col_names() const
Return the CSV's column names as a vector of strings.
Data structure for representing CSV rows.
Definition csv_row.hpp:264
#define CSV_INLINE
Helper macro which should be #defined as "inline" in the single header version.
Definition common.hpp:26
Defines functionality needed for basic CSV parsing.
std::unique_ptr< RowCollection > records
Queue of parsed CSV rows.
size_t _n_rows
How many rows (minus header) have been read so far.
bool read_csv(size_t bytes=internals::CSV_CHUNK_SIZE_DEFAULT)
Read a chunk of CSV data.
internals::ColNamesPtr col_names
Pointer to an object containing column information.
void set_col_names(const std::vector< std::string > &)
Sets this reader's column names and associated data.
std::unique_ptr< internals::IBasicCSVParser > parser
Helper class which actually does the parsing.
size_t n_cols
The number of columns in this CSV.
The all-encompassing namespace.
nonstd::string_view string_view
The string_view class used by this library.
Definition common.hpp:135