From 43bb8b13917e0aa88fd0f9f4fc0dfb67ac66dfee Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 11 Oct 2024 14:05:33 +0800 Subject: [PATCH] [Improvement](shuffle) Reduce memory consumption in data stream sender (#41676) --- .../pipeline/exec/exchange_sink_operator.cpp | 33 +++++++++++++------ be/src/pipeline/exec/exchange_sink_operator.h | 1 + be/src/vec/sink/vdata_stream_sender.cpp | 4 +-- be/src/vec/sink/vdata_stream_sender.h | 2 +- 4 files changed, 27 insertions(+), 13 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 7584c0b0e4591c..8f9dc52a048b98 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -170,19 +170,20 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { _part_type = p._part_type; SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || + _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { + std::random_device rd; + std::mt19937 g(rd()); + shuffle(channels.begin(), channels.end(), g); + } int local_size = 0; for (int i = 0; i < channels.size(); ++i) { RETURN_IF_ERROR(channels[i]->open(state)); if (channels[i]->is_local()) { local_size++; + _last_local_channel_idx = i; } } - if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || - _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { - std::random_device rd; - std::mt19937 g(rd()); - shuffle(channels.begin(), channels.end(), g); - } only_local_exchange = local_size == channels.size(); PUniqueId id; @@ -446,11 +447,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (local_state.only_local_exchange) { if (!block->empty()) { Status status; + size_t idx = 0; for (auto* channel : local_state.channels) { if (!channel->is_receiver_eof()) { - status = channel->send_local_block(block); + // If this channel is the last, we can move this block to downstream pipeline. + // Otherwise, this block also need to be broadcasted to other channels so should be copied. + DCHECK_GE(local_state._last_local_channel_idx, 0); + status = channel->send_local_block( + block, idx == local_state._last_local_channel_idx); HANDLE_CHANNEL_STATUS(state, channel, status); } + idx++; } } } else { @@ -471,17 +478,23 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } else { block_holder->get_block()->Clear(); } + size_t idx = 0; for (auto* channel : local_state.channels) { if (!channel->is_receiver_eof()) { Status status; if (channel->is_local()) { - status = channel->send_local_block(&cur_block); + // If this channel is the last, we can move this block to downstream pipeline. + // Otherwise, this block also need to be broadcasted to other channels so should be copied. + DCHECK_GE(local_state._last_local_channel_idx, 0); + status = channel->send_local_block( + &cur_block, idx == local_state._last_local_channel_idx); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); status = channel->send_broadcast_block(block_holder, eos); } HANDLE_CHANNEL_STATUS(state, channel, status); } + idx++; } cur_block.clear_column_data(); local_state._serializer.get_block()->set_mutable_columns( @@ -496,7 +509,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (!current_channel->is_receiver_eof()) { // 2. serialize, send and rollover block if (current_channel->is_local()) { - auto status = current_channel->send_local_block(block); + auto status = current_channel->send_local_block(block, false); HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -582,7 +595,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (!current_channel->is_receiver_eof()) { // 2. serialize, send and rollover block if (current_channel->is_local()) { - auto status = current_channel->send_local_block(block); + auto status = current_channel->send_local_block(block, false); HANDLE_CHANNEL_STATUS(state, current_channel, status); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index a94392b906d259..aeb6a1503b7c06 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -234,6 +234,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { // for external table sink hash partition std::unique_ptr _partition_function = nullptr; std::atomic _reach_limit = false; + int _last_local_channel_idx = -1; }; class ExchangeSinkOperatorX final : public DataSinkOperatorX { diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 44124ea7954f5a..4eb93969a60a9c 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -231,7 +231,7 @@ Status Channel::send_local_block(Status exec_status, bool eos) { } template -Status Channel::send_local_block(Block* block) { +Status Channel::send_local_block(Block* block, bool can_be_moved) { SCOPED_TIMER(_parent->local_send_timer()); if (_recvr_is_valid()) { if constexpr (!std::is_same_v) { @@ -239,7 +239,7 @@ Status Channel::send_local_block(Block* block) { COUNTER_UPDATE(_parent->local_sent_rows(), block->rows()); COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); } - _local_recvr->add_block(block, _parent->sender_id(), false); + _local_recvr->add_block(block, _parent->sender_id(), can_be_moved); return Status::OK(); } else { return _receiver_status; diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index b9462434f073dc..92344b994e08f7 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -307,7 +307,7 @@ class Channel { Status send_local_block(Status exec_status, bool eos = false); - Status send_local_block(Block* block); + Status send_local_block(Block* block, bool can_be_moved); // Flush buffered rows and close channel. This function don't wait the response // of close operation, client should call close_wait() to finish channel's close. // We split one close operation into two phases in order to make multiple channels