Vince's CSV Parser
thread_safe_deque.hpp
#pragma once

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

#include "row_queue_batch.hpp" // Shared helpers for batch-backed row queues

/** The all encompassing namespace. */
namespace csv {
    namespace internals {
        /**
         * A std::deque wrapper which allows multiple read and write threads
         * to concurrently access it, along with providing read threads the
         * ability to wait for the deque to become populated.
         */
        template<typename T>
        class ThreadSafeDeque {
        public:
            ThreadSafeDeque(size_t notify_size = 100) : _notify_size(notify_size) {}

            ThreadSafeDeque(const ThreadSafeDeque& other) {
                std::lock_guard<std::mutex> lock{ other._lock };
                this->batches_ = other.batches_;
                this->front_index_ = other.front_index_;
                this->size_ = other.size_;
                this->_notify_size = other._notify_size;
                this->_is_empty.store(other._is_empty.load(std::memory_order_acquire), std::memory_order_release);
                this->_is_waitable.store(other._is_waitable.load(std::memory_order_acquire), std::memory_order_release);
            }

            ThreadSafeDeque(const std::deque<T>& source) : ThreadSafeDeque() {
                std::vector<T> rows;
                rows.reserve(source.size());
                for (const auto& row : source) {
                    rows.push_back(row);
                }
                if (!rows.empty()) {
                    this->batches_.push_back(std::move(rows));
                    this->size_ = source.size();
                }
                this->_is_empty.store(source.empty(), std::memory_order_release);
            }

            bool empty() const noexcept {
                return this->_is_empty.load(std::memory_order_acquire);
            }

            void push_back(T&& item) {
                std::lock_guard<std::mutex> lock{ this->_lock };
                std::vector<T> batch;
                batch.push_back(std::move(item));
                this->batches_.push_back(std::move(batch));
                this->size_++;
                this->_is_empty.store(false, std::memory_order_release);

                if (this->size_ >= _notify_size) {
                    this->_cond.notify_all();
                }
            }

            void append_rows(std::vector<T>&& rows) {
                if (rows.empty()) {
                    return;
                }

                std::lock_guard<std::mutex> lock{ this->_lock };
                this->size_ += rows.size();
                this->batches_.push_back(std::move(rows));
                this->_is_empty.store(false, std::memory_order_release);

                if (this->size_ >= _notify_size) {
                    this->_cond.notify_all();
                }
            }

            T pop_front() noexcept {
                std::lock_guard<std::mutex> lock{ this->_lock };
                T item = std::move(this->batches_.front()[this->front_index_]);
                this->front_index_++;
                this->size_--;
                this->discard_exhausted_front_batch();

                if (this->size_ == 0) {
                    this->_is_empty.store(true, std::memory_order_release);
                }

                return item;
            }
            /**
             * Move up to max_items rows into a caller-owned batch buffer
             * under one lock. Returns the number of rows moved.
             */
            size_t drain_front(std::vector<T>& out, size_t max_items) {
                std::lock_guard<std::mutex> lock{ this->_lock };
                const size_t drain_count = drain_front_batches(
                    this->batches_,
                    this->front_index_,
                    this->size_,
                    out,
                    max_items
                );

                if (this->size_ == 0) {
                    this->_is_empty.store(true, std::memory_order_release);
                }

                return drain_count;
            }

            /** Invoke callback with a synchronized copy of queued rows. */
            template<typename Callback>
            void inspect(Callback&& callback) const {
                std::vector<T> snapshot;
                {
                    std::lock_guard<std::mutex> lock{ this->_lock };
                    snapshot.reserve(this->size_);

                    bool first_batch = true;
                    for (const auto& batch : this->batches_) {
                        const size_t start = first_batch ? this->front_index_ : 0;
                        first_batch = false;

                        for (size_t i = start; i < batch.size(); ++i) {
                            snapshot.push_back(batch[i]);
                        }
                    }
                }

                std::forward<Callback>(callback)(snapshot);
            }

            /** Returns true if a thread is actively pushing items to this deque. */
            bool is_waitable() const noexcept {
                return this->_is_waitable.load(std::memory_order_acquire);
            }

            /** Block until roughly notify_size rows are queued or production stops. */
            void wait() {
                if (!is_waitable()) {
                    return;
                }

                std::unique_lock<std::mutex> lock{ this->_lock };
                this->_cond.wait(lock, [this] { return this->size_ >= _notify_size || !this->is_waitable(); });
            }

            size_t size() const noexcept {
                std::lock_guard<std::mutex> lock{ this->_lock };
                return this->size_;
            }

            /** Tell listeners that this deque is actively being pushed to. */
            void notify_all() {
                std::lock_guard<std::mutex> lock{ this->_lock };
                this->_is_waitable.store(true, std::memory_order_release);
                this->_cond.notify_all();
            }

            /** Tell listeners that production has stopped and wake all waiting threads. */
            void kill_all() {
                std::lock_guard<std::mutex> lock{ this->_lock };
                this->_is_waitable.store(false, std::memory_order_release);
                this->_cond.notify_all();
            }

        private:
            std::atomic<bool> _is_empty{ true };     // Lock-free empty() check
            std::atomic<bool> _is_waitable{ false }; // Lock-free is_waitable() check
            size_t _notify_size;
            mutable std::mutex _lock;
            std::condition_variable _cond;
            std::deque<std::vector<T>> batches_;
            size_t front_index_ = 0;
            size_t size_ = 0;

            void discard_exhausted_front_batch() noexcept {
                while (!this->batches_.empty() && this->front_index_ >= this->batches_.front().size()) {
                    this->batches_.pop_front();
                    this->front_index_ = 0;
                }
            }
        };
    }
}
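
The drain_front_batches helper comes from row_queue_batch.hpp, which this page does not show. Based on how drain_front calls it and how discard_exhausted_front_batch manages the same state, a plausible sketch of that helper might look as follows; treat it as an illustration of the contract, not the library's actual definition.

// Hypothetical sketch of drain_front_batches (the real definition lives in
// row_queue_batch.hpp, which is not shown here). The caller holds the
// deque's mutex and passes its batch state by reference.
template<typename T>
size_t drain_front_batches(
    std::deque<std::vector<T>>& batches,
    size_t& front_index,
    size_t& size,
    std::vector<T>& out,
    size_t max_items)
{
    size_t drained = 0;
    while (drained < max_items && !batches.empty()) {
        std::vector<T>& front = batches.front();

        // Move rows out of the front batch, starting at the cursor.
        while (drained < max_items && front_index < front.size()) {
            out.push_back(std::move(front[front_index]));
            ++front_index;
            ++drained;
        }

        // Drop the front batch once fully consumed and reset the cursor.
        if (front_index >= front.size()) {
            batches.pop_front();
            front_index = 0;
        }
    }

    size -= drained;
    return drained;
}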
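
A minimal usage sketch of the queue itself (hypothetical row contents and counts; assumes thread_safe_deque.hpp and row_queue_batch.hpp are on the include path). Note that notify_all() must run before readers start waiting, because wait() returns immediately on a deque that is not marked waitable.

#include <iostream>
#include <string>
#include <thread>
#include <vector>

#include "thread_safe_deque.hpp"

int main() {
    // Wake waiting readers once roughly 50 rows are queued.
    csv::internals::ThreadSafeDeque<std::string> queue(50);
    queue.notify_all(); // mark as actively produced before any reader waits

    std::thread producer([&queue] {
        for (int batch_no = 0; batch_no < 10; ++batch_no) {
            std::vector<std::string> rows;
            for (int i = 0; i < 100; ++i) {
                rows.push_back("row " + std::to_string(batch_no * 100 + i));
            }
            queue.append_rows(std::move(rows)); // one lock per 100 rows
        }
        queue.kill_all(); // production finished: wake all waiting readers
    });

    std::thread consumer([&queue] {
        size_t total = 0;
        std::vector<std::string> batch;
        while (true) {
            queue.wait(); // block until ~notify_size rows queued or production stops
            batch.clear();
            total += queue.drain_front(batch, 256);
            if (queue.empty() && !queue.is_waitable()) {
                break;
            }
        }
        std::cout << "consumed " << total << " rows\n";
    });

    producer.join();
    consumer.join();
}

Batching writes through append_rows and reads through drain_front keeps contention to one mutex acquisition per batch rather than one per row, which is the point of layering std::vector<T> batches over a plain std::deque.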