Skip to content

Commit

Permalink
[fix](shuffle) Fix remaining tasks if all tasks are running on single…
Browse files Browse the repository at this point in the history
… BE (#41350)
  • Loading branch information
Gabriel39 authored Sep 27, 2024
1 parent 2373456 commit 6edc6f8
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
35 changes: 21 additions & 14 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
PUniqueId id;
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
_sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id, _sender_id,
_state->be_number(), state, this);

register_channels(_sink_buffer.get());
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true);
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
if (!only_local_exchange) {
_sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id, _sender_id,
_state->be_number(), state, this);
register_channels(_sink_buffer.get());
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true);
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
_finish_dependency->block();
}

if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
!only_local_exchange) {
_broadcast_dependency = Dependency::create_shared(
Expand Down Expand Up @@ -244,7 +248,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
fmt::format("Crc32HashPartitioner({})", _partition_count));
}

_finish_dependency->block();
if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
Expand Down Expand Up @@ -559,8 +562,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
final_st = st;
}
}
local_state._sink_buffer->set_should_stop();
return final_st;
if (local_state._sink_buffer) {
local_state._sink_buffer->set_should_stop();
}
}
return final_st;
}
Expand Down Expand Up @@ -631,11 +635,14 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
fmt::format_to(debug_string_buffer,
", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), "
"_reach_limit: {}",
_sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(),
_sink_buffer->_is_finishing.load(), _reach_limit.load());
if (_sink_buffer) {
fmt::format_to(
debug_string_buffer,
", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), "
"_reach_limit: {}",
_sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(),
_sink_buffer->_is_finishing.load(), _reach_limit.load());
}
return fmt::to_string(debug_string_buffer);
}

Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {

std::vector<Dependency*> dependencies() const override {
std::vector<Dependency*> dep_vec;
dep_vec.push_back(_queue_dependency.get());
if (_queue_dependency) {
dep_vec.push_back(_queue_dependency.get());
}
if (_broadcast_dependency) {
dep_vec.push_back(_broadcast_dependency.get());
}
Expand Down

0 comments on commit 6edc6f8

Please sign in to comment.