Vince's CSV Parser
Loading...
Searching...
No Matches
thread_safe_deque.hpp
Go to the documentation of this file.
1
11#pragma once
12#include <atomic>
13#include <condition_variable>
14#include <deque>
15#include <mutex>
16
17namespace csv {
18 namespace internals {
27 template<typename T>
29 public:
30 ThreadSafeDeque(size_t notify_size = 100) : _notify_size(notify_size) {}
32 this->data = other.data;
33 this->_notify_size = other._notify_size;
34 this->_is_empty.store(other._is_empty.load(std::memory_order_acquire), std::memory_order_release);
35 }
36
37 ThreadSafeDeque(const std::deque<T>& source) : ThreadSafeDeque() {
38 this->data = source;
39 this->_is_empty.store(source.empty(), std::memory_order_release);
40 }
41
42 bool empty() const noexcept {
43 return this->_is_empty.load(std::memory_order_acquire);
44 }
45
46 T& front() noexcept {
47 std::lock_guard<std::mutex> lock{ this->_lock };
48 return this->data.front();
49 }
50
56 T& operator[](size_t n) {
57 return this->data[n];
58 }
59
60 void push_back(T&& item) {
61 std::lock_guard<std::mutex> lock{ this->_lock };
62 this->data.push_back(std::move(item));
63 this->_is_empty.store(false, std::memory_order_release);
64
65 if (this->data.size() >= _notify_size) {
66 this->_cond.notify_all();
67 }
68 }
69
70 T pop_front() noexcept {
71 std::lock_guard<std::mutex> lock{ this->_lock };
72 T item = std::move(data.front());
73 data.pop_front();
74
75 // Update empty flag if we just emptied the deque
76 if (this->data.empty()) {
77 this->_is_empty.store(true, std::memory_order_release);
78 }
79
80 return item;
81 }
82
84 constexpr bool is_waitable() const noexcept { return this->_is_waitable; }
85
87 void wait() {
88 if (!is_waitable()) {
89 return;
90 }
91
92 std::unique_lock<std::mutex> lock{ this->_lock };
93 this->_cond.wait(lock, [this] { return this->data.size() >= _notify_size || !this->is_waitable(); });
94 lock.unlock();
95 }
96
97 size_t size() const noexcept {
98 std::lock_guard<std::mutex> lock{ this->_lock };
99 return this->data.size();
100 }
101
102 typename std::deque<T>::iterator begin() noexcept {
103 return this->data.begin();
104 }
105
106 typename std::deque<T>::iterator end() noexcept {
107 return this->data.end();
108 }
109
111 void notify_all() {
112 std::lock_guard<std::mutex> lock{ this->_lock };
113 this->_is_waitable.store(true, std::memory_order_release);
114 this->_cond.notify_all();
115 }
116
118 void kill_all() {
119 std::lock_guard<std::mutex> lock{ this->_lock };
120 this->_is_waitable.store(false, std::memory_order_release);
121 this->_cond.notify_all();
122 }
123
124 private:
125 std::atomic<bool> _is_empty{ true }; // Lock-free empty() check
126 std::atomic<bool> _is_waitable{ false }; // Lock-free is_waitable() check
127 size_t _notify_size;
128 mutable std::mutex _lock;
129 std::condition_variable _cond;
130 std::deque<T> data;
131 };
132 }
133}
A std::deque wrapper which allows multiple read and write threads to concurrently access it, along with providing read threads the ability to wait for the deque to become populated.
void wait()
Wait for an item to become available.
void notify_all()
Tell listeners that this deque is actively being pushed to.
constexpr bool is_waitable() const noexcept
Returns true if a thread is actively pushing items to this deque.
void kill_all()
Tell all listeners to stop.
T & operator[](size_t n)
NOTE: operator[] is not synchronized.
CSV_CONST CONSTEXPR_17 OutArray arrayToDefault(T &&value)
Helper constexpr function to initialize an array with all the elements set to value.
The all encompassing namespace.