37 ThreadSafeDeque(
size_t notify_size = 100) : _notify_size(notify_size) {}
// NOTE(review): the enclosing signature is on a line not visible in this
// chunk — presumably the copy constructor; verify against the full file.
// Only `other`'s mutex is taken: *this is still being constructed, so no
// other thread can observe it yet and self-locking is unnecessary.
std::lock_guard<std::mutex> lock{ other._lock };
this->batches_ = other.batches_;
this->front_index_ = other.front_index_;
this->size_ = other.size_;
this->_notify_size = other._notify_size;
// std::atomic is not copy-assignable; transfer the flag values explicitly.
this->_is_empty.store(other._is_empty.load(std::memory_order_acquire), std::memory_order_release);
this->_is_waitable.store(other._is_waitable.load(std::memory_order_acquire), std::memory_order_release);
// Bulk-initialize from `source`: elements are staged in `rows`, which is then
// moved into batches_ as a single batch.
// NOTE(review): the declaration of `rows`, the enclosing signature, and the
// loop body are on lines not visible in this chunk — presumably the body
// copies each `row` into `rows`; confirm against the full file.
rows.reserve(source.size());
for (const auto& row : source) {
this->batches_.push_back(std::move(rows));
this->size_ = source.size();
// Seed the lock-free emptiness flag to match the initial contents.
this->_is_empty.store(source.empty(), std::memory_order_release);
// Lock-free emptiness probe: reads the cached atomic flag instead of taking
// _lock, so the answer may already be stale when the caller acts on it.
bool empty() const noexcept {
    return this->_is_empty.load(std::memory_order_acquire);
// Move a single item into the deque as its own one-element batch.
void push_back(T&& item) {
    std::lock_guard<std::mutex> lock{ this->_lock };
    // NOTE(review): the declaration of `batch` and the size_ increment fall
    // on lines not visible in this chunk — confirm against the full file.
    batch.push_back(std::move(item));
    this->batches_.push_back(std::move(batch));
    this->_is_empty.store(false, std::memory_order_release);
    // Wake consumers only once a full notification batch has accumulated,
    // so producers do not signal on every single push.
    if (this->size_ >= _notify_size) {
        this->_cond.notify_all();
// Move an entire vector of rows in as one batch (cheaper than element-wise
// push_back: one deque insertion, one notification check).
// NOTE(review): lines between the signature and the lock are not visible in
// this chunk — possibly an empty-`rows` guard; confirm against the full file.
void append_rows(std::vector<T>&& rows) {
    std::lock_guard<std::mutex> lock{ this->_lock };
    this->size_ += rows.size();
    this->batches_.push_back(std::move(rows));
    this->_is_empty.store(false, std::memory_order_release);
    // Same batched wake-up policy as push_back.
    if (this->size_ >= _notify_size) {
        this->_cond.notify_all();
// Remove and return the element at the logical front (front batch, offset
// front_index_).
// NOTE(review): declared noexcept, yet lock_guard construction and T's move
// constructor may throw — verify noexcept is intended here.
// NOTE(review): no emptiness precondition is checked in the visible lines;
// calling this on an empty deque would dereference batches_.front() of an
// empty deque (undefined behavior). Confirm callers guard with empty()/size().
T pop_front() noexcept {
    std::lock_guard<std::mutex> lock{ this->_lock };
    T item = std::move(this->batches_.front()[this->front_index_]);
    // (front_index_/size_ bookkeeping lines are not visible in this chunk.)
    this->discard_exhausted_front_batch();
    if (this->size_ == 0) {
        this->_is_empty.store(true, std::memory_order_release);
// NOTE(review): the enclosing signature and the lines between the
// drain_front_batches(...) call and the emptiness check are not visible in
// this chunk — this appears to be a bulk-drain operation; confirm against
// the full file.
std::lock_guard<std::mutex> lock{ this->_lock };
const size_t drain_count = drain_front_batches(
// Keep the lock-free emptiness flag in sync after draining.
if (this->size_ == 0) {
    this->_is_empty.store(true, std::memory_order_release);
// Copy every remaining (unconsumed) element into a local snapshot while
// holding the lock, then hand the snapshot to `callback`.
// NOTE(review): the function signature between the template header and the
// body is on a line not visible in this chunk.
template<typename Callback>
std::vector<T> snapshot;
std::lock_guard<std::mutex> lock{ this->_lock };
snapshot.reserve(this->size_);
// The front batch may be partially consumed, so it starts at front_index_;
// every later batch is copied in full.
// NOTE(review): first_batch is never reset to false in the visible lines —
// presumably done on a line missing from this chunk; confirm, otherwise
// every batch would be treated as the front batch.
bool first_batch = true;
for (const auto& batch : this->batches_) {
    const size_t start = first_batch ? this->front_index_ : 0;
    for (size_t i = start; i < batch.size(); ++i) {
        snapshot.push_back(batch[i]);
std::forward<Callback>(callback)(snapshot);
// Lock-free read of the waitable flag (enclosing signature not visible in
// this chunk — presumably is_waitable(), which the wait predicate calls).
return this->_is_waitable.load(std::memory_order_acquire);
// Block until either a full notification batch (size_ >= _notify_size) is
// buffered or waiting has been disabled. The predicate is re-evaluated under
// the lock on every wake-up, so spurious wakes are handled.
// (Enclosing signature is on a line not visible in this chunk.)
std::unique_lock<std::mutex> lock{ this->_lock };
this->_cond.wait(lock, [this] { return this->size_ >= _notify_size || !this->is_waitable(); });
// Exact element count, taken under the lock (unlike the lock-free empty()).
// NOTE(review): declared noexcept, yet lock_guard may throw std::system_error
// on lock failure — verify noexcept is intended.
// (The return statement is on a line not visible in this chunk.)
size_t size() const noexcept {
    std::lock_guard<std::mutex> lock{ this->_lock };
// Enable waiting and wake all current waiters so they re-evaluate the wait
// predicate under the lock.
// (Enclosing signature is on a line not visible in this chunk.)
std::lock_guard<std::mutex> lock{ this->_lock };
this->_is_waitable.store(true, std::memory_order_release);
this->_cond.notify_all();
// Disable waiting and wake all current waiters; the wait predicate treats
// !is_waitable() as a release condition, so blocked threads return.
// (Enclosing signature is on a line not visible in this chunk.)
std::lock_guard<std::mutex> lock{ this->_lock };
this->_is_waitable.store(false, std::memory_order_release);
this->_cond.notify_all();
// True while the deque holds no elements; readable without _lock (empty()).
std::atomic<bool> _is_empty{ true };
// When false, waiters are released and wait() returns immediately.
std::atomic<bool> _is_waitable{ false };
// mutable so const members (e.g. size()) can still take the lock.
mutable std::mutex _lock;
std::condition_variable _cond;
// Storage is batch-of-rows; the front batch may be partially consumed.
std::deque<std::vector<T>> batches_;
// Index of the next unread element within batches_.front().
size_t front_index_ = 0;
// NOTE(review): member naming mixes leading-underscore (_lock, _cond, ...)
// and trailing-underscore (batches_, size_) styles; size_ and _notify_size
// are declared on lines not visible in this chunk.
// Pop fully-consumed batches off the front and reset the read cursor.
// Must be called with _lock held (callers such as pop_front hold it).
// (The closing braces run past the end of this chunk.)
void discard_exhausted_front_batch() noexcept {
    while (!this->batches_.empty() && this->front_index_ >= this->batches_.front().size()) {
        this->batches_.pop_front();
        this->front_index_ = 0;