Skip to content

Commit

Permalink
[followup](memory) Block should be moved by hash shuffling (apache#41740
Browse files Browse the repository at this point in the history
)

Follow-up for apache#41676
  • Loading branch information
Gabriel39 committed Oct 15, 2024
1 parent 43bb8b1 commit e90a846
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
block_holder->get_block()->Clear();
}
size_t idx = 0;
bool moved = false;
for (auto* channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
Status status;
Expand All @@ -488,6 +489,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
DCHECK_GE(local_state._last_local_channel_idx, 0);
status = channel->send_local_block(
&cur_block, idx == local_state._last_local_channel_idx);
moved = idx == local_state._last_local_channel_idx;
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_broadcast_block(block_holder, eos);
Expand All @@ -496,9 +498,13 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
}
idx++;
}
cur_block.clear_column_data();
local_state._serializer.get_block()->set_mutable_columns(
cur_block.mutate_columns());
if (moved) {
local_state._serializer.reset_block();
} else {
cur_block.clear_column_data();
local_state._serializer.get_block()->set_mutable_columns(
cur_block.mutate_columns());
}
}
}
}
Expand All @@ -509,7 +515,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
auto status = current_channel->send_local_block(block, false);
auto status = current_channel->send_local_block(block, true);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Expand Down Expand Up @@ -595,7 +601,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
auto status = current_channel->send_local_block(block, false);
auto status = current_channel->send_local_block(block, true);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Expand Down

0 comments on commit e90a846

Please sign in to comment.