Skip to content

Commit

Permalink
[Bug](join) fix broadcast join running when hash table build not fini…
Browse files Browse the repository at this point in the history
…shed (apache#37643)

## Proposed changes
before when PipelineTask close, will be set sink operator always ready.
but not all sink could be running normal, like some instance of join
which not build hash table,
it's need to wait until others build hash table finished and then shared
from it.


```
F20240710 17:29:09.628299 221449 hashjoin_build_sink.cpp:582] Check failed: _shared_hash_table_context->signaled
 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /mnt/disk2/zhangsida/doris/be/src/common/signal_handler.h:421
 1# 0x00007FEF9BF64B50 in /lib64/libc.so.6
 2# gsignal in /lib64/libc.so.6
 3# __GI_abort in /lib64/libc.so.6
 4# 0x0000559C8BD8BE8D in /mnt/disk2/zhangsida/doris/output/be/lib/doris_be
 5# 0x0000559C8BD7E52A in /mnt/disk2/zhangsida/doris/output/be/lib/doris_be
 6# google::LogMessage::SendToLog() in /mnt/disk2/zhangsida/doris/output/be/lib/doris_be
 7# google::LogMessage::Flush() in /mnt/disk2/zhangsida/doris/output/be/lib/doris_be
 8# google::LogMessageFatal::~LogMessageFatal() in /mnt/disk2/zhangsida/doris/output/be/lib/doris_be
 9# doris::pipeline::HashJoinBuildSinkOperatorX::sink(doris::RuntimeState*, doris::vectorized::Block*, bool) at /mnt/disk2/zhangsida/doris/be/src/pipeline/exec/hashjoin_build_sink.cpp:582
10# doris::pipeline::PipelineTask::execute(bool*)::$_1::operator()() const at /mnt/disk2/zhangsida/doris/be/src/pipeline/pipeline_task.cpp:361
11# doris::pipeline::PipelineTask::execute(bool*) at /mnt/disk2/zhangsida/doris/be/src/pipeline/pipeline_task.cpp:364
12# doris::pipeline::TaskScheduler::_do_work(unsigned long) at /mnt/disk2/zhangsida/doris/be/src/pipeline/task_scheduler.cpp:138
13# doris::pipeline::TaskScheduler::start()::$_0::operator()() const at /mnt/disk2/zhangsida/doris/be/src/pipeline/task_scheduler.cpp:64

```
  • Loading branch information
zhangstar333 authored Jul 15, 2024
1 parent ceead84 commit 49e123c
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 5 deletions.
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,11 @@ Status AnalyticLocalState::close(RuntimeState* state) {

std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
_result_window_columns.swap(tmp_result_window_columns);
// Some kinds of source operators has a 1-1 relationship with a sink operator (such as AnalyticOperator).
// We must ensure AnalyticSinkOperator will not be blocked if AnalyticSourceOperator already closed.
if (_shared_state && _shared_state->sink_deps.size() == 1) {
_shared_state->sink_deps.front()->set_always_ready();
}
return PipelineXLocalState<AnalyticSharedState>::close(state);
}

Expand Down
5 changes: 0 additions & 5 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,6 @@ Status PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
}
_closed = true;
// Some kinds of source operators has a 1-1 relationship with a sink operator (such as AnalyticOperator).
// We must ensure AnalyticSinkOperator will not be blocked if AnalyticSourceOperator already closed.
if (_shared_state && _shared_state->sink_deps.size() == 1) {
_shared_state->sink_deps.front()->set_always_ready();
}
return Status::OK();
}

Expand Down
5 changes: 5 additions & 0 deletions regression-test/data/query_p0/join/test_join6.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_hash_join --

-- !select_hash_join2 --

Loading

0 comments on commit 49e123c

Please sign in to comment.