Merge pull request #40542 from ClickHouse/backport/22.7/40070
Backport #40070 to 22.7: WriteBufferFromS3 potential deadlock fix
kitaisreal authored Aug 24, 2022
2 parents 9325a08 + 75a1aa4 commit 6f72dcf
Showing 1 changed file with 85 additions and 59 deletions.
144 changes: 85 additions & 59 deletions src/IO/WriteBufferFromS3.cpp
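Both hunks below make the same change, first in WriteBufferFromS3::writePart() (multipart upload) and then in WriteBufferFromS3::makeSinglepartUpload() (single PUT). The buffer counts the background tasks it schedules, and its shutdown path blocks on bg_tasks_condvar until every added task is marked finished. Before this commit the finished flag was set, and the condvar notified, only inside the scheduled callback; an exception thrown from fillUploadRequest()/fillPutRequest(), or from schedule() itself, after the task had already been counted therefore left the waiting thread blocked forever. The fix hoists that bookkeeping into a notify closure and invokes it on the exception path too, before rethrowing.

A minimal, self-contained sketch of the failure mode and the fix; TaskTracker, addTask and waitAll are illustrative stand-ins, not the ClickHouse API:

#include <condition_variable>
#include <functional>
#include <mutex>

/// Reduced model of the WriteBufferFromS3 bookkeeping: one counter of added
/// tasks, one of finished tasks, and a waiter that blocks until they match.
struct TaskTracker
{
    std::mutex mutex;
    std::condition_variable cond;
    size_t added = 0;
    size_t finished = 0;

    void addTask(const std::function<void()> & schedule_fn)
    {
        {
            std::lock_guard lock(mutex);
            ++added;    /// the waiter now expects a matching "finished"
        }

        /// Counterpart of task_finish_notify in the patch: mark the task
        /// finished and notify while still holding the mutex.
        auto task_finish_notify = [this]
        {
            std::lock_guard lock(mutex);
            ++finished;
            cond.notify_one();
        };

        try
        {
            /// In the real code the scheduled lambda itself ends with
            /// task_finish_notify(); inlined here for brevity.
            schedule_fn();
            task_finish_notify();
        }
        catch (...)
        {
            /// Without this call the counters never match again and
            /// waitAll() blocks forever; this is the deadlock being fixed.
            task_finish_notify();
            throw;
        }
    }

    void waitAll()
    {
        std::unique_lock lock(mutex);
        cond.wait(lock, [this] { return added == finished; });
    }
};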
@@ -256,6 +256,7 @@ void WriteBufferFromS3::writePart()
     if (schedule)
     {
         UploadPartTask * task = nullptr;
+
         int part_number;
         {
             std::lock_guard lock(bg_tasks_mutex);
@@ -264,45 +265,57 @@ void WriteBufferFromS3::writePart()
             part_number = num_added_bg_tasks;
         }
 
-        fillUploadRequest(task->req, part_number);
-
-        if (file_segments_holder)
-        {
-            task->cache_files.emplace(std::move(*file_segments_holder));
-            file_segments_holder.reset();
-        }
-
-        schedule([this, task]()
+        /// Notify waiting thread when task finished
+        auto task_finish_notify = [&, task]()
         {
-            try
-            {
-                processUploadRequest(*task);
-            }
-            catch (...)
-            {
-                task->exception = std::current_exception();
-            }
+            std::lock_guard lock(bg_tasks_mutex);
+            task->is_finised = true;
+            ++num_finished_bg_tasks;
 
-            try
-            {
-                finalizeCacheIfNeeded(task->cache_files);
-            }
-            catch (...)
-            {
-                tryLogCurrentException(__PRETTY_FUNCTION__);
-            }
+            /// Notification under mutex is important here.
+            /// Otherwise, WriteBuffer could be destroyed in between
+            /// Releasing lock and condvar notification.
+            bg_tasks_condvar.notify_one();
+        };
 
-            {
-                std::lock_guard lock(bg_tasks_mutex);
-                task->is_finised = true;
-                ++num_finished_bg_tasks;
+        try
+        {
+            fillUploadRequest(task->req, part_number);
 
-                /// Notification under mutex is important here.
-                /// Otherwise, WriteBuffer could be destroyed in between
-                /// Releasing lock and condvar notification.
-                bg_tasks_condvar.notify_one();
-            }
-        });
+            if (file_segments_holder)
+            {
+                task->cache_files.emplace(std::move(*file_segments_holder));
+                file_segments_holder.reset();
+            }
+
+            schedule([this, task, task_finish_notify]()
+            {
+                try
+                {
+                    processUploadRequest(*task);
+                }
+                catch (...)
+                {
+                    task->exception = std::current_exception();
+                }
+
+                try
+                {
+                    finalizeCacheIfNeeded(task->cache_files);
+                }
+                catch (...)
+                {
+                    tryLogCurrentException(__PRETTY_FUNCTION__);
+                }
+
+                task_finish_notify();
+            });
+        }
+        catch (...)
+        {
+            task_finish_notify();
+            throw;
+        }
     }
     else
     {
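The comment preserved by the patch, "Notification under mutex is important here", guards against a second lifetime hazard. If the worker released bg_tasks_mutex and only then called notify_one(), the waiting thread could wake (even spuriously), observe its predicate satisfied under the re-acquired mutex, return from wait(), and destroy the WriteBufferFromS3, and the condvar with it, before notify_one() executed. A hedged sketch of the two orderings, with the shared state reduced to a single flag:

#include <condition_variable>
#include <mutex>

struct Shared
{
    std::mutex mutex;
    std::condition_variable cond;
    bool finished = false;
};

/// RACY: once the lock is released, a (possibly spurious) wakeup lets the
/// waiter observe finished == true, return from wait() and destroy Shared,
/// so notify_one() may run on a dead condition_variable.
void workerRacy(Shared & shared)
{
    {
        std::lock_guard lock(shared.mutex);
        shared.finished = true;
    }
    shared.cond.notify_one();
}

/// The ordering this commit keeps: notify while the mutex is still held,
/// so the waiter cannot finish wait() and tear the object down in between.
void workerSafe(Shared & shared)
{
    std::lock_guard lock(shared.mutex);
    shared.finished = true;
    shared.cond.notify_one();
}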
@@ -397,43 +410,56 @@ void WriteBufferFromS3::makeSinglepartUpload()
     {
         put_object_task = std::make_unique<PutObjectTask>();
 
-        fillPutRequest(put_object_task->req);
-        if (file_segments_holder)
-        {
-            put_object_task->cache_files.emplace(std::move(*file_segments_holder));
-            file_segments_holder.reset();
-        }
-
-        schedule([this]()
+        /// Notify waiting thread when put object task finished
+        auto task_notify_finish = [&]()
         {
-            try
-            {
-                processPutRequest(*put_object_task);
-            }
-            catch (...)
-            {
-                put_object_task->exception = std::current_exception();
-            }
+            std::lock_guard lock(bg_tasks_mutex);
+            put_object_task->is_finised = true;
 
-            try
-            {
-                finalizeCacheIfNeeded(put_object_task->cache_files);
-            }
-            catch (...)
-            {
-                tryLogCurrentException(__PRETTY_FUNCTION__);
-            }
+            /// Notification under mutex is important here.
+            /// Otherwise, WriteBuffer could be destroyed in between
+            /// Releasing lock and condvar notification.
+            bg_tasks_condvar.notify_one();
+        };
 
-            {
-                std::lock_guard lock(bg_tasks_mutex);
-                put_object_task->is_finised = true;
+        try
+        {
+            fillPutRequest(put_object_task->req);
 
-                /// Notification under mutex is important here.
-                /// Otherwise, WriteBuffer could be destroyed in between
-                /// Releasing lock and condvar notification.
-                bg_tasks_condvar.notify_one();
-            }
-        });
+            if (file_segments_holder)
+            {
+                put_object_task->cache_files.emplace(std::move(*file_segments_holder));
+                file_segments_holder.reset();
+            }
+
+            schedule([this, task_notify_finish]()
+            {
+                try
+                {
+                    processPutRequest(*put_object_task);
+                }
+                catch (...)
+                {
+                    put_object_task->exception = std::current_exception();
+                }
+
+                try
+                {
+                    finalizeCacheIfNeeded(put_object_task->cache_files);
+                }
+                catch (...)
+                {
+                    tryLogCurrentException(__PRETTY_FUNCTION__);
+                }
+
+                task_notify_finish();
+            });
+        }
+        catch (...)
+        {
+            task_notify_finish();
+            throw;
+        }
     }
     else
     {
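The waiting side that these notifications unblock is outside this diff. As a rough, reduced paraphrase (not the exact source) of what the wait, a method along the lines of waitForAllBackgroundTasks(), amounts to in this release, using the member names visible in the hunks above:

#include <condition_variable>
#include <mutex>

/// Hedged paraphrase of the waiting side; only the members relevant to the
/// condvar handshake are kept, and the class is reduced to the essentials.
class WriteBufferFromS3Sketch
{
    std::mutex bg_tasks_mutex;
    std::condition_variable bg_tasks_condvar;
    size_t num_added_bg_tasks = 0;
    size_t num_finished_bg_tasks = 0;

public:
    void waitForAllBackgroundTasks()
    {
        std::unique_lock lock(bg_tasks_mutex);
        bg_tasks_condvar.wait(lock, [this]
        {
            /// A task that was added but never marked finished, the case
            /// where fillUploadRequest() or schedule() threw before this
            /// commit, keeps this predicate false forever.
            return num_added_bg_tasks == num_finished_bg_tasks;
        });
    }
};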