Vince's CSV Parser
Loading...
Searching...
No Matches
thread_safe_deque.hpp
Go to the documentation of this file.
1
11#pragma once
12
13#include <atomic>
14#include <condition_variable>
15#include <deque>
16#include <mutex>
17
namespace csv {
    namespace internals {
        /** A std::deque wrapper whose mutating operations are guarded by a mutex,
         *  paired with a condition variable so that consumers can block until the
         *  producer has queued at least `notify_size` items (or has signalled that
         *  it is done via kill_all()).
         *
         *  Synchronized members: front(), push_back(), pop_front(), size(), wait(),
         *  notify_all(), kill_all(). empty() and is_waitable() are lock-free reads
         *  of atomic flags.
         *
         *  NOT synchronized: operator[], begin(), end(). Callers must make sure no
         *  other thread mutates the deque while they use these.
         */
        template<typename T>
        class ThreadSafeDeque {
        public:
            /** Create an empty deque.
             *
             *  @param notify_size Number of queued items at which push_back()
             *                     wakes threads blocked in wait().
             */
            ThreadSafeDeque(size_t notify_size = 100) : _notify_size(notify_size) {}

            /** Copy the contents and notify threshold of another deque.
             *
             *  The source's mutex is held for the duration of the copy so that a
             *  concurrent push_back()/pop_front() on `other` cannot race with
             *  reading its underlying container. The waitable flag is not copied;
             *  the new object starts out non-waitable.
             */
            ThreadSafeDeque(const ThreadSafeDeque& other) : _notify_size(other._notify_size) {
                std::lock_guard<std::mutex> lock{ other._lock };
                this->data = other.data;

                // Derive the flag from what was actually copied so that the
                // flag and the contents can never disagree.
                this->_is_empty.store(this->data.empty(), std::memory_order_release);
            }

            /** Create a deque pre-populated with a copy of `source`, using the
             *  default notify threshold.
             */
            ThreadSafeDeque(const std::deque<T>& source) : ThreadSafeDeque() {
                this->data = source;
                this->_is_empty.store(source.empty(), std::memory_order_release);
            }

            /** Lock-free check of whether the deque currently holds no items. */
            bool empty() const noexcept {
                return this->_is_empty.load(std::memory_order_acquire);
            }

            /** Return a reference to the first item.
             *
             *  The lock is only held while the reference is obtained, not while
             *  the caller uses it. Precondition: the deque is not empty.
             */
            T& front() noexcept {
                std::lock_guard<std::mutex> lock{ this->_lock };
                return this->data.front();
            }

            /** Return a reference to the n-th item.
             *
             *  NOTE: operator[] is not synchronized.
             */
            T& operator[](size_t n) {
                return this->data[n];
            }

            /** Move `item` onto the back of the deque and, once at least
             *  `notify_size` items are queued, wake every thread blocked in wait().
             */
            void push_back(T&& item) {
                std::lock_guard<std::mutex> lock{ this->_lock };
                this->data.push_back(std::move(item));
                this->_is_empty.store(false, std::memory_order_release);

                if (this->data.size() >= _notify_size) {
                    this->_cond.notify_all();
                }
            }

            /** Remove the first item and return it by value.
             *
             *  Precondition: the deque is not empty.
             */
            T pop_front() noexcept {
                std::lock_guard<std::mutex> lock{ this->_lock };
                T item = std::move(data.front());
                data.pop_front();

                if (this->data.empty()) {
                    this->_is_empty.store(true, std::memory_order_release);
                }

                return item;
            }

            /** Returns true if a thread is actively pushing items to this deque. */
            bool is_waitable() const noexcept {
                return this->_is_waitable.load(std::memory_order_acquire);
            }

            /** Block until at least `notify_size` items are queued or the deque
             *  stops being waitable. Returns immediately if it is not waitable.
             */
            void wait() {
                if (!is_waitable()) {
                    return;
                }

                // The unique_lock releases the mutex when it goes out of scope.
                std::unique_lock<std::mutex> lock{ this->_lock };
                this->_cond.wait(lock, [this] {
                    return this->data.size() >= _notify_size || !this->is_waitable();
                });
            }

            /** Number of items currently queued (takes the lock). */
            size_t size() const noexcept {
                std::lock_guard<std::mutex> lock{ this->_lock };
                return this->data.size();
            }

            /** Iterator to the first item. Not synchronized. */
            typename std::deque<T>::iterator begin() noexcept {
                return this->data.begin();
            }

            /** Past-the-end iterator. Not synchronized. */
            typename std::deque<T>::iterator end() noexcept {
                return this->data.end();
            }

            /** Tell listeners that this deque is actively being pushed to. */
            void notify_all() {
                std::lock_guard<std::mutex> lock{ this->_lock };
                this->_is_waitable.store(true, std::memory_order_release);
                this->_cond.notify_all();
            }

            /** Mark the deque as no longer waitable and wake every thread
             *  blocked in wait() so that it can return.
             */
            void kill_all() {
                std::lock_guard<std::mutex> lock{ this->_lock };
                this->_is_waitable.store(false, std::memory_order_release);
                this->_cond.notify_all();
            }

        private:
            std::atomic<bool> _is_empty{ true };     // Lock-free empty() check
            std::atomic<bool> _is_waitable{ false }; // Lock-free is_waitable() check
            size_t _notify_size;
            mutable std::mutex _lock;                // mutable: taken by const members
            std::condition_variable _cond;
            std::deque<T> data;
        };
    }
}
A std::deque wrapper which allows multiple read and write threads to concurrently access it along wit...
void notify_all()
Tell listeners that this deque is actively being pushed to.
T & operator[](size_t n)
NOTE: operator[] is not synchronized.
bool is_waitable() const noexcept
Returns true if a thread is actively pushing items to this deque.
The all encompassing namespace.