Skip to content

Commit

Permalink
[contrib/pzstd] Prevent hangs when there are errors
Browse files Browse the repository at this point in the history
When two threads are using a WorkQueue and the reader thread exits due
to an error, it must call WorkQueue::finish() to wake up the writer
thread. Otherwise, if the queue is full and the writer thread is waiting
for a free slot, it could hang forever.

This can happen in pratice when decompressing a large, corrupted file
that does not contain pzstd skippable frames.
  • Loading branch information
yotann authored and terrelln committed Jan 13, 2025
1 parent a610550 commit 80af41e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
12 changes: 10 additions & 2 deletions contrib/pzstd/Pzstd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,10 @@ static void compress(
std::shared_ptr<BufferWorkQueue> out,
size_t maxInputSize) {
auto& errorHolder = state.errorHolder;
auto guard = makeScopeGuard([&] { out->finish(); });
auto guard = makeScopeGuard([&] {
in->finish();
out->finish();
});
// Initialize the CCtx
auto ctx = state.cStreamPool->get();
if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
Expand Down Expand Up @@ -431,7 +434,10 @@ static void decompress(
std::shared_ptr<BufferWorkQueue> in,
std::shared_ptr<BufferWorkQueue> out) {
auto& errorHolder = state.errorHolder;
auto guard = makeScopeGuard([&] { out->finish(); });
auto guard = makeScopeGuard([&] {
in->finish();
out->finish();
});
// Initialize the DCtx
auto ctx = state.dStreamPool->get();
if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
Expand Down Expand Up @@ -578,13 +584,15 @@ std::uint64_t writeFile(
FILE* outputFd,
bool decompress) {
auto& errorHolder = state.errorHolder;
auto outsFinishGuard = makeScopeGuard([&outs] { outs.finish(); });
auto lineClearGuard = makeScopeGuard([&state] {
state.log.clear(kLogInfo);
});
std::uint64_t bytesWritten = 0;
std::shared_ptr<BufferWorkQueue> out;
// Grab the output queue for each decompression job (in order).
while (outs.pop(out)) {
auto outFinishGuard = makeScopeGuard([&out] { out->finish(); });
if (errorHolder.hasError()) {
continue;
}
Expand Down
7 changes: 4 additions & 3 deletions contrib/pzstd/utils/WorkQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,14 @@ class WorkQueue {
}

/**
* Promise that `push()` won't be called again, so once the queue is empty
* there will never any more work.
* Promise that either the reader side or the writer side is done.
* If the writer is done, `push()` won't be called again, so once the queue
* is empty there will never be any more work. If the reader is done, `pop()`
* won't be called again, so further items pushed will just be ignored.
*/
void finish() {
{
std::lock_guard<std::mutex> lock(mutex_);
assert(!done_);
done_ = true;
}
readerCv_.notify_all();
Expand Down

0 comments on commit 80af41e

Please sign in to comment.