Vince's CSV Parser
ThreadSafeDeque implements a producer-consumer queue used by the CSV parser to communicate between the worker thread (producer) and the main thread (consumer). This document explains the synchronization protocol, critical invariants, and the exact sequence of operations required for correct concurrency.
Before diving into lock ordering, define the two key queue signals in parser terms:
- is_waitable() == true: A CSV parsing worker is currently active for this read cycle. More rows may still be pushed, so the consumer is allowed to wait.
- is_waitable() == false: The current worker has finished pushing rows for this cycle and has published a terminal notification (kill_all()).

Important nuance:

- is_waitable() == false does not always mean global end-of-file by itself.
- CSVReader::read_row() may start another worker for the next chunk when parser EOF has not been reached yet.
- Only once parser EOF has been reached does is_waitable() == false also imply that no more CSV rows remain to be parsed.

The producer is intentionally not always running.
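The way these signals combine on the consumer side can be sketched as a small decision function. This is an illustration only: ReadAction and next_action are hypothetical names, not part of the parser, and the three booleans stand for one snapshot of empty(), is_waitable(), and the parser's EOF state.

```cpp
// Hypothetical decision helper: these names are illustrative, not the library's API.
enum class ReadAction { PopRow, WaitForWorker, StartNextWorker, EndOfStream };

// Maps one snapshot of queue and parser state to the consumer's next step.
inline ReadAction next_action(bool queue_empty, bool waitable, bool parser_eof) {
    if (!queue_empty) return ReadAction::PopRow;     // rows are ready: drain them first
    if (waitable) return ReadAction::WaitForWorker;  // a worker is active: blocking is allowed
    // The worker cycle has finished. Only parser EOF turns this into end-of-stream.
    return parser_eof ? ReadAction::EndOfStream : ReadAction::StartNextWorker;
}
```

Because the inputs are a snapshot, a real read loop would re-evaluate them after every wake-up or worker restart rather than act on stale values.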
| Step | Consumer (read_row) | Producer (read_csv worker) | Shared State / Note |
|---|---|---|---|
| 1 | empty() check | | Fast-path queue visibility via _is_empty |
| 2 | is_waitable() check | | If true, consumer may block; if false, consumer should not wait |
| 3 | wait() | | Blocks until data.size() >= notify_size or !is_waitable() |
| 4 | | push_back(row) | Producer appends rows and sets _is_empty = false |
| 5 | | size-based notify_all() (when size >= notify_size) | Batch wake-up for throughput |
| 6 | | kill_all() at end of worker cycle | Sets _is_waitable = false and terminal-notifies waiters |
| 7 | pop_front() | | Consumer drains rows after wake-up |
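The seven steps can be put together in a compact sketch. SketchDeque and run_one_cycle are hypothetical names, and the class is a simplified stand-in written from the table, not the code in thread_safe_deque.hpp. It assumes that notify_all() marks the queue waitable at cycle start and that kill_all() takes the lock. Build with thread support enabled.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Simplified stand-in for ThreadSafeDeque (assumed shape, not the library's code).
template <typename T>
class SketchDeque {
public:
    explicit SketchDeque(std::size_t notify_size) : _notify_size(notify_size) {}

    bool empty() const noexcept { return _is_empty.load(); }           // step 1
    bool is_waitable() const noexcept { return _is_waitable.load(); }  // step 2

    // Assumed cycle-start signal: marks the queue waitable before a worker runs.
    void notify_all() {
        std::lock_guard<std::mutex> guard{_lock};
        _is_waitable.store(true);
        _cond.notify_all();
    }

    void wait() {  // step 3
        if (!is_waitable()) return;  // fast path: the producer has already finished
        std::unique_lock<std::mutex> guard{_lock};
        _cond.wait(guard, [this] { return _data.size() >= _notify_size || !is_waitable(); });
    }

    void push_back(T&& item) {  // steps 4 and 5
        std::lock_guard<std::mutex> guard{_lock};
        _data.push_back(std::move(item));
        _is_empty.store(false);
        if (_data.size() >= _notify_size) _cond.notify_all();  // batched wake-up
    }

    void kill_all() {  // step 6: state change and notification under the same lock
        std::lock_guard<std::mutex> guard{_lock};
        _is_waitable.store(false);
        _cond.notify_all();
    }

    T pop_front() {  // step 7; precondition: the queue is not empty
        std::lock_guard<std::mutex> guard{_lock};
        assert(!_data.empty());
        T item = std::move(_data.front());
        _data.pop_front();
        if (_data.empty()) _is_empty.store(true);
        return item;
    }

private:
    std::size_t _notify_size;
    std::deque<T> _data;
    std::mutex _lock;
    std::condition_variable _cond;
    std::atomic<bool> _is_empty{true};
    std::atomic<bool> _is_waitable{false};
};

// Runs one worker cycle that pushes `rows` integers and drains them on the caller's thread.
inline std::vector<int> run_one_cycle(int rows, std::size_t notify_size) {
    SketchDeque<int> queue(notify_size);
    queue.notify_all();  // cycle start, before the worker exists
    std::thread worker([&queue, rows] {
        for (int i = 0; i < rows; ++i) queue.push_back(int{i});
        queue.kill_all();
    });
    std::vector<int> drained;
    for (;;) {
        if (!queue.empty()) { drained.push_back(queue.pop_front()); continue; }
        if (queue.is_waitable()) { queue.wait(); continue; }
        if (queue.empty()) break;  // re-check after observing the terminal signal
    }
    worker.join();
    return drained;
}
```

The final empty() re-check is a choice made for this sketch: a row pushed between the first empty() check and the is_waitable() check would otherwise be missed. Calling run_one_cycle(2, 100) exercises the small-file path, where only kill_all() wakes the consumer.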
Location: thread_safe_deque.hpp
Why atomic: Cheap lock-free check that avoids lock contention when the parser has already finished.
Invariant: If is_waitable() == false, the producer will never call push_back() again in the current worker cycle, and a prior notify_all() was already called.
BEFORE PR 298 (BUGGY):
Race Timeline (Small CSV, < 100 rows):
| Time | Consumer Thread | Producer Thread |
|---|---|---|
| T0 | records->empty() == true | |
| T1 | records->is_waitable() == true (atomic read) | |
| T2 | | kill_all() executes: |
| T3 | | _is_waitable.store(false) |
| T4 | | _cond.notify_all() – SIGNAL SENT |
| T5 | ENTERS wait() acquiring lock | |
| T6 | _cond.wait() blocks forever | |
| DEADLOCK | Waiting for notification that was already sent | Thread exits |
Root Cause: The notification in step T4 is sent before the consumer enters wait() in step T5. The condition variable predicate was already true (!is_waitable()), but the consumer missed the signal.
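The buggy interleaving can be replayed on a single thread so that it happens every time. This is a model under assumptions, not the pre-fix library code: BuggySignals, kill_all_unlocked, and replay_lost_wakeup are hypothetical names, the consumer's wait is modelled as acting on its stale T1 read, and a timeout stands in for the real unbounded block.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical model of the pre-fix signalling state; not the library's code.
struct BuggySignals {
    std::mutex lock;
    std::condition_variable cond;
    std::atomic<bool> is_waitable{true};

    // Assumed pre-fix shape: the store and the notification run without holding `lock`.
    void kill_all_unlocked() {
        is_waitable.store(false);
        cond.notify_all();  // no thread is blocked yet, so this wakes nobody
    }
};

// Replays T1 to T6 in order. Returns true when the consumer's wait timed out,
// meaning the only notification was sent before the consumer started waiting.
inline bool replay_lost_wakeup(std::chrono::milliseconds timeout) {
    BuggySignals q;
    const bool saw_waitable = q.is_waitable.load();  // T1: consumer decides to wait
    q.kill_all_unlocked();                           // T2-T4: producer finishes in the window
    if (!saw_waitable) return false;
    std::unique_lock<std::mutex> guard{q.lock};      // T5: consumer takes the lock
    // T6: waits on a decision that is already stale. The timeout keeps the demo finite.
    return q.cond.wait_for(guard, timeout) == std::cv_status::timeout;
}
```

With a short timeout such as 50 ms, the function is expected to report a lost wake-up. A spurious wake-up could in principle end the wait early, so treat the result as illustrative rather than as a strict test oracle.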
AFTER PR 298 (FIXED):
Corrected Timeline:
| Time | Consumer Thread | Producer Thread |
|---|---|---|
| T0 | records->empty() == true | |
| T1 | records->is_waitable() == true (atomic read) | |
| T2 | | kill_all() attempts to acquire _lock |
| T3 | | **_lock ACQUIRED** |
| T4 | | _is_waitable.store(false) |
| T5 | std::unique_lock<> lock{_lock} – BLOCKS | |
| T6 | | _cond.notify_all() |
| T7 | | **_lock RELEASED** |
| T8 | LOCK ACQUIRED | |
| T9 | Predicate: is_waitable() == false → true! | |
| T10 | wait() returns immediately | |
| CORRECT | Consumer wakes and exits wait | |
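Key Insight: The mutex serializes the state transition and notification with the consumer's lock acquisition. Because kill_all() holds _lock across both the store and notify_all(), the consumer can acquire _lock inside wait() only when the producer has either:

- not yet entered kill_all(), in which case the consumer is already blocked on _cond when the notification arrives, or
- fully completed kill_all(), in which case the predicate !is_waitable() is already true and wait() returns immediately.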
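A minimal sketch of the fixed ordering follows, again under assumptions: FixedSignals and run_fixed_rounds are hypothetical names, and the struct models only the terminal signal, not the full queue. Each round races one consumer against one producer. A lost wake-up would show up as a hang in join(), so a real test would wrap the call in a timeout.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical model of the post-fix signalling state; not the library's code.
struct FixedSignals {
    std::mutex lock;
    std::condition_variable cond;
    std::atomic<bool> is_waitable{true};

    // The store and the notification happen inside one critical section.
    void kill_all() {
        std::lock_guard<std::mutex> guard{lock};
        is_waitable.store(false);
        cond.notify_all();
    }

    // The predicate is evaluated under the same lock, so it cannot go stale
    // between the check and the moment the thread blocks.
    void wait() {
        std::unique_lock<std::mutex> guard{lock};
        cond.wait(guard, [this] { return !is_waitable.load(); });
    }
};

// Races a consumer against a producer `rounds` times; returns the rounds that completed.
inline int run_fixed_rounds(int rounds) {
    int completed = 0;
    for (int i = 0; i < rounds; ++i) {
        FixedSignals q;
        std::thread consumer([&q] {
            if (q.is_waitable.load()) q.wait();  // lock-free fast-path check, then locked wait
        });
        std::thread producer([&q] { q.kill_all(); });
        producer.join();
        consumer.join();
        ++completed;
    }
    return completed;
}
```

Whichever thread takes the lock first, the consumer either finds the predicate already true or is blocked on the condition variable before notify_all() runs.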
**_is_empty (atomic):**

- empty() is called millions of times per parse

**_is_waitable (atomic):**

- wait() avoids lock when producer is done

**_lock + _cond (mutex + condition variable):**

- notify_all() while holding lock

The atomic store has memory ordering but does not prevent the lost-wakeup window. Atomics guarantee visibility, not synchronization with condition variables.
Location: thread_safe_deque.hpp
Why batching matters:

- push_back() would wake the consumer

Small file edge case:

- kill_all() notification allows consumer to wake

CSV: 2 rows, stream source

- _is_waitable = false), consumer will wake.
- if (!is_waitable()) is safe without lock.
- empty() returns true iff deque is empty.

| Component | Where | What |
|---|---|---|
| Consumer | csv_reader.cpp | read_row() flow on main thread |
| Consumer | csv_reader.cpp | Empty/is_waitable checks and wait() call |
| Producer | csv_reader.cpp | read_csv() worker lifecycle |
| Producer | csv_reader.cpp | notify_all() at cycle start |
| Producer | csv_reader.cpp | parser->next() push cycle |
| Producer | csv_reader.cpp | kill_all() terminal signal |
| Queue | thread_safe_deque.hpp | push_back() producer path |
| Queue | thread_safe_deque.hpp | pop_front() consumer path |
| Queue | thread_safe_deque.hpp | wait() condition protocol |
| Queue | thread_safe_deque.hpp | notify_all() wake-up state |
| Queue | thread_safe_deque.hpp | kill_all() terminal publication |
See test_threadsafe_deque_race.cpp for regression tests that specifically target:
The test helper timeout_helper.hpp wraps stress tests with explicit 10-second timeouts.
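The contents of timeout_helper.hpp are not shown here, so the following is only a sketch of how such a wrapper could work. finishes_within is a hypothetical name. It runs the test body on a detached thread so that a deadlocked body cannot block the caller in join().

```cpp
#include <chrono>
#include <future>
#include <thread>
#include <utility>

// Hypothetical helper; the real timeout_helper.hpp may look different.
// Runs `fn` on a detached thread and reports whether it finished within `timeout`.
template <typename Fn>
bool finishes_within(Fn fn, std::chrono::milliseconds timeout) {
    std::packaged_task<void()> task(std::move(fn));
    std::future<void> done = task.get_future();
    // Detached on purpose: if the body deadlocks, the caller must still be able to return.
    std::thread(std::move(task)).detach();
    return done.wait_for(timeout) == std::future_status::ready;
}
```

A stress test could then assert on something like finishes_within(body, std::chrono::milliseconds(10000)) to match the 10-second limit. Any state captured by the body has to outlive a hung thread, for example by holding it in a std::shared_ptr.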