Vince's CSV Parser
Loading...
Searching...
No Matches
thread_safe_deque.hpp
Go to the documentation of this file.
1
8#pragma once
9#include <atomic>
10#include <condition_variable>
11#include <deque>
12#include <mutex>
13
14namespace csv {
15 namespace internals {
24 template<typename T>
26 public:
27 ThreadSafeDeque(size_t notify_size = 100) : _notify_size(notify_size) {}
29 this->data = other.data;
30 this->_notify_size = other._notify_size;
31 this->_is_empty.store(other._is_empty.load(std::memory_order_acquire), std::memory_order_release);
32 }
33
34 ThreadSafeDeque(const std::deque<T>& source) : ThreadSafeDeque() {
35 this->data = source;
36 this->_is_empty.store(source.empty(), std::memory_order_release);
37 }
38
39 bool empty() const noexcept {
40 return this->_is_empty.load(std::memory_order_acquire);
41 }
42
43 T& front() noexcept {
44 std::lock_guard<std::mutex> lock{ this->_lock };
45 return this->data.front();
46 }
47
53 T& operator[](size_t n) {
54 return this->data[n];
55 }
56
57 void push_back(T&& item) {
58 std::lock_guard<std::mutex> lock{ this->_lock };
59 this->data.push_back(std::move(item));
60 this->_is_empty.store(false, std::memory_order_release);
61
62 if (this->data.size() >= _notify_size) {
63 this->_cond.notify_all();
64 }
65 }
66
67 T pop_front() noexcept {
68 std::lock_guard<std::mutex> lock{ this->_lock };
69 T item = std::move(data.front());
70 data.pop_front();
71
72 // Update empty flag if we just emptied the deque
73 if (this->data.empty()) {
74 this->_is_empty.store(true, std::memory_order_release);
75 }
76
77 return item;
78 }
79
81 constexpr bool is_waitable() const noexcept { return this->_is_waitable; }
82
84 void wait() {
85 if (!is_waitable()) {
86 return;
87 }
88
89 std::unique_lock<std::mutex> lock{ this->_lock };
90 this->_cond.wait(lock, [this] { return this->data.size() >= _notify_size || !this->is_waitable(); });
91 lock.unlock();
92 }
93
94 size_t size() const noexcept {
95 std::lock_guard<std::mutex> lock{ this->_lock };
96 return this->data.size();
97 }
98
99 typename std::deque<T>::iterator begin() noexcept {
100 return this->data.begin();
101 }
102
103 typename std::deque<T>::iterator end() noexcept {
104 return this->data.end();
105 }
106
108 void notify_all() {
109 this->_is_waitable.store(true, std::memory_order_release);
110 this->_cond.notify_all();
111 }
112
114 void kill_all() {
115 this->_is_waitable.store(false, std::memory_order_release);
116 this->_cond.notify_all();
117 }
118
119 private:
120 std::atomic<bool> _is_empty{ true }; // Lock-free empty() check
121 std::atomic<bool> _is_waitable{ false }; // Lock-free is_waitable() check
122 size_t _notify_size;
123 mutable std::mutex _lock;
124 std::condition_variable _cond;
125 std::deque<T> data;
126 };
127 }
128}
A std::deque wrapper which allows multiple read and write threads to concurrently access it along wit...
void wait()
Wait for an item to become available.
void notify_all()
Tell listeners that this deque is actively being pushed to.
constexpr bool is_waitable() const noexcept
Returns true if a thread is actively pushing items to this deque.
void kill_all()
Tell all listeners to stop.
T & operator[](size_t n)
NOTE: operator[] is not synchronized.
CSV_CONST CONSTEXPR_17 OutArray arrayToDefault(T &&value)
Helper constexpr function to initialize an array with all the elements set to value.
The all encompassing namespace.