5#include <condition_variable>
14#include <unordered_map>
23 template<
typename KeyType>
25 template<
typename KeyType>
26 class DataFrameColumn;
27 template<
typename KeyType>
36 busy.clear(std::memory_order_release);
41 values = std::move(other.values);
42 busy.clear(std::memory_order_release);
47 bool try_get_copy(
size_t col_index, std::string& out)
const {
48 row_overlay_lock_guard lock(
this);
49 auto it = values.find(col_index);
50 if (it == values.end()) {
67 row_overlay_lock_guard lock(
this);
68 auto it = values.find(col_index);
69 if (it == values.end()) {
77 void set(
size_t col_index, std::string value) {
78 row_overlay_lock_guard lock(
this);
79 values[col_index] = std::move(value);
83 row_overlay_lock_guard lock(
this);
84 return values.empty();
88 struct row_overlay_lock_guard {
89 explicit row_overlay_lock_guard(
const RowOverlay* overlay)
90 : busy(const_cast<std::atomic_flag&>(overlay->busy)) {
91 while (busy.test_and_set(std::memory_order_acquire)) {}
94 ~row_overlay_lock_guard() {
95 busy.clear(std::memory_order_release);
98 std::atomic_flag& busy;
101 mutable std::atomic_flag busy = ATOMIC_FLAG_INIT;
102 std::unordered_map<size_t, std::string> values;
112 owned(std::move(other.owned)) {
114 ptr.store(overlay, std::memory_order_release);
115 other.ptr.store(
nullptr, std::memory_order_release);
119 if (
this != &other) {
120 owned = std::move(other.owned);
122 ptr.store(overlay, std::memory_order_release);
123 other.ptr.store(
nullptr, std::memory_order_release);
130 return ptr.load(std::memory_order_acquire);
134 return ptr.load(std::memory_order_acquire);
141 overlay = owned.get();
142 ptr.store(overlay, std::memory_order_release);
149 std::atomic<RowOverlay*> ptr;
150 std::unique_ptr<RowOverlay> owned;
153 namespace internals {
154 template<
typename Owner,
typename Proxy,
typename Accessor>
155 class indexed_proxy_iterator {
157 using value_type = Proxy;
158 using difference_type = std::ptrdiff_t;
159 using pointer =
const Proxy*;
160 using reference =
const Proxy&;
161 using iterator_category = std::random_access_iterator_tag;
163 indexed_proxy_iterator() =
default;
164 indexed_proxy_iterator(Owner* owner,
size_t index, Accessor accessor = Accessor())
165 : owner_(owner), index_(index), accessor_(accessor) {}
167 reference operator*()
const {
168 cached_proxy_ = accessor_(owner_, index_);
169 return cached_proxy_;
172 pointer operator->()
const {
174 return &cached_proxy_;
177 indexed_proxy_iterator& operator++() { ++index_;
return *
this; }
178 indexed_proxy_iterator operator++(
int) {
auto tmp = *
this; ++index_;
return tmp; }
179 indexed_proxy_iterator& operator--() { --index_;
return *
this; }
180 indexed_proxy_iterator operator--(
int) {
auto tmp = *
this; --index_;
return tmp; }
182 indexed_proxy_iterator operator+(difference_type n)
const {
183 return indexed_proxy_iterator(owner_,
static_cast<size_t>(index_ + n), accessor_);
186 indexed_proxy_iterator operator-(difference_type n)
const {
187 return indexed_proxy_iterator(owner_,
static_cast<size_t>(index_ - n), accessor_);
190 difference_type operator-(
const indexed_proxy_iterator& other)
const {
191 return static_cast<difference_type
>(index_) -
static_cast<difference_type
>(other.index_);
194 bool operator==(
const indexed_proxy_iterator& other)
const {
195 return owner_ == other.owner_ && index_ == other.index_;
198 bool operator!=(
const indexed_proxy_iterator& other)
const {
199 return !(*
this == other);
203 Owner* owner_ =
nullptr;
206 mutable Proxy cached_proxy_;
223 this->duplicate_key_policy = value;
228 return this->duplicate_key_policy;
231 DataFrameOptions& set_key_column(
const std::string& value) {
232 this->key_column = value;
236 const std::string& get_key_column()
const {
237 return this->key_column;
240 DataFrameOptions& set_throw_on_missing_key(
bool value) {
241 this->throw_on_missing_key = value;
245 bool get_throw_on_missing_key()
const {
246 return this->throw_on_missing_key;
250 std::string key_column;
255 bool throw_on_missing_key =
true;
262 this->start_workers(worker_count);
269 this->stop_workers();
272 size_t worker_count()
const noexcept {
273#if CSV_ENABLE_THREADS
274 return workers_.size();
280 template<
typename Fn>
281 void parallel_for(
size_t task_count, Fn&& fn) {
282 if (task_count == 0) {
286#if CSV_ENABLE_THREADS
287 if (workers_.empty() || task_count <= workers_.size()) {
288 this->run_serial(task_count, std::forward<Fn>(fn));
292 std::exception_ptr captured_exception;
294 std::unique_lock<std::mutex> lock(mutex_);
295 current_task_ = std::forward<Fn>(fn);
296 task_exception_ =
nullptr;
298 task_count_ = task_count;
299 active_workers_ = workers_.size();
303 task_ready_.notify_all();
305 std::unique_lock<std::mutex> lock(mutex_);
306 task_done_.wait(lock, [
this]() {
307 return completed_generation_ == generation_;
310 captured_exception = task_exception_;
311 current_task_ = std::function<void(size_t)>();
312 task_exception_ =
nullptr;
314 if (captured_exception) {
315 std::rethrow_exception(captured_exception);
318 this->run_serial(task_count, std::forward<Fn>(fn));
323 template<
typename Fn>
324 void run_serial(
size_t task_count, Fn&& fn) {
325 for (
size_t i = 0; i < task_count; ++i) {
330 static size_t default_worker_count() {
331#if CSV_ENABLE_THREADS
332 const unsigned int hw = std::thread::hardware_concurrency();
333 return hw > 0 ?
static_cast<size_t>(hw) : 1;
339#if CSV_ENABLE_THREADS
340 void start_workers(
size_t worker_count) {
341 workers_.reserve(worker_count);
342 for (
size_t i = 0; i < worker_count; ++i) {
343 workers_.push_back(std::thread(&DataFrameExecutor::worker_loop,
this));
347 void stop_workers() {
349 std::lock_guard<std::mutex> lock(mutex_);
353 task_ready_.notify_all();
354 for (
auto& worker : workers_) {
355 if (worker.joinable())
361 size_t seen_generation = 0;
362 std::unique_lock<std::mutex> lock(mutex_);
365 task_ready_.wait(lock, [
this, seen_generation]() {
366 return stop_ || generation_ != seen_generation;
371 const size_t local_generation = generation_;
372 seen_generation = local_generation;
376 const size_t task_index = next_task_.fetch_add(1);
377 if (task_index >= task_count_)
381 current_task_(task_index);
385 if (!task_exception_) {
386 task_exception_ = std::current_exception();
387 next_task_.store(task_count_);
395 if (--active_workers_ == 0) {
396 completed_generation_ = local_generation;
397 task_done_.notify_one();
402 std::vector<std::thread> workers_;
404 std::condition_variable task_ready_;
405 std::condition_variable task_done_;
406 std::function<void(
size_t)> current_task_;
407 std::exception_ptr task_exception_ =
nullptr;
408 std::atomic<size_t> next_task_{0};
409 size_t task_count_ = 0;
410 size_t active_workers_ = 0;
411 size_t generation_ = 0;
412 size_t completed_generation_ = 0;
/// No-op variant of start_workers(): the requested worker count is accepted and ignored.
/// NOTE(review): presumably the definition used when CSV_ENABLE_THREADS is off — confirm
/// against the surrounding preprocessor conditional.
void start_workers(size_t /*worker_count*/) {
}
/// No-op variant of stop_workers(): there is nothing to signal or join.
/// NOTE(review): presumably the definition used when CSV_ENABLE_THREADS is off — confirm
/// against the surrounding preprocessor conditional.
void stop_workers() {
}
437 row_overlay(other.row_overlay),
438 col_index(other.col_index),
439 can_mutate(other.can_mutate),
440 owned_value_(other.owned_value_) {
441 this->refresh_value();
447 row_overlay(other.row_overlay),
448 col_index(other.col_index),
449 can_mutate(other.can_mutate),
450 owned_value_(std::move(other.owned_value_)) {
451 this->refresh_value();
460 row_overlay(_row_overlay),
461 col_index(_col_index),
463 this->refresh_value();
472 row_overlay(_row_overlay),
473 col_index(_col_index),
475 this->refresh_value();
479 if (
this != &other) {
481 row_overlay = other.row_overlay;
482 col_index = other.col_index;
483 can_mutate = other.can_mutate;
484 owned_value_ = other.owned_value_;
485 this->refresh_value();
492 if (
this != &other) {
494 row_overlay = other.row_overlay;
495 col_index = other.col_index;
496 can_mutate = other.can_mutate;
497 owned_value_ = std::move(other.owned_value_);
498 this->refresh_value();
505 return this->assign(std::string(value));
509 template<
typename T = std::
string>
511 return const_cast<DataFrameCell*
>(
this)->CSVField::template get<T>();
514 bool is_null() const noexcept {
518 bool is_str() const noexcept {
522 bool is_num() const noexcept {
526 bool is_int() const noexcept {
530 bool is_float() const noexcept {
539 bool try_get(T& out)
const noexcept {
540 return const_cast<DataFrameCell*
>(
this)->CSVField::template try_get<T>(out);
544 void refresh_value() {
550 if (row_overlay && row_overlay->try_get_copy(col_index, owned_value_)) {
555 owned_value_.clear();
559 DataFrameCell& assign(std::string stored) {
560 if (!can_mutate || !row_overlay) {
561 throw std::runtime_error(internals::ERROR_CANNOT_EDIT_CONST_DF_CELL);
564 owned_value_ = stored;
565 const_cast<RowOverlay*
>(row_overlay)->set(col_index, std::move(stored));
571 const RowOverlay* row_overlay;
574 std::string owned_value_;
581 template<
typename KeyType>
585 DataFrameRow() : row(nullptr), frame(nullptr), row_index(0), row_overlay(nullptr), key_ptr(nullptr), can_mutate(false) {}
590 DataFrame<KeyType>* _frame,
594 ) : row(_row), frame(_frame), row_index(_row_index), row_overlay(_edits), key_ptr(_key), can_mutate(true) {}
599 const DataFrame<KeyType>* _frame,
603 ) : row(_row), frame(_frame), row_index(_row_index), row_overlay(_edits), key_ptr(_key), can_mutate(false) {}
607 return this->make_cell(this->find_column(col));
612 return this->make_cell(n);
617 return this->make_cell(this->find_column(col));
622 return this->make_cell(n);
638 const KeyType&
key()
const {
return *key_ptr; }
645 if (!can_mutate || !frame) {
646 throw std::runtime_error(internals::ERROR_CANNOT_ERASE_CONST_DF_ROW);
649 return const_cast<DataFrame<KeyType>*
>(frame)->erase_at_index(row_index);
653 operator std::vector<std::string>()
const {
654 std::vector<std::string> result;
655 result.reserve(row->
size());
657 for (
size_t i = 0; i < row->
size(); i++) {
658 result.push_back(this->make_cell(i).
template get<std::string>());
664 std::string
to_json(
const std::vector<std::string>& subset = {})
const {
665 const field_string_accessor field_at(
this);
667 return this->get_frame_json_converter().row_to_json(this->
size(), field_at, subset);
670 return this->make_detached_json_converter().row_to_json(this->
size(), field_at, subset);
674 std::string
to_json_array(
const std::vector<std::string>& subset = {})
const {
675 const field_string_accessor field_at(
this);
677 return this->get_frame_json_converter().row_to_json_array(this->
size(), field_at, subset);
680 return this->make_detached_json_converter().row_to_json_array(this->
size(), field_at, subset);
690 return std::views::iota(
size_t{0}, this->
size())
691 | std::views::transform([
this](
size_t i) {
return this->make_cell(i).template
get<std::string>(); });
696 struct field_string_accessor {
697 explicit field_string_accessor(
const DataFrameRow* owner) : owner(owner) {}
699 std::string operator()(
size_t i)
const {
706 const internals::JsonConverter& get_frame_json_converter()
const {
707 return frame->json_converter_.get_or_create([
this]() {
708 return std::make_shared<internals::JsonConverter>(frame->
columns());
712 internals::JsonConverter make_detached_json_converter()
const {
716 DataFrameCell make_cell(
size_t col_index) {
718 ? DataFrameCell(row,
const_cast<RowOverlay*
>(row_overlay), col_index)
719 : DataFrameCell(row, row_overlay, col_index);
722 DataFrameCell make_cell(
size_t col_index)
const {
723 return DataFrameCell(row, row_overlay, col_index);
726 size_t find_column(
const std::string& col)
const {
728 return frame->find_column(col);
731 const internals::ConstColNamesPtr col_names = row->
col_names_ptr();
732 const int position = col_names->index_of(col);
734 internals::throw_column_not_found_out_of_range(col);
737 return static_cast<size_t>(position);
741 const DataFrame<KeyType>* frame;
743 const RowOverlay* row_overlay;
744 const KeyType* key_ptr;
749 template<
typename KeyType>
753 DataFrameCell operator()(
const DataFrameColumn<KeyType>* owner,
size_t row_index)
const {
754 return owner->operator[](row_index);
759 using const_iterator = iterator;
764 : frame_(frame), col_index_(col_index) {}
767 const std::string&
name()
const {
768 return (*frame_->col_names_)[col_index_];
778 return frame_->
size();
783 return this->
size() == 0;
788 const auto& row = frame_->rows.at(row_index);
789 const auto* row_edits = frame_->find_row_edits(row_index);
800 const auto& row = frame_->rows.at(row_index);
801 const auto* row_edits = frame_->find_row_edits(row_index);
803 if (row_edits && row_edits->try_get_view(col_index_, edited_value)) {
811 template<
typename T = std::
string>
813 std::vector<T> values;
814 values.reserve(this->
size());
816 for (
size_t row_index = 0; row_index < this->
size(); ++row_index) {
817 values.push_back((*
this)[row_index].template get<T>());
824 operator std::vector<std::string>()
const {
825 return this->to_vector<std::string>();
834 return std::views::iota(
size_t{0}, this->
size())
835 | std::views::transform([
this](
size_t row_index) {
842 iterator
begin()
const {
return iterator(
this, 0); }
843 iterator end()
const {
return iterator(
this, this->
size()); }
844 const_iterator cbegin()
const {
return const_iterator(
this, 0); }
845 const_iterator cend()
const {
return const_iterator(
this, this->
size()); }
848 const DataFrame<KeyType>* frame_;
852 template<
typename KeyType = std::
string>
857 using row_type = DataFrameRow<KeyType>;
861 DataFrameRow<KeyType> operator()(DataFrame<KeyType>* owner,
size_t row_index)
const {
862 return owner->make_row_proxy(row_index);
867 DataFrameRow<KeyType> operator()(
const DataFrame<KeyType>* owner,
size_t row_index)
const {
868 return owner->make_const_row_proxy(row_index);
880 "DataFrame<KeyType> requires KeyType to be hashable (std::hash<KeyType> specialization required)."
885 "DataFrame<KeyType> requires KeyType to be equality comparable (operator== required)."
889 std::is_default_constructible<KeyType>::value,
890 "DataFrame<KeyType> requires KeyType to be default-constructible."
903 this->init_unkeyed_from_reader(reader);
908 this->init_unkeyed_from_rows(rows);
916 this->init_from_reader(reader, options);
929 this->init_from_reader(reader, options);
938 const std::string& _key_column,
940 bool throw_on_missing_key =
true
944 .set_key_column(_key_column)
945 .set_duplicate_key_policy(policy)
946 .set_throw_on_missing_key(throw_on_missing_key)
955 csv::enable_if_t<csv::is_invocable_returning<KeyFunc, KeyType, const CSVRow&>::value,
int> = 0
961 ) : col_names_(reader.col_names_ptr()) {
962 this->is_keyed =
true;
963 this->build_from_key_function(reader, key_func, policy);
969 csv::enable_if_t<csv::is_invocable_returning<KeyFunc, KeyType, const CSVRow&>::value,
int> = 0
975 ) :
DataFrame(reader, key_func, options.get_duplicate_key_policy()) {}
988 size_t n_rows() const noexcept {
return rows.size(); }
991 size_t n_cols() const noexcept {
return col_names_->size(); }
1000 return this->col_names_->index_of(name);
1004 const std::vector<std::string>&
columns() const noexcept {
return this->col_names_->get_col_names(); }
1012 if (include_rows.size() != this->rows.size()) {
1013 throw std::invalid_argument(
"selected row mask size must match DataFrame row count");
1016 std::vector<CSVRow> selected;
1017 selected.reserve(this->rows.size());
1018 for (
size_t row_index = 0; row_index < this->rows.size(); ++row_index) {
1019 if (include_rows[row_index]) {
1020 selected.push_back(this->rows[row_index]);
1029 if (col_index >= this->
n_cols()) {
1030 internals::throw_column_index_out_of_range();
1033 return DataFrameColumn<KeyType>(
this, col_index);
1038 return this->
column_view(this->find_column(name));
1050 template<
typename K = KeyType,
1051 csv::enable_if_t<!std::is_integral<K>::value,
int> = 0>
1053 static_assert(std::is_same<K, KeyType>::value,
1054 "Do not explicitly instantiate this template. Use at(size_t) for positional access.");
1060 template<
typename K = KeyType,
1061 csv::enable_if_t<!std::is_integral<K>::value,
int> = 0>
1063 static_assert(std::is_same<K, KeyType>::value,
1064 "Do not explicitly instantiate this template. Use at(size_t) for positional access.");
1073 DataFrameRow<KeyType>
at(
size_t i) {
1074 const auto& row = rows.at(i);
1075 auto* row_edits = this->ensure_row_edits(i);
1076 return DataFrameRow<KeyType>(&row,
this, i, row_edits, this->key_ptr_at(i));
1080 DataFrameRow<KeyType>
at(
size_t i)
const {
1081 const auto& row = rows.at(i);
1082 const RowOverlay* row_edits = this->find_row_edits(i);
1083 return DataFrameRow<KeyType>(&row,
this, i, row_edits, this->key_ptr_at(i));
1093 this->require_keyed_frame();
1094 auto position = this->position_of(key);
1095 return DataFrameRow<KeyType>(&rows.at(position),
this, position, this->ensure_row_edits(position), this->key_ptr_at(position));
1100 this->require_keyed_frame();
1101 auto position = this->position_of(key);
1102 const RowOverlay* row_edits = this->find_row_edits(position);
1103 return DataFrameRow<KeyType>(&rows.at(position),
this, position, row_edits, this->key_ptr_at(position));
1112 this->require_keyed_frame();
1113 this->ensure_key_index();
1114 return key_index->find(key) != key_index->end();
1124 template<
typename T = std::
string>
1125 std::vector<T>
column(
const std::string& name)
const {
1126 const size_t col_idx = this->find_column(name);
1127 std::vector<T> values;
1129 values.reserve(rows.size());
1130 for (
size_t row_index = 0; row_index < rows.size(); ++row_index) {
1131 values.push_back(this->
at(row_index)[col_idx].
template get<T>());
1150 template<
typename State,
typename Fn>
1153 std::vector<State>& states,
1156 if (states.size() != this->n_cols()) {
1157 throw std::invalid_argument(internals::ERROR_COLUMN_APPLY_STATE_COUNT);
1160 executor.parallel_for(this->
n_cols(), [
this, &states, &fn](
size_t column_index) {
1161 fn(this->
column_view(column_index), states[column_index]);
1173 template<
typename State,
typename Fn>
1176 const std::vector<size_t>& column_indices,
1177 std::vector<State>& states,
1180 if (states.size() != column_indices.size()) {
1181 throw std::invalid_argument(internals::ERROR_COLUMN_APPLY_SUBSET_STATE_COUNT);
1184 this->validate_selected_columns(column_indices);
1186 executor.parallel_for(column_indices.size(), [
this, &column_indices, &states, &fn](
size_t selected_index) {
1187 const size_t column_index = column_indices[selected_index];
1188 fn(this->column_view(column_index), states[selected_index]);
1198 template<
typename Fn>
1203 executor.parallel_for(this->
n_cols(), [
this, &fn](
size_t column_index) {
1215 template<
typename Fn>
1218 const std::vector<size_t>& column_indices,
1221 this->validate_selected_columns(column_indices);
1223 executor.parallel_for(column_indices.size(), [
this, &column_indices, &fn](
size_t selected_index) {
1224 fn(this->column_view(column_indices[selected_index]));
1235 typename GroupKey = invoke_result_t<GroupFunc, DataFrameRow<KeyType>>,
1242 std::unordered_map<GroupKey, std::vector<size_t>>
group_by(GroupFunc group_func)
const {
1243 std::unordered_map<GroupKey, std::vector<size_t>> grouped;
1245 for (
size_t i = 0; i < rows.size(); i++) {
1246 GroupKey group_key = group_func(this->
at(i));
1247 grouped[group_key].push_back(i);
1258 std::unordered_map<std::string, std::vector<size_t>>
group_by(
const std::string& name)
const {
1259 const size_t col_idx = this->find_column(name);
1260 std::unordered_map<std::string, std::vector<size_t>> grouped;
1262 for (
size_t i = 0; i < rows.size(); i++) {
1289 bool is_keyed =
false;
1292 internals::ConstColNamesPtr col_names_ = std::make_shared<internals::ColNames>();
1295 std::vector<CSVRow> rows;
1298 std::vector<KeyType> keys_;
1301 mutable std::unique_ptr<std::unordered_map<KeyType, size_t>> key_index;
1302 mutable internals::lazy_shared_ptr<internals::JsonConverter> json_converter_;
1309 std::vector<RowOverlaySlot> edits;
1310 std::shared_ptr<std::mutex> edits_creation_lock_{
new std::mutex() };
1313 void init_unkeyed_from_reader(CSVReader& reader) {
1314 this->assert_fresh_storage(
false);
1315 this->is_keyed =
false;
1316 this->col_names_ = reader.col_names_ptr();
1318 std::vector<CSVRow> batch;
1319 while (reader.read_chunk(batch, dataframe_read_chunk_rows())) {
1320 this->append_unkeyed_batch(batch);
1325 void init_unkeyed_from_rows(std::vector<CSVRow>& source_rows) {
1326 this->assert_fresh_storage(
false);
1327 this->is_keyed =
false;
1328 this->col_names_ = source_rows.empty()
1329 ? internals::ConstColNamesPtr(std::make_shared<internals::ColNames>())
1330 : source_rows.front().col_names_ptr();
1331 this->rows = std::move(source_rows);
1332 this->edits.resize(this->rows.size());
1336 void init_from_reader(CSVReader& reader,
const DataFrameOptions& options) {
1337 this->assert_fresh_storage(
false);
1338 this->is_keyed =
true;
1339 this->col_names_ = reader.col_names_ptr();
1340 const std::string key_column = options.get_key_column();
1342 if (key_column.empty())
1343 throw std::runtime_error(internals::ERROR_KEY_COLUMN_EMPTY);
1345 if (this->col_names_->index_of(key_column) ==
CSV_NOT_FOUND)
1346 throw std::runtime_error(std::string(internals::ERROR_KEY_COLUMN_NOT_FOUND) + key_column);
1348 const bool throw_on_missing_key = options.get_throw_on_missing_key();
1350 this->build_from_key_function(
1352 [key_column, throw_on_missing_key](
const CSVRow& row) -> KeyType {
1354 return row[key_column].template get<KeyType>();
1356 catch (
const std::exception& e) {
1357 if (throw_on_missing_key) {
1358 throw std::runtime_error(internals::ERROR_KEY_COLUMN_VALUE + std::string(e.what()));
1364 options.get_duplicate_key_policy()
1369 template<
typename KeyFunc>
1370 void build_from_key_function(
1373 DuplicateKeyPolicy policy
1375 std::unordered_map<KeyType, size_t> key_to_pos;
1376 this->assert_fresh_storage(
true);
1378 std::vector<CSVRow> batch;
1379 while (reader.read_chunk(batch, dataframe_read_chunk_rows())) {
1380 this->append_keyed_batch(batch, key_func, policy, key_to_pos);
1384 static size_t dataframe_read_chunk_rows() noexcept {
1388 template<
typename Container>
1389 static void reserve_for_append(Container& container,
size_t additional) {
1390 if (additional == 0) {
1394 const size_t required = container.size() + additional;
1395 if (required <= container.capacity()) {
1399 const size_t current = container.capacity();
1403 size_t next = current == 0 ? additional : current * 2;
1404 if (next < required || next < current) {
1408 container.reserve(next);
1411 void append_unkeyed_batch(std::vector<CSVRow>& batch) {
1412 reserve_for_append(rows, batch.size());
1413 reserve_for_append(edits, batch.size());
1415 for (
auto& row : batch) {
1416 rows.push_back(std::move(row));
1417 edits.emplace_back();
1421 template<
typename KeyFunc>
1422 void append_keyed_batch(
1423 std::vector<CSVRow>& batch,
1425 DuplicateKeyPolicy policy,
1426 std::unordered_map<KeyType, size_t>& key_to_pos
1428 reserve_for_append(rows, batch.size());
1429 reserve_for_append(keys_, batch.size());
1430 reserve_for_append(edits, batch.size());
1432 for (
auto& row : batch) {
1433 KeyType key = key_func(row);
1435 auto existing = key_to_pos.find(key);
1436 if (existing != key_to_pos.end()) {
1437 if (policy == DuplicateKeyPolicy::THROW)
1438 throw std::runtime_error(internals::ERROR_DUPLICATE_KEY);
1440 if (policy == DuplicateKeyPolicy::OVERWRITE)
1441 rows[existing->second] = std::move(row);
1446 rows.push_back(std::move(row));
1447 keys_.push_back(key);
1448 edits.emplace_back();
1449 key_to_pos[key] = rows.size() - 1;
1454 size_t find_column(
const std::string& name)
const {
1456 throw std::out_of_range(std::string(internals::ERROR_COLUMN_NOT_FOUND) + name);
1460 const RowOverlay* find_row_edits(
size_t row_index)
const {
1461 return edits.at(row_index).get();
1465 RowOverlay* ensure_row_edits(
size_t row_index) {
1466 RowOverlaySlot& slot = edits.at(row_index);
1467 RowOverlay* overlay = slot.get();
1472 std::lock_guard<std::mutex> lock(*edits_creation_lock_);
1473 return slot.ensure();
1476 DataFrameRow<KeyType> make_row_proxy(
size_t row_index) {
1477 const auto& row = rows.at(row_index);
1478 return DataFrameRow<KeyType>(&row,
this, row_index, this->ensure_row_edits(row_index), this->key_ptr_at(row_index));
1481 DataFrameRow<KeyType> make_const_row_proxy(
size_t row_index)
const {
1482 const auto& row = rows.at(row_index);
1483 return DataFrameRow<KeyType>(&row,
this, row_index, this->find_row_edits(row_index), this->key_ptr_at(row_index));
1486 void erase_row_edits(
size_t row_index) {
1487 if (row_index < edits.size()) {
1488 edits.erase(edits.begin() + row_index);
1492 bool erase_at_index(
size_t row_index) {
1493 if (row_index >= rows.size()) {
1497 this->erase_row_edits(row_index);
1498 rows.erase(rows.begin() + row_index);
1499 if (this->is_keyed) {
1500 keys_.erase(keys_.begin() + row_index);
1502 this->invalidate_key_index();
1506 void validate_selected_columns(
const std::vector<size_t>& column_indices)
const {
1507 for (
size_t column_index : column_indices) {
1508 if (column_index >= this->
n_cols()) {
1509 throw std::out_of_range(internals::ERROR_COLUMN_APPLY_INVALID_INDEX);
1515 void require_keyed_frame()
const {
1517 throw std::runtime_error(internals::ERROR_UNKEYED_DATA_FRAME);
1521 void invalidate_key_index() {
1526 void assert_fresh_storage(
bool expected_is_keyed)
const {
1527 CSV_DEBUG_ASSERT(this->rows.empty());
1528 CSV_DEBUG_ASSERT(this->keys_.empty());
1529 CSV_DEBUG_ASSERT(this->edits.empty());
1530 CSV_DEBUG_ASSERT(this->key_index.get() ==
nullptr);
1531 CSV_DEBUG_ASSERT(this->json_converter_.get() ==
nullptr);
1532 CSV_DEBUG_ASSERT(this->is_keyed == expected_is_keyed);
1536 void ensure_key_index()
const {
1537 if (key_index)
return;
1539 key_index = std::unique_ptr<std::unordered_map<KeyType, size_t>>(
1540 new std::unordered_map<KeyType, size_t>()
1543 for (
size_t i = 0; i < rows.size(); i++) {
1544 (*key_index)[keys_[i]] = i;
1549 size_t position_of(
const KeyType& key)
const {
1550 this->ensure_key_index();
1551 auto it = key_index->find(key);
1552 return it == key_index->end() ?
throw std::out_of_range(internals::ERROR_KEY_NOT_FOUND)
1556 const KeyType* key_ptr_at(
size_t row_index)
const {
1557 return this->is_keyed ? &keys_.at(row_index) :
nullptr;
1561 #ifdef CSV_HAS_CXX20
1563 internals::csv_write_rows_input_range<DataFrame<>>,
1564 "DataFrame must remain compatible with csv::DelimWriter::write_rows()."
Data type representing individual CSV values.
DataType type() noexcept
Return the type of the underlying CSV data.
bool is_str() noexcept
Returns true if field is a non-numeric, non-empty string.
constexpr CSVField(csv::string_view _sv) noexcept
Constructs a CSVField from a string_view.
T get()
Returns the value cast to the requested type, after first performing type checking.
bool try_get(T &out) noexcept
Non-throwing equivalent of get().
bool is_null() noexcept
Returns true if field is an empty string or string of whitespace characters.
CONSTEXPR csv::string_view get_sv() const noexcept
Return a string view over the field's contents.
bool is_float() noexcept
Returns true if field is a floating point value.
bool is_num() noexcept
Returns true if field is an integer or float.
bool is_int() noexcept
Returns true if field is an integer.
Main class for parsing CSVs from files and in-memory sources.
Data structure for representing CSV rows.
CONSTEXPR bool empty() const noexcept
Indicates whether row is empty or not.
const std::vector< std::string > & get_col_names() const
Retrieve this row's associated column names.
CONSTEXPR size_t size() const noexcept
Return the number of fields in this row.
internals::ConstColNamesPtr col_names_ptr() const noexcept
Internal accessor for preserving resolved column-name lookup policy across helper types.
T get() const
Const-friendly read access for proxy use in column iteration and callbacks.
Lightweight non-owning view over one DataFrame column.
csv::string_view get_sv(size_t row_index) const
Access a visible cell value as a string_view without materializing a DataFrameCell.
DataFrameCell operator[](size_t row_index) const
Access a visible cell value by row index.
std::vector< T > to_vector() const
Materialize this column as a vector of converted values.
size_t index() const noexcept
Zero-based column position.
size_t size() const noexcept
Number of rows in the parent batch.
const std::string & name() const
Column name.
auto to_sv_range() const
Convert this DataFrameColumn into a std::ranges::input_range of strings.
bool empty() const noexcept
Whether the parent batch contains no rows.
iterator begin() const
Iterate over visible cells in this column.
Persistent execution backend for batch-oriented DataFrame column work.
Allows configuration of DataFrame behavior.
DuplicateKeyPolicy
Policy for handling duplicate keys when creating a keyed DataFrame.
Proxy class that wraps a CSVRow and intercepts field access to check for edits.
DataFrameCell operator[](const std::string &col)
Access a field by column name, preserving edit support.
std::string to_json(const std::vector< std::string > &subset={}) const
Convert to JSON.
DataFrameRow(const CSVRow *_row, const DataFrame< KeyType > *_frame, size_t _row_index, const RowOverlay *_edits, const KeyType *_key)
Construct a read-only DataFrameRow wrapper.
auto to_sv_range() const
Convert this DataFrameRow into a std::ranges::input_range of strings, respecting the sparse overlay (...
DataFrameRow(const CSVRow *_row, DataFrame< KeyType > *_frame, size_t _row_index, RowOverlay *_edits, const KeyType *_key)
Construct a mutable DataFrameRow wrapper.
DataFrameCell operator[](size_t n)
Access a field by position, preserving edit support.
DataFrameRow()
Default constructor (creates an unbound proxy).
const std::vector< std::string > & get_col_names() const
Get column names.
DataFrameCell operator[](size_t n) const
Access a field by position, checking edits first.
DataFrameCell operator[](const std::string &col) const
Access a field by column name, checking edits first.
std::string to_json_array(const std::vector< std::string > &subset={}) const
Convert to JSON array.
size_t size() const
Get the number of fields in the row.
const KeyType & key() const
Get the key for this row (only valid for keyed DataFrames).
bool empty() const
Check if the row is empty.
const CSVRow & get_underlying_row() const
Get the underlying CSVRow for compatibility.
bool erase()
Delete this row from the parent DataFrame.
bool empty() const noexcept
Check if the DataFrame is empty (has no rows).
DataFrame(CSVReader &reader, const std::string &_key_column, DuplicateKeyPolicy policy=DuplicateKeyPolicy::OVERWRITE, bool throw_on_missing_key=true)
Construct a keyed DataFrame using a column name as the key.
internals::indexed_proxy_iterator< const DataFrame< KeyType >, DataFrameRow< KeyType >, const_row_accessor > const_iterator
Row-wise const iterator over DataFrameRow entries.
DataFrameRow< KeyType > operator[](size_t i)
Access a row by position (unchecked).
DataFrame(CSVReader &reader, const DataFrameOptions &options)
Construct a keyed DataFrame from a CSV reader with options.
bool has_column(const std::string &name) const
Check if a column exists in the DataFrame.
iterator end()
Get iterator past the last row.
DataFrameRow< KeyType > operator[](const KeyType &key)
Access a row by its key.
const_iterator cend() const
Get const iterator past the last row (explicit).
std::vector< T > column(const std::string &name) const
Extract all values from a column with type conversion.
size_t n_cols() const noexcept
Get the number of columns in the DataFrame.
internals::indexed_proxy_iterator< DataFrame< KeyType >, DataFrameRow< KeyType >, mutable_row_accessor > iterator
Row-wise iterator over DataFrameRow entries.
std::unordered_map< GroupKey, std::vector< size_t > > group_by(GroupFunc group_func) const
Group row positions using an arbitrary grouping function.
iterator begin()
Get iterator to the first row.
DataFrame(CSVReader &reader, KeyFunc key_func, DuplicateKeyPolicy policy=DuplicateKeyPolicy::OVERWRITE)
Construct a keyed DataFrame using a custom key function.
DataFrame(csv::string_view filename, const DataFrameOptions &options, CSVFormat format=CSVFormat::guess_csv())
Construct a keyed DataFrame directly from a CSV file.
void column_parallel_apply(DataFrameExecutor &executor, const std::vector< size_t > &column_indices, std::vector< State > &states, Fn &&fn) const
Apply a batch-oriented function to a selected subset of columns, potentially in parallel.
DataFrame(CSVReader &reader, KeyFunc key_func, const DataFrameOptions &options)
Construct a keyed DataFrame using a custom key function with options.
DataFrame selected_rows(const std::vector< std::uint8_t > &include_rows) const
Build an unkeyed DataFrame containing rows whose corresponding mask entry is true.
size_t n_rows() const noexcept
Get the number of rows in the DataFrame.
const std::vector< std::string > & columns() const noexcept
Get the column names in order.
const_iterator cbegin() const
Get const iterator to the first row (explicit).
void column_parallel_apply(DataFrameExecutor &executor, std::vector< State > &states, Fn &&fn) const
Apply a batch-oriented function to each column, potentially in parallel.
DataFrameRow< KeyType > operator[](size_t i) const
Access a row by position (unchecked, const version).
void column_parallel_apply(DataFrameExecutor &executor, const std::vector< size_t > &column_indices, Fn &&fn) const
Apply a read-only batch function to a selected subset of columns, potentially in parallel.
const_iterator begin() const
Get const iterator to the first row.
DataFrameRow< KeyType > at(size_t i) const
Access a row by position with bounds checking (const version).
DataFrameRow< KeyType > operator[](const KeyType &key) const
Access a row by its key (const version).
DataFrameColumn< KeyType > column_view(const std::string &name) const
Access a column view by name.
bool contains(const KeyType &key) const
Check if a key exists in the DataFrame.
const_iterator end() const
Get const iterator past the last row.
int index_of(const std::string &name) const
Get the index of a column by name.
DataFrameRow< KeyType > at(size_t i)
Access a row by position with bounds checking.
void column_parallel_apply(DataFrameExecutor &executor, Fn &&fn) const
Apply a read-only batch function to each column, potentially in parallel.
DataFrame(std::vector< CSVRow > rows)
Construct an unkeyed DataFrame from an existing batch of rows.
size_t size() const noexcept
Get the number of rows in the DataFrame.
std::unordered_map< std::string, std::vector< size_t > > group_by(const std::string &name) const
Group row positions by the value of a column.
DataFrameColumn< KeyType > column_view(size_t col_index) const
Access a column view by position.
DataFrame()=default
Construct an empty DataFrame.
DataFrame(CSVReader &reader)
Construct an unkeyed DataFrame from a CSV reader.
Shared exception message templates and throw helpers.
Defines functionality needed for basic CSV parsing.
Internal JSON serialization helpers for row-like CSV data.
The all-encompassing namespace.
DataType
Enumerates the different CSV field types recognized by this library.
CONSTEXPR_14 csv::string_view CSVField::get< csv::string_view >()
Retrieve a view over this field's string.
std::string CSVField::get< std::string >()
Retrieve this field's original string.
constexpr int CSV_NOT_FOUND
Integer indicating a requested column wasn't found.
std::string_view string_view
The string_view class used by this library.
bool try_get_view(size_t col_index, csv::string_view &out) const
Return a view into an edited cell without copying.