From e90a846426ffd9a956379c4630303e5039f38920 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sat, 12 Oct 2024 16:45:59 +0800 Subject: [PATCH] [followup](memory) Block should be moved by hash shuffling (#41740) Follow-up for #41676 --- be/src/pipeline/exec/exchange_sink_operator.cpp | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 8f9dc52a048b98..ada5d5455b0dd3 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -479,6 +479,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block block_holder->get_block()->Clear(); } size_t idx = 0; + bool moved = false; for (auto* channel : local_state.channels) { if (!channel->is_receiver_eof()) { Status status; @@ -488,6 +489,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block DCHECK_GE(local_state._last_local_channel_idx, 0); status = channel->send_local_block( &cur_block, idx == local_state._last_local_channel_idx); + moved = idx == local_state._last_local_channel_idx; } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); status = channel->send_broadcast_block(block_holder, eos); @@ -496,9 +498,13 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } idx++; } - cur_block.clear_column_data(); - local_state._serializer.get_block()->set_mutable_columns( - cur_block.mutate_columns()); + if (moved) { + local_state._serializer.reset_block(); + } else { + cur_block.clear_column_data(); + local_state._serializer.get_block()->set_mutable_columns( + cur_block.mutate_columns()); + } } } } @@ -509,7 +515,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (!current_channel->is_receiver_eof()) { // 2. serialize, send and rollover block if (current_channel->is_local()) { - auto status = current_channel->send_local_block(block, false); + auto status = current_channel->send_local_block(block, true); HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -595,7 +601,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (!current_channel->is_receiver_eof()) { // 2. serialize, send and rollover block if (current_channel->is_local()) { - auto status = current_channel->send_local_block(block, false); + auto status = current_channel->send_local_block(block, true); HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());