Skip to content

Commit

Permalink
call OnFinish even if finish is true
Browse files Browse the repository at this point in the history
Signed-off-by: gengliqi <[email protected]>
  • Loading branch information
gengliqi committed Jul 1, 2022
1 parent f3ef670 commit 858f851
Showing 1 changed file with 11 additions and 14 deletions.
25 changes: 11 additions & 14 deletions dbms/src/DataStreams/ParallelInputsProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,33 +231,30 @@ class ParallelInputsProcessor

handler.onFinishThread(thread_num);

if (additional_inputs_at_end.empty())
{
if (0 == --active_threads)
{
handler.onFinish();
}
}

{
std::unique_lock lock(running_first_mutex);
if (0 == --running_first)
{
/// Only one thread can go here so don't need to hold `unprepared_inputs_mutex`
/// or `unprepared_inputs_mutex` lock.
if (finish)
{
return;
}
else if (additional_inputs_at_end.empty())
{
handler.onFinish();
return;
}

assert(unprepared_inputs.empty() && available_inputs.empty());
/// If a error happens, the `unprepared_inputs` and `available_inputs` may not be empty.
unprepared_inputs = UnpreparedInputs{};
available_inputs = AvailableInputs{};
for (size_t i = 0; i < additional_inputs_at_end.size(); ++i)
unprepared_inputs.emplace(additional_inputs_at_end[i], i);

wait_first_done.notify_all();
}
else
{
if (additional_inputs_at_end.empty()) {
return;
}
wait_first_done.wait(lock, [this] {
return running_first == 0;
});
Expand Down

0 comments on commit 858f851

Please sign in to comment.