Skip to content

Commit

Permalink
[Improvement](shuffle) Reduce memory consumption in data stream sender (
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Oct 15, 2024
1 parent 77fbe63 commit 43bb8b1
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 13 deletions.
33 changes: 23 additions & 10 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,19 +170,20 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
_part_type = p._part_type;
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
std::random_device rd;
std::mt19937 g(rd());
shuffle(channels.begin(), channels.end(), g);
}
int local_size = 0;
for (int i = 0; i < channels.size(); ++i) {
RETURN_IF_ERROR(channels[i]->open(state));
if (channels[i]->is_local()) {
local_size++;
_last_local_channel_idx = i;
}
}
if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
std::random_device rd;
std::mt19937 g(rd());
shuffle(channels.begin(), channels.end(), g);
}
only_local_exchange = local_size == channels.size();

PUniqueId id;
Expand Down Expand Up @@ -446,11 +447,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (local_state.only_local_exchange) {
if (!block->empty()) {
Status status;
size_t idx = 0;
for (auto* channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
status = channel->send_local_block(block);
// If this channel is the last, we can move this block to downstream pipeline.
// Otherwise, this block also need to be broadcasted to other channels so should be copied.
DCHECK_GE(local_state._last_local_channel_idx, 0);
status = channel->send_local_block(
block, idx == local_state._last_local_channel_idx);
HANDLE_CHANNEL_STATUS(state, channel, status);
}
idx++;
}
}
} else {
Expand All @@ -471,17 +478,23 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
} else {
block_holder->get_block()->Clear();
}
size_t idx = 0;
for (auto* channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
Status status;
if (channel->is_local()) {
status = channel->send_local_block(&cur_block);
// If this channel is the last, we can move this block to downstream pipeline.
// Otherwise, this block also need to be broadcasted to other channels so should be copied.
DCHECK_GE(local_state._last_local_channel_idx, 0);
status = channel->send_local_block(
&cur_block, idx == local_state._last_local_channel_idx);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_broadcast_block(block_holder, eos);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
}
idx++;
}
cur_block.clear_column_data();
local_state._serializer.get_block()->set_mutable_columns(
Expand All @@ -496,7 +509,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
auto status = current_channel->send_local_block(block);
auto status = current_channel->send_local_block(block, false);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Expand Down Expand Up @@ -582,7 +595,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
auto status = current_channel->send_local_block(block);
auto status = current_channel->send_local_block(block, false);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
// for external table sink hash partition
std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
std::atomic<bool> _reach_limit = false;
int _last_local_channel_idx = -1;
};

class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,15 @@ Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
}

template <typename Parent>
Status Channel<Parent>::send_local_block(Block* block) {
Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) {
SCOPED_TIMER(_parent->local_send_timer());
if (_recvr_is_valid()) {
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
}
_local_recvr->add_block(block, _parent->sender_id(), false);
_local_recvr->add_block(block, _parent->sender_id(), can_be_moved);
return Status::OK();
} else {
return _receiver_status;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ class Channel {

Status send_local_block(Status exec_status, bool eos = false);

Status send_local_block(Block* block);
Status send_local_block(Block* block, bool can_be_moved);
// Flush buffered rows and close channel. This function don't wait the response
// of close operation, client should call close_wait() to finish channel's close.
// We split one close operation into two phases in order to make multiple channels
Expand Down

0 comments on commit 43bb8b1

Please sign in to comment.