From 656146a35845ffad277388df9735524395cfa526 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 10 Oct 2024 20:54:55 +0800 Subject: [PATCH] [Improvement](shuffle) Reduce memory consumption in data stream sender --- be/src/vec/runtime/vdata_stream_recvr.cpp | 18 ++++-------------- be/src/vec/runtime/vdata_stream_recvr.h | 4 ++-- be/src/vec/sink/vdata_stream_sender.cpp | 4 ++-- 3 files changed, 8 insertions(+), 18 deletions(-) diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 1ca6bb7f2c59316..54d89058bfd8120 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -211,7 +211,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num return Status::OK(); } -void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { +void VDataStreamRecvr::SenderQueue::add_block(Block* block) { if (block->rows() == 0) { return; } @@ -222,17 +222,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { } } BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name()); - - // local exchange should copy the block contented if use move == false - if (use_move) { - block->clear(); - } else { - auto rows = block->rows(); - for (int i = 0; i < nblock->columns(); ++i) { - nblock->get_by_position(i).column = - nblock->get_by_position(i).column->clone_resized(rows); - } - } + block->clear(); materialize_block_inplace(*nblock); auto block_mem_size = nblock->allocated_bytes(); @@ -405,9 +395,9 @@ Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_n wait_for_worker, time_to_find_recvr); } -void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) { +void VDataStreamRecvr::add_block(Block* block, int sender_id) { int use_sender_id = _is_merging ? sender_id : 0; - _sender_queues[use_sender_id]->add_block(block, use_move); + _sender_queues[use_sender_id]->add_block(block); } std::shared_ptr VDataStreamRecvr::get_local_channel_dependency( diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index e8dcfdedba5fb94..ff44e9e1eeed882 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -86,7 +86,7 @@ class VDataStreamRecvr : public HasTaskExecutionCtx { ::google::protobuf::Closure** done, const int64_t wait_for_worker, const uint64_t time_to_find_recvr); - void add_block(Block* block, int sender_id, bool use_move); + void add_block(Block* block, int sender_id); Status get_next(Block* block, bool* eos); @@ -183,7 +183,7 @@ class VDataStreamRecvr::SenderQueue { ::google::protobuf::Closure** done, const int64_t wait_for_worker, const uint64_t time_to_find_recvr); - void add_block(Block* block, bool use_move); + void add_block(Block* block); void decrement_senders(int sender_id); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index fb2f24ee0e1817c..73125d2ac3e9a6d 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -198,7 +198,7 @@ Status Channel::send_local_block(Status exec_status, bool eos) { COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); } - _local_recvr->add_block(&block, _parent->sender_id(), true); + _local_recvr->add_block(&block, _parent->sender_id()); if (eos) { _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); } @@ -218,7 +218,7 @@ Status Channel::send_local_block(Block* block) { COUNTER_UPDATE(_parent->local_sent_rows(), block->rows()); COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); } - _local_recvr->add_block(block, _parent->sender_id(), false); + _local_recvr->add_block(block, _parent->sender_id()); return Status::OK(); } else { return _receiver_status;