From f268db56a7b12bd4532fd689d876705a0e1d96b0 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 12 Dec 2024 19:57:20 +0800 Subject: [PATCH] [refactor](shuffle) Use local exchanger for both exchange sink and local exchange sink --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 6 + .../pipeline/exec/exchange_sink_operator.cpp | 304 ++++++++---------- be/src/pipeline/exec/exchange_sink_operator.h | 96 +++++- .../partitioned_hash_join_probe_operator.cpp | 2 +- .../partitioned_hash_join_sink_operator.cpp | 2 +- .../local_exchange_sink_operator.cpp | 14 +- .../local_exchange_sink_operator.h | 1 + .../local_exchange_source_operator.cpp | 9 +- .../local_exchange_source_operator.h | 1 + .../local_exchange/local_exchanger.cpp | 229 ++++++++----- .../pipeline/local_exchange/local_exchanger.h | 65 ++-- be/src/pipeline/shuffle/writer.cpp | 127 +++----- be/src/pipeline/shuffle/writer.h | 19 +- be/src/vec/core/block.cpp | 2 +- be/src/vec/runtime/partitioner.h | 9 +- be/src/vec/runtime/vdata_stream_recvr.cpp | 20 +- be/src/vec/runtime/vdata_stream_recvr.h | 4 +- .../scale_writer_partitioning_exchanger.hpp | 10 +- .../vec/sink/tablet_sink_hash_partitioner.h | 5 + be/src/vec/sink/vdata_stream_sender.cpp | 94 +++--- be/src/vec/sink/vdata_stream_sender.h | 71 ++-- 21 files changed, 587 insertions(+), 503 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 65e7698737076e..824bf9868d1473 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -65,6 +65,9 @@ void BroadcastPBlockHolderMemLimiter::acquire(BroadcastPBlockHolder& holder) { DCHECK(_broadcast_dependency != nullptr); holder.set_parent_creator(shared_from_this()); auto size = holder._pblock->column_values().size(); + if (size == 0) { + return; + } _total_queue_buffer_size += size; _total_queue_blocks_count++; if (_total_queue_buffer_size >= config::exchg_node_buffer_size_bytes || @@ -77,6 +80,9 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde std::unique_lock l(_holders_lock); DCHECK(_broadcast_dependency != nullptr); auto size = holder._pblock->column_values().size(); + if (size == 0) { + return; + } _total_queue_buffer_size -= size; _total_queue_blocks_count--; if (_total_queue_buffer_size <= 0) { diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index aa893fc0a26f2e..f770d70cc8e16b 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -38,10 +38,36 @@ #include "util/uid_util.h" #include "vec/columns/column_const.h" #include "vec/exprs/vexpr.h" +#include "vec/sink/scale_writer_partitioning_exchanger.hpp" #include "vec/sink/tablet_sink_hash_partitioner.h" namespace doris::pipeline { #include "common/compile_check_begin.h" + +inline std::string get_partition_type_name(TPartitionType::type part_type) { + switch (part_type) { + case TPartitionType::UNPARTITIONED: + return "UNPARTITIONED"; + case TPartitionType::RANDOM: + return "RANDOM"; + case TPartitionType::HASH_PARTITIONED: + return "HASH_PARTITIONED"; + case TPartitionType::RANGE_PARTITIONED: + return "RANGE_PARTITIONED"; + case TPartitionType::LIST_PARTITIONED: + return "LIST_PARTITIONED"; + case TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED: + return "BUCKET_SHFFULE_HASH_PARTITIONED"; + case TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED: + return "TABLET_SINK_SHUFFLE_PARTITIONED"; + case 
TPartitionType::TABLE_SINK_HASH_PARTITIONED: + return "TABLE_SINK_HASH_PARTITIONED"; + case TPartitionType::TABLE_SINK_RANDOM_PARTITIONED: + return "TABLE_SINK_RANDOM_PARTITIONED"; + } + __builtin_unreachable(); +} + bool ExchangeSinkLocalState::transfer_large_data_by_brpc() const { return _parent->cast()._transfer_large_data_by_brpc; } @@ -54,10 +80,13 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf SCOPED_TIMER(_init_timer); _sender_id = info.sender_id; + _enqueue_blocks_counter = ADD_COUNTER(_profile, "EnqueueRows", TUnit::UNIT); + _dequeue_blocks_counter = ADD_COUNTER(_profile, "DequeueRows", TUnit::UNIT); _bytes_sent_counter = ADD_COUNTER(_profile, "BytesSent", TUnit::BYTES); _uncompressed_bytes_counter = ADD_COUNTER(_profile, "UncompressedRowBatchSize", TUnit::BYTES); _local_sent_rows = ADD_COUNTER(_profile, "LocalSentRows", TUnit::UNIT); _serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime"); + _copy_shuffled_data_timer = ADD_TIMER(_profile, "CopyShuffledDataTime"); _compress_timer = ADD_TIMER(_profile, "CompressTime"); _local_send_timer = ADD_TIMER(_profile, "LocalSendTime"); _split_block_hash_compute_timer = ADD_TIMER(_profile, "SplitBlockHashComputeTime"); @@ -108,7 +137,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf for (int i = 0; i < channels.size(); ++i) { if (channels[i]->is_local()) { local_size++; - _last_local_channel_idx = i; } } only_local_exchange = local_size == channels.size(); @@ -121,37 +149,38 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _sink_buffer->set_dependency(state->fragment_instance_id().lo, _queue_dependency, this); } - if (_part_type == TPartitionType::HASH_PARTITIONED) { - _partition_count = channels.size(); - _partitioner = - std::make_unique>( - channels.size()); - RETURN_IF_ERROR(_partitioner->init(p._texprs)); - RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); - _profile->add_info_string("Partitioner", - fmt::format("Crc32HashPartitioner({})", _partition_count)); - } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { + int num_dests = cast_set(p._dests.size()); + const int free_block_limit = 1; + if (num_dests == 1 || _part_type == TPartitionType::UNPARTITIONED) { + _exchanger = BroadcastExchanger::create_shared(1, num_dests, free_block_limit); + } + switch (_part_type) { + case TPartitionType::HASH_PARTITIONED: + case TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED: _partition_count = channels.size(); _partitioner = - std::make_unique>( - channels.size()); - RETURN_IF_ERROR(_partitioner->init(p._texprs)); - RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); - _profile->add_info_string("Partitioner", - fmt::format("Crc32HashPartitioner({})", _partition_count)); - } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + vectorized::Crc32HashPartitioner::create_unique( + _partition_count); + if (_partition_count != 1) { + _exchanger = ShuffleExchanger::create_shared(1, _partition_count, _partition_count, + free_block_limit); + } + _channel_selector = std::make_unique(_partition_count); + break; + case TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED: _partition_count = channels.size(); - _profile->add_info_string("Partitioner", - fmt::format("Crc32HashPartitioner({})", _partition_count)); - _partitioner = std::make_unique( + _partitioner = vectorized::TabletSinkHashPartitioner::create_unique( _partition_count, p._tablet_sink_txn_id, p._tablet_sink_schema, 
p._tablet_sink_partition, p._tablet_sink_location, p._tablet_sink_tuple_id, this); - RETURN_IF_ERROR(_partitioner->init({})); - RETURN_IF_ERROR(_partitioner->prepare(state, {})); - } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { + if (num_dests != 1) { + _exchanger = ShuffleExchanger::create_shared(1, num_dests, num_dests, free_block_limit); + } + _channel_selector = std::make_unique(channels.size()); + break; + case TPartitionType::TABLE_SINK_HASH_PARTITIONED: _partition_count = channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer; - _partitioner = std::make_unique( + _partitioner = vectorized::ScaleWriterPartitioner::create_unique( channels.size(), _partition_count, channels.size(), 1, config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold / state->task_num() == @@ -165,11 +194,36 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf ? config::table_sink_partition_write_min_data_processed_rebalance_threshold : config::table_sink_partition_write_min_data_processed_rebalance_threshold / state->task_num()); - + if (num_dests != 1) { + _exchanger = ShuffleExchanger::create_shared(1, num_dests, num_dests, free_block_limit); + } + _channel_selector = std::make_unique(channels.size()); + break; + case TPartitionType::UNPARTITIONED: + _channel_selector = std::make_unique(channels.size()); + break; + case TPartitionType::RANDOM: + _channel_selector = std::make_unique(channels.size()); + if (num_dests != 1) { + _exchanger = PassthroughExchanger::create_shared(1, num_dests, free_block_limit); + } + break; + case TPartitionType::TABLE_SINK_RANDOM_PARTITIONED: + _channel_selector = std::make_unique(channels.size()); + if (num_dests != 1) { + _exchanger = PassthroughExchanger::create_shared(1, num_dests, free_block_limit); + } + break; + default: + return Status::InternalError("Unsupported exchange type : " + + std::to_string((int)_part_type)); + } + _profile->add_info_string("PartitionType", get_partition_type_name(_part_type)); + _profile->add_info_string("Exchanger", _exchanger->debug_string()); + if (_partitioner) { + _profile->add_info_string("Partitioner", _partitioner->debug_string()); RETURN_IF_ERROR(_partitioner->init(p._texprs)); RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); - _profile->add_info_string("Partitioner", - fmt::format("Crc32HashPartitioner({})", _partition_count)); } return Status::OK(); @@ -197,7 +251,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(Base::open(state)); - _writer.reset(new Writer()); auto& p = _parent->cast(); if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || @@ -211,7 +264,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(channels[i]->open(state)); if (channels[i]->is_local()) { local_size++; - _last_local_channel_idx = i; } } only_local_exchange = local_size == channels.size(); @@ -221,14 +273,11 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { PUniqueId id; id.set_hi(_state->query_id().hi); id.set_lo(_state->query_id().lo); - - if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && - !only_local_exchange) { - _broadcast_dependency = Dependency::create_shared( - _parent->operator_id(), _parent->node_id(), "BroadcastDependency", true); - _broadcast_pb_mem_limiter = - 
vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency); - } else if (local_size > 0) { + _broadcast_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "BroadcastDependency", true); + _broadcast_pb_mem_limiter = + vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency); + if (local_size > 0) { size_t dep_id = 0; for (auto& channel : channels) { if (channel->is_local()) { @@ -302,13 +351,15 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { RETURN_IF_ERROR(DataSinkOperatorX::init(tsink)); - if (_part_type == TPartitionType::RANGE_PARTITIONED) { + if (_part_type == TPartitionType::RANGE_PARTITIONED || + _part_type == TPartitionType::LIST_PARTITIONED) { return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used"); } if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(*_t_tablet_sink_exprs, _tablet_sink_expr_ctxs)); } + _writer.reset(new Writer()); return Status::OK(); } @@ -336,14 +387,6 @@ Status ExchangeSinkOperatorX::open(RuntimeState* state) { return Status::OK(); } -template -void ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, - Status st) { - channel->set_receiver_eof(st); - // Chanel will not send RPC to the downstream when eof, so close chanel by OK status. - static_cast(channel->close(state)); -} - Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) { auto& local_state = get_local_state(state); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows()); @@ -359,138 +402,54 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block return Status::EndOfFile("all data stream channels EOF"); } - if (_part_type == TPartitionType::UNPARTITIONED || local_state.channels.size() == 1) { - // 1. serialize depends on it is not local exchange - // 2. send block - // 3. rollover block - if (local_state.only_local_exchange) { - if (!block->empty()) { - Status status; - size_t idx = 0; - for (auto& channel : local_state.channels) { - if (!channel->is_receiver_eof()) { - // If this channel is the last, we can move this block to downstream pipeline. - // Otherwise, this block also need to be broadcasted to other channels so should be copied. 
- DCHECK_GE(local_state._last_local_channel_idx, 0); - status = channel->send_local_block( - block, eos, idx == local_state._last_local_channel_idx); - HANDLE_CHANNEL_STATUS(state, channel, status); - } - idx++; - } - } - } else { - auto block_holder = vectorized::BroadcastPBlockHolder::create_shared(); - { - bool serialized = false; - RETURN_IF_ERROR(local_state._serializer.next_serialized_block( - block, block_holder->get_block(), local_state._rpc_channels_num, - &serialized, eos)); - if (serialized) { - auto cur_block = local_state._serializer.get_block()->to_block(); - if (!cur_block.empty()) { - DCHECK(eos || local_state._serializer.is_local()) << debug_string(state, 0); - RETURN_IF_ERROR(local_state._serializer.serialize_block( - &cur_block, block_holder->get_block(), - local_state._rpc_channels_num)); - } else { - block_holder->reset_block(); - } - - local_state._broadcast_pb_mem_limiter->acquire(*block_holder); - - size_t idx = 0; - bool moved = false; - for (auto& channel : local_state.channels) { - if (!channel->is_receiver_eof()) { - Status status; - if (channel->is_local()) { - // If this channel is the last, we can move this block to downstream pipeline. - // Otherwise, this block also need to be broadcasted to other channels so should be copied. - DCHECK_GE(local_state._last_local_channel_idx, 0); - status = channel->send_local_block( - &cur_block, eos, - idx == local_state._last_local_channel_idx); - moved = idx == local_state._last_local_channel_idx; - } else { - status = channel->send_broadcast_block(block_holder, eos); - } - HANDLE_CHANNEL_STATUS(state, channel, status); - } - idx++; - } - if (moved) { - local_state._serializer.reset_block(); - } else { - cur_block.clear_column_data(); - local_state._serializer.get_block()->set_mutable_columns( - cur_block.mutate_columns()); - } - } + auto block_holder = vectorized::BroadcastPBlockHolder::create_shared(); + auto* in_block = block; + bool should_output = !block->empty() || eos; + size_t data_size = + _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED ? block->bytes() : 0; + if (is_broadcast()) { + // For broadcast shuffling, we do accumulate a full data block in `local_state._serializer`. + RETURN_IF_ERROR(local_state._serializer.next_serialized_block(block, &should_output, eos)); + if (should_output) { + local_state._serializer.swap_data(in_block); + if (!local_state.only_local_exchange) { + RETURN_IF_ERROR(local_state._serializer.serialize_block( + in_block, block_holder->get_block(), local_state._rpc_channels_num)); } } - } else if (_part_type == TPartitionType::RANDOM) { - // 1. select channel - auto& current_channel = local_state.channels[local_state.current_channel_idx]; - if (!current_channel->is_receiver_eof()) { - // 2. 
serialize, send and rollover block - if (current_channel->is_local()) { - auto status = current_channel->send_local_block(block, eos, true); - HANDLE_CHANNEL_STATUS(state, current_channel, status); - } else { - auto pblock = std::make_unique(); - RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get())); - auto status = current_channel->send_remote_block(std::move(pblock), eos); - HANDLE_CHANNEL_STATUS(state, current_channel, status); - } - } - local_state.current_channel_idx = - (local_state.current_channel_idx + 1) % local_state.channels.size(); - } else if (_part_type == TPartitionType::HASH_PARTITIONED || - _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || - _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED || - _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { - RETURN_IF_ERROR(local_state._writer->write(&local_state, state, block, eos)); - } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { - // Control the number of channels according to the flow, thereby controlling the number of table sink writers. - // 1. select channel - auto& current_channel = local_state.channels[local_state.current_channel_idx]; - if (!current_channel->is_receiver_eof()) { - // 2. serialize, send and rollover block - if (current_channel->is_local()) { - auto status = current_channel->send_local_block(block, eos, true); - HANDLE_CHANNEL_STATUS(state, current_channel, status); - } else { - auto pblock = std::make_unique(); - RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get())); - auto status = current_channel->send_remote_block(std::move(pblock), eos); - HANDLE_CHANNEL_STATUS(state, current_channel, status); - } - _data_processed += block->bytes(); - } + local_state._broadcast_pb_mem_limiter->acquire(*block_holder); + } + if (!should_output) { + return Status::OK(); + } + RETURN_IF_ERROR(_writer->write(&local_state, state, in_block, eos)); + DCHECK(in_block->empty()); - if (_writer_count < local_state.channels.size()) { - if (_data_processed >= - _writer_count * - config::table_sink_non_partition_write_scaling_data_processed_threshold) { - _writer_count++; - } + auto& channel_ids = local_state._channel_selector->next_channel_ids(); + for (auto channel_id : channel_ids) { + if (!local_state.channels[channel_id]->is_receiver_eof()) { + local_state._channel_selector->process_next_block(data_size); } - local_state.current_channel_idx = (local_state.current_channel_idx + 1) % _writer_count; - } else { - // Range partition - // 1. calculate range - // 2. dispatch rows to channel + RETURN_IF_ERROR(_writer->send_to_channels( + &local_state, state, local_state.channels[channel_id].get(), channel_id, eos, + is_broadcast() ? 
block_holder : nullptr)); } Status final_st = Status::OK(); + int channel_id = 0; if (eos) { + DCHECK(local_state._serializer.get_block() == nullptr || + local_state._serializer.get_block()->empty()) + << " node id: " << node_id() << " remain rows" + << local_state._serializer.get_block()->rows(); local_state._serializer.reset_block(); for (auto& channel : local_state.channels) { - Status st = channel->close(state); + Status st = _writer->finish(local_state.exchanger(), state, channel.get(), channel_id, + Status::OK()); if (!st.ok() && final_st.ok()) { final_st = st; } + channel_id++; } } return final_st; @@ -521,6 +480,19 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { return Status::OK(); } SCOPED_TIMER(exec_time_counter()); + if (!exec_status.ok()) { + Status final_st = Status::OK(); + int channel_id = 0; + for (auto& channel : channels) { + Status st = _parent->cast()._writer->finish( + exchanger(), state, channel.get(), channel_id, exec_status); + if (!st.ok() && final_st.ok()) { + final_st = st; + } + channel_id++; + } + RETURN_IF_ERROR(final_st); + } if (_partitioner) { RETURN_IF_ERROR(_partitioner->close(state)); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index e88389b1d7bb5a..9e1f0722e8a3d6 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -36,6 +36,69 @@ class TDataSink; namespace pipeline { +using ChannelId = int; +using ChannelIds = std::vector; +class ChannelSelector { +public: + virtual ~ChannelSelector() = default; + virtual ChannelIds& next_channel_ids() = 0; + virtual void process_next_block(size_t data_size) {} +}; + +class AllChannelsSelector final : public ChannelSelector { +public: + AllChannelsSelector(size_t num_channels) : ChannelSelector(), _all_channel_ids(num_channels) { + for (int i = 0; i < num_channels; i++) { + _all_channel_ids[i] = i; + } + } + ~AllChannelsSelector() override = default; + ChannelIds& next_channel_ids() override { return _all_channel_ids; } + +private: + ChannelIds _all_channel_ids; +}; + +class RoundRobinSelector final : public ChannelSelector { +public: + RoundRobinSelector(const size_t num_channels) + : ChannelSelector(), _next_channel_ids(1, -1), _num_channels(num_channels) {} + ~RoundRobinSelector() override = default; + ChannelIds& next_channel_ids() override { + _next_channel_ids[0] = ++_next_channel_ids[0] % _num_channels; + return _next_channel_ids; + } + +private: + ChannelIds _next_channel_ids; + const size_t _num_channels; +}; + +class TableSinkRandomSelector final : public ChannelSelector { +public: + TableSinkRandomSelector(const size_t num_channels) + : ChannelSelector(), _next_channel_ids(1, -1), _num_channels(num_channels) {} + ~TableSinkRandomSelector() override = default; + ChannelIds& next_channel_ids() override { + _next_channel_ids[0] = ++_next_channel_ids[0] % _writer_count; + if (_writer_count < _num_channels) { + if (_data_processed >= + _writer_count * + config::table_sink_non_partition_write_scaling_data_processed_threshold) { + _writer_count++; + } + } + return _next_channel_ids; + } + void process_next_block(size_t data_size) override { _data_processed += data_size; } + +private: + ChannelIds _next_channel_ids; + const size_t _num_channels; + int _writer_count = 1; + size_t _data_processed = 0; +}; + class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState); using Base = 
PipelineXSinkLocalState<>; @@ -102,11 +165,15 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { return _distribute_rows_into_channels_timer; } std::vector> channels; - int current_channel_idx {0}; // index of current channel to send to if _random == true bool only_local_exchange {false}; + int* next_channel_id() { return &_channel_id; } + RuntimeProfile::Counter* copy_shuffled_data_timer() { return _copy_shuffled_data_timer; } void on_channel_finished(InstanceLoId channel_id); vectorized::PartitionerBase* partitioner() const { return _partitioner.get(); } + ExchangerBase* exchanger() { return _exchanger.get(); } + RuntimeProfile::Counter* enqueue_blocks_counter() { return _enqueue_blocks_counter; } + RuntimeProfile::Counter* dequeue_blocks_counter() { return _dequeue_blocks_counter; } private: friend class ExchangeSinkOperatorX; @@ -133,6 +200,9 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { RuntimeProfile::Counter* _wait_queue_timer = nullptr; RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr; std::vector _wait_channel_timer; + RuntimeProfile::Counter* _copy_shuffled_data_timer = nullptr; + RuntimeProfile::Counter* _enqueue_blocks_counter = nullptr; + RuntimeProfile::Counter* _dequeue_blocks_counter = nullptr; // Sender instance id, unique within a fragment. int _sender_id; @@ -164,7 +234,6 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { */ std::vector> _local_channels_dependency; std::unique_ptr _partitioner; - std::unique_ptr _writer; size_t _partition_count; std::shared_ptr _finish_dependency; @@ -175,11 +244,13 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { TPartitionType::type _part_type; std::atomic _reach_limit = false; - int _last_local_channel_idx = -1; std::atomic_int _working_channels_count = 0; std::set _finished_channels; std::mutex _finished_channels_mutex; + std::unique_ptr _channel_selector; + int _channel_id = 0; + std::shared_ptr _exchanger = nullptr; }; class ExchangeSinkOperatorX final : public DataSinkOperatorX { @@ -207,13 +278,12 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX get_sink_buffer(InstanceLoId sender_ins_id); vectorized::VExprContextSPtrs& tablet_sink_expr_ctxs() { return _tablet_sink_expr_ctxs; } + bool is_broadcast() const { + return _part_type == TPartitionType::UNPARTITIONED || _dests.size() == 1; + } private: friend class ExchangeSinkLocalState; - - template - void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st); - // Use ExchangeSinkOperatorX to create a sink buffer. // The sink buffer can be shared among multiple ExchangeSinkLocalState instances, // or each ExchangeSinkLocalState can have its own sink buffer. @@ -229,11 +299,6 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX _dests; // Identifier of the destination plan node. @@ -249,17 +314,14 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX _pool; vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs; const std::vector* _t_tablet_sink_exprs = nullptr; - // for external table sink random partition - // Control the number of channels according to the flow, thereby controlling the number of table sink writers. 
- size_t _data_processed = 0; - int _writer_count = 1; const bool _enable_local_merge_sort; const std::vector& _fragment_instance_ids; + std::unique_ptr _writer = nullptr; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 20b25d54ff9f16..6e8c4cb50607f4 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -518,7 +518,7 @@ Status PartitionedHashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeSt for (auto& conjunct : tnode.hash_join_node.eq_join_conjuncts) { _probe_exprs.emplace_back(conjunct.left); } - _partitioner = std::make_unique(_partition_count); + _partitioner = SpillPartitionerType::create_unique(_partition_count); RETURN_IF_ERROR(_partitioner->init(_probe_exprs)); return Status::OK(); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 878c3870946f1c..a620bd4e85a30f 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -420,7 +420,7 @@ Status PartitionedHashJoinSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta _build_exprs.emplace_back(eq_join_conjunct.right); partition_exprs.emplace_back(eq_join_conjunct.right); } - _partitioner = std::make_unique(_partition_count); + _partitioner = SpillPartitionerType::create_unique(_partition_count); RETURN_IF_ERROR(_partitioner->init(_build_exprs)); return Status::OK(); diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index b22ee9fd77e72f..1c6969204f86fb 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -58,13 +58,15 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets _shuffle_idx_to_instance_idx[i] = {i, i}; } } - _partitioner.reset(new vectorized::Crc32HashPartitioner( - _num_partitions)); + _partitioner = + vectorized::Crc32HashPartitioner::create_unique( + _num_partitions); RETURN_IF_ERROR(_partitioner->init(_texprs)); } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) { DCHECK_GT(num_buckets, 0); - _partitioner.reset( - new vectorized::Crc32HashPartitioner(num_buckets)); + _partitioner = + vectorized::Crc32HashPartitioner::create_unique( + num_buckets); RETURN_IF_ERROR(_partitioner->init(_texprs)); } return Status::OK(); @@ -86,6 +88,7 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo SCOPED_TIMER(_init_timer); _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime"); _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime"); + _enqueue_blocks_counter = ADD_COUNTER(profile(), "EnqueueRows", TUnit::UNIT); if (_parent->cast()._type == ExchangeType::HASH_SHUFFLE) { _profile->add_info_string( "UseGlobalShuffle", @@ -146,7 +149,8 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); RETURN_IF_ERROR(local_state._exchanger->sink( state, in_block, eos, - {local_state._compute_hash_value_timer, local_state._distribute_timer, nullptr}, + {local_state._compute_hash_value_timer, local_state._distribute_timer, nullptr, + local_state._enqueue_blocks_counter, nullptr}, 
{&local_state._channel_id, local_state._partitioner.get(), &local_state})); // If all exchange sources ended due to limit reached, current task should also finish diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index c067f023c8d420..35739c7ee95f31 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -65,6 +65,7 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState _partitioner = nullptr; + RuntimeProfile::Counter* _enqueue_blocks_counter = nullptr; // Used by random passthrough exchanger int _channel_id = 0; diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp index 63e36cdfdb0c01..e892f8920689f3 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp @@ -31,6 +31,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& DCHECK(_exchanger != nullptr); _get_block_failed_counter = ADD_COUNTER_WITH_LEVEL(profile(), "GetBlockFailedTime", TUnit::UNIT, 1); + _dequeue_blocks_counter = ADD_COUNTER(profile(), "DequeueRows", TUnit::UNIT); if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE || _exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) { _copy_data_timer = ADD_TIMER(profile(), "CopyDataTime"); @@ -116,9 +117,11 @@ Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized:: bool* eos) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - RETURN_IF_ERROR(local_state._exchanger->get_block( - state, block, eos, {nullptr, nullptr, local_state._copy_data_timer}, - {local_state._channel_id, &local_state})); + RETURN_IF_ERROR( + local_state._exchanger->get_block(state, block, eos, + {nullptr, nullptr, local_state._copy_data_timer, + nullptr, local_state._dequeue_blocks_counter}, + {local_state._channel_id, &local_state})); local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h index 3c706d50182538..a7f5ca41b01643 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h @@ -57,6 +57,7 @@ class LocalExchangeSourceLocalState final : public PipelineXLocalState _deps_counter; std::vector _local_merge_deps; }; diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index a963de8b684310..d346e3ef26bfc9 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -27,15 +27,17 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" + template -void Exchanger::_enqueue_data_and_set_ready(int channel_id, - LocalExchangeSinkLocalState* local_state, - BlockType&& block) { +void Exchanger::_enqueue_data_and_set_ready( + RuntimeProfile::Counter* enqueue_rows_counter, int channel_id, + LocalExchangeSinkLocalState* local_state, BlockType&& block) { if (local_state == nullptr) { - _enqueue_data_and_set_ready(channel_id, std::move(block)); + _enqueue_data_and_set_ready(enqueue_rows_counter, channel_id, std::move(block)); return; } size_t allocated_bytes = 
0; + int64_t rows = 0; // PartitionedBlock is used by shuffle exchanger. // PartitionedBlock will be push into multiple queues with different row ranges, so it will be // referenced multiple times. Otherwise, we only ref the block once because it is only push into @@ -43,15 +45,18 @@ void Exchanger::_enqueue_data_and_set_ready(int channel_id, if constexpr (std::is_same_v || std::is_same_v) { allocated_bytes = block.first->data_block.allocated_bytes(); + rows = block.second.length; } else { block->ref(1); allocated_bytes = block->data_block.allocated_bytes(); + rows = block->data_block.rows(); } std::unique_lock l(_m); local_state->_shared_state->add_mem_usage(channel_id, allocated_bytes, !std::is_same_v && !std::is_same_v); if (_data_queue[channel_id].enqueue(std::move(block))) { + COUNTER_UPDATE(enqueue_rows_counter, rows); local_state->_shared_state->set_ready_to_read(channel_id); } else { local_state->_shared_state->sub_mem_usage(channel_id, allocated_bytes); @@ -59,27 +64,34 @@ void Exchanger::_enqueue_data_and_set_ready(int channel_id, // just unref the block. if constexpr (std::is_same_v || std::is_same_v) { - block.first->unref(local_state->_shared_state, allocated_bytes, channel_id); + block.first->unref(this, local_state->_shared_state, allocated_bytes, channel_id); } else { - block->unref(local_state->_shared_state, allocated_bytes, channel_id); + block->unref(this, local_state->_shared_state, allocated_bytes, channel_id); DCHECK_EQ(block->ref_value(), 0); } } } template -bool Exchanger::_dequeue_data(LocalExchangeSourceLocalState* local_state, +bool Exchanger::_dequeue_data(RuntimeProfile::Counter* dequeue_rows_counter, + LocalExchangeSourceLocalState* local_state, BlockType& block, bool* eos, vectorized::Block* data_block, int channel_id) { if (local_state == nullptr) { - if (!_dequeue_data(block, eos, data_block, channel_id)) { - throw Exception(ErrorCode::INTERNAL_ERROR, "Exchanger has no data: {}", - data_queue_debug_string(channel_id)); - } - return true; + return _dequeue_data(dequeue_rows_counter, block, eos, data_block, channel_id); } bool all_finished = _running_sink_operators == 0; if (_data_queue[channel_id].try_dequeue(block)) { + int64_t rows = 0; + if constexpr (std::is_same_v || + std::is_same_v) { + rows = block.second.length; + } else { + rows = block->data_block.rows(); + } + if (dequeue_rows_counter) { + COUNTER_UPDATE(dequeue_rows_counter, rows); + } if constexpr (std::is_same_v || std::is_same_v) { local_state->_shared_state->sub_mem_usage(channel_id, @@ -88,7 +100,8 @@ bool Exchanger::_dequeue_data(LocalExchangeSourceLocalState* local_st local_state->_shared_state->sub_mem_usage(channel_id, block->data_block.allocated_bytes()); data_block->swap(block->data_block); - block->unref(local_state->_shared_state, data_block->allocated_bytes(), channel_id); + block->unref(this, local_state->_shared_state, data_block->allocated_bytes(), + channel_id); DCHECK_EQ(block->ref_value(), 0); } return true; @@ -97,6 +110,16 @@ bool Exchanger::_dequeue_data(LocalExchangeSourceLocalState* local_st } else { std::unique_lock l(_m); if (_data_queue[channel_id].try_dequeue(block)) { + int64_t rows = 0; + if constexpr (std::is_same_v || + std::is_same_v) { + rows = block.second.length; + } else { + rows = block->data_block.rows(); + } + if (dequeue_rows_counter) { + COUNTER_UPDATE(dequeue_rows_counter, rows); + } if constexpr (std::is_same_v || std::is_same_v) { local_state->_shared_state->sub_mem_usage( @@ -105,7 +128,8 @@ bool 
Exchanger::_dequeue_data(LocalExchangeSourceLocalState* local_st local_state->_shared_state->sub_mem_usage(channel_id, block->data_block.allocated_bytes()); data_block->swap(block->data_block); - block->unref(local_state->_shared_state, data_block->allocated_bytes(), channel_id); + block->unref(this, local_state->_shared_state, data_block->allocated_bytes(), + channel_id); DCHECK_EQ(block->ref_value(), 0); } return true; @@ -117,30 +141,49 @@ bool Exchanger::_dequeue_data(LocalExchangeSourceLocalState* local_st } template -void Exchanger::_enqueue_data_and_set_ready(int channel_id, BlockType&& block) { - if constexpr (!std::is_same_v && - !std::is_same_v) { +void Exchanger::_enqueue_data_and_set_ready( + RuntimeProfile::Counter* enqueue_rows_counter, int channel_id, BlockType&& block) { + int64_t rows = 0; + if constexpr (std::is_same_v || + std::is_same_v) { + rows = block.second.length; + } else { block->ref(1); + rows = block->data_block.rows(); } - if (!_data_queue[channel_id].enqueue(std::move(block))) { + if (_data_queue[channel_id].enqueue(std::move(block))) { + COUNTER_UPDATE(enqueue_rows_counter, rows); + } else { if constexpr (std::is_same_v || std::is_same_v) { - block.first->unref(); + block.first->unref(this); } else { - block->unref(); + block->unref(this); DCHECK_EQ(block->ref_value(), 0); } + throw Exception(ErrorCode::INTERNAL_ERROR, + "Exception occurs in data queue [eos = {}] of local exchange.", + _data_queue[channel_id].eos); } } template -bool Exchanger::_dequeue_data(BlockType& block, bool* eos, vectorized::Block* data_block, +bool Exchanger::_dequeue_data(RuntimeProfile::Counter* dequeue_rows_counter, + BlockType& block, bool* eos, vectorized::Block* data_block, int channel_id) { if (_data_queue[channel_id].try_dequeue(block)) { + int64_t rows = 0; + if constexpr (std::is_same_v || + std::is_same_v) { + rows = block.second.length; + } else { + rows = block->data_block.rows(); + } + COUNTER_UPDATE(dequeue_rows_counter, rows); if constexpr (!std::is_same_v && !std::is_same_v) { data_block->swap(block->data_block); - block->unref(); + block->unref(this); DCHECK_EQ(block->ref_value(), 0); } return true; @@ -159,8 +202,16 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, } { SCOPED_TIMER(profile.distribute_timer); - RETURN_IF_ERROR(_split_rows(state, sink_info.partitioner->get_channel_ids().get(), - in_block, *sink_info.channel_id, sink_info.local_state)); + const auto& channel_filed = sink_info.partitioner->get_channel_ids(); + if (channel_filed.len == sizeof(uint32_t)) { + RETURN_IF_ERROR(_split_rows(std::move(profile), state, + sink_info.partitioner->get_channel_ids().get(), + in_block, *sink_info.channel_id, sink_info.local_state)); + } else { + RETURN_IF_ERROR(_split_rows(std::move(profile), state, + sink_info.partitioner->get_channel_ids().get(), + in_block, *sink_info.channel_id, sink_info.local_state)); + } } return Status::OK(); @@ -171,10 +222,10 @@ void ShuffleExchanger::close(SourceInfo&& source_info) { bool eos; vectorized::Block block; _data_queue[source_info.channel_id].set_eos(); - while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block, + while (_dequeue_data(nullptr, source_info.local_state, partitioned_block, &eos, &block, source_info.channel_id)) { partitioned_block.first->unref( - source_info.local_state ? source_info.local_state->_shared_state : nullptr, + this, source_info.local_state ? 
source_info.local_state->_shared_state : nullptr, source_info.channel_id); } } @@ -192,17 +243,21 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, offset_start, offset_start + partitioned_block.second.length)); block_wrapper->unref( + this, source_info.local_state ? source_info.local_state->_shared_state : nullptr, source_info.channel_id); } while (mutable_block.rows() < state->batch_size() && !*eos && - _dequeue_data(source_info.local_state, partitioned_block, eos, block, - source_info.channel_id)); + _dequeue_data(profile.dequeue_rows_counter, source_info.local_state, + partitioned_block, eos, block, source_info.channel_id)); return Status::OK(); }; - if (_dequeue_data(source_info.local_state, partitioned_block, eos, block, - source_info.channel_id)) { + if (_dequeue_data(profile.dequeue_rows_counter, source_info.local_state, partitioned_block, eos, + block, source_info.channel_id)) { SCOPED_TIMER(profile.copy_data_timer); + if (block->columns() == 0) { + *block = partitioned_block.first->data_block.clone_empty(); + } mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( block, partitioned_block.first->data_block); RETURN_IF_ERROR(get_data()); @@ -210,11 +265,13 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block return Status::OK(); } -Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, +template +Status ShuffleExchanger::_split_rows(Profile&& profile, RuntimeState* state, + const ChannelIdType* __restrict channel_ids, vectorized::Block* block, int channel_id, LocalExchangeSinkLocalState* local_state) { if (local_state == nullptr) { - return _split_rows(state, channel_ids, block, channel_id); + return _split_rows(std::move(profile), state, channel_ids, block, channel_id); } const auto rows = cast_set(block->rows()); auto row_idx = std::make_shared>(rows); @@ -265,10 +322,10 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest uint32_t start = partition_rows_histogram[it.first]; uint32_t size = partition_rows_histogram[it.first + 1] - start; if (size > 0) { - _enqueue_data_and_set_ready(it.second, local_state, + _enqueue_data_and_set_ready(profile.enqueue_rows_counter, it.second, local_state, {new_block_wrapper, {row_idx, start, size}}); } else { - new_block_wrapper->unref(local_state->_shared_state, channel_id); + new_block_wrapper->unref(this, local_state->_shared_state, channel_id); } } } else { @@ -278,10 +335,11 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest uint32_t start = partition_rows_histogram[i]; uint32_t size = partition_rows_histogram[i + 1] - start; if (size > 0) { - _enqueue_data_and_set_ready(bucket_seq_to_instance_idx[i], local_state, + _enqueue_data_and_set_ready(profile.enqueue_rows_counter, + bucket_seq_to_instance_idx[i], local_state, {new_block_wrapper, {row_idx, start, size}}); } else { - new_block_wrapper->unref(local_state->_shared_state, channel_id); + new_block_wrapper->unref(this, local_state->_shared_state, channel_id); } } } @@ -289,22 +347,24 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest return Status::OK(); } -Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, +template +Status ShuffleExchanger::_split_rows(Profile&& profile, RuntimeState* state, + const ChannelIdType* __restrict channel_ids, 
vectorized::Block* block, int channel_id) { const auto rows = cast_set(block->rows()); auto row_idx = std::make_shared>(rows); auto& partition_rows_histogram = _partition_rows_histogram[channel_id]; { - partition_rows_histogram.assign(_num_partitions + 1, 0); + partition_rows_histogram.assign(_num_partitions + 2, 0); for (int32_t i = 0; i < rows; ++i) { - partition_rows_histogram[channel_ids[i]]++; + partition_rows_histogram[channel_ids[i] + 1]++; } - for (int32_t i = 1; i <= _num_partitions; ++i) { + for (int32_t i = 1; i <= _num_partitions + 1; ++i) { partition_rows_histogram[i] += partition_rows_histogram[i - 1]; } for (int32_t i = rows - 1; i >= 0; --i) { - (*row_idx)[partition_rows_histogram[channel_ids[i]] - 1] = i; - partition_rows_histogram[channel_ids[i]]--; + (*row_idx)[partition_rows_histogram[channel_ids[i] + 1] - 1] = i; + partition_rows_histogram[channel_ids[i] + 1]--; } } @@ -322,12 +382,13 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest } new_block_wrapper->ref(cast_set(_num_partitions)); for (int i = 0; i < _num_partitions; i++) { - uint32_t start = partition_rows_histogram[i]; - uint32_t size = partition_rows_histogram[i + 1] - start; + uint32_t start = partition_rows_histogram[i + 1]; + uint32_t size = partition_rows_histogram[i + 2] - start; if (size > 0) { - _enqueue_data_and_set_ready(i, {new_block_wrapper, {row_idx, start, size}}); + _enqueue_data_and_set_ready(profile.enqueue_rows_counter, i, + {new_block_wrapper, {row_idx, start, size}}); } else { - new_block_wrapper->unref(); + new_block_wrapper->unref(this); } } @@ -347,7 +408,8 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo new_block.swap(*in_block); wrapper = BlockWrapper::create_shared(std::move(new_block)); auto channel_id = ((*sink_info.channel_id)++) % _num_partitions; - _enqueue_data_and_set_ready(channel_id, sink_info.local_state, std::move(wrapper)); + _enqueue_data_and_set_ready(profile.enqueue_rows_counter, channel_id, sink_info.local_state, + std::move(wrapper)); return Status::OK(); } @@ -357,7 +419,7 @@ void PassthroughExchanger::close(SourceInfo&& source_info) { BlockWrapperSPtr wrapper; bool eos; _data_queue[source_info.channel_id].set_eos(); - while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block, + while (_dequeue_data(nullptr, source_info.local_state, wrapper, &eos, &next_block, source_info.channel_id)) { // do nothing } @@ -368,7 +430,7 @@ void PassToOneExchanger::close(SourceInfo&& source_info) { BlockWrapperSPtr wrapper; bool eos; _data_queue[source_info.channel_id].set_eos(); - while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block, + while (_dequeue_data(nullptr, source_info.local_state, wrapper, &eos, &next_block, source_info.channel_id)) { // do nothing } @@ -377,7 +439,8 @@ void PassToOneExchanger::close(SourceInfo&& source_info) { Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, Profile&& profile, SourceInfo&& source_info) { BlockWrapperSPtr next_block; - _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id); + _dequeue_data(profile.dequeue_rows_counter, source_info.local_state, next_block, eos, block, + source_info.channel_id); return Status::OK(); } @@ -393,7 +456,8 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block new_block.swap(*in_block); BlockWrapperSPtr wrapper = BlockWrapper::create_shared(std::move(new_block)); - _enqueue_data_and_set_ready(0, 
sink_info.local_state, std::move(wrapper)); + _enqueue_data_and_set_ready(profile.enqueue_rows_counter, 0, sink_info.local_state, + std::move(wrapper)); return Status::OK(); } @@ -405,7 +469,8 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo return Status::OK(); } BlockWrapperSPtr next_block; - _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id); + _dequeue_data(profile.dequeue_rows_counter, source_info.local_state, next_block, eos, block, + source_info.channel_id); return Status::OK(); } @@ -419,7 +484,8 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_ DCHECK_LE(*sink_info.channel_id, _data_queue.size()); new_block.swap(*in_block); - _enqueue_data_and_set_ready(*sink_info.channel_id, sink_info.local_state, + _enqueue_data_and_set_ready(profile.enqueue_rows_counter, *sink_info.channel_id, + sink_info.local_state, BlockWrapper::create_shared(std::move(new_block))); } if (eos && sink_info.local_state) { @@ -444,7 +510,7 @@ void LocalMergeSortExchanger::finalize() { int id = 0; for (auto& data_queue : _data_queue) { data_queue.set_eos(); - while (_dequeue_data(next_block, &eos, &block, id)) { + while (_dequeue_data(nullptr, next_block, &eos, &block, id)) { block = vectorized::Block(); } id++; @@ -452,17 +518,18 @@ void LocalMergeSortExchanger::finalize() { ExchangerBase::finalize(); } -Status LocalMergeSortExchanger::build_merger(RuntimeState* state, +Status LocalMergeSortExchanger::build_merger(Profile&& profile, RuntimeState* state, LocalExchangeSourceLocalState* local_state) { RETURN_IF_ERROR(_sort_source->build_merger(state, _merger, local_state->profile())); std::vector child_block_suppliers; for (int channel_id = 0; channel_id < _num_partitions; channel_id++) { - vectorized::BlockSupplier block_supplier = [&, local_state, id = channel_id]( - vectorized::Block* block, bool* eos) { - BlockWrapperSPtr next_block; - _dequeue_data(local_state, next_block, eos, block, id); - return Status::OK(); - }; + vectorized::BlockSupplier block_supplier = + [&, local_state, dequeue_rows_counter = profile.dequeue_rows_counter, + id = channel_id](vectorized::Block* block, bool* eos) { + BlockWrapperSPtr next_block; + _dequeue_data(dequeue_rows_counter, local_state, next_block, eos, block, id); + return Status::OK(); + }; child_block_suppliers.push_back(block_supplier); } RETURN_IF_ERROR(_merger->prepare(child_block_suppliers)); @@ -489,7 +556,7 @@ Status LocalMergeSortExchanger::get_block(RuntimeState* state, vectorized::Block } if (!_merger) { DCHECK(source_info.local_state); - RETURN_IF_ERROR(build_merger(state, source_info.local_state)); + RETURN_IF_ERROR(build_merger(std::move(profile), state, source_info.local_state)); } RETURN_IF_ERROR(_merger->get_next(block, eos)); return Status::OK(); @@ -513,7 +580,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block wrapper->ref(_num_partitions); for (int i = 0; i < _num_partitions; i++) { - _enqueue_data_and_set_ready(i, sink_info.local_state, + _enqueue_data_and_set_ready(profile.enqueue_rows_counter, i, sink_info.local_state, {wrapper, {0, wrapper->data_block.rows()}}); } @@ -525,10 +592,10 @@ void BroadcastExchanger::close(SourceInfo&& source_info) { bool eos; vectorized::Block block; _data_queue[source_info.channel_id].set_eos(); - while (_dequeue_data(source_info.local_state, partitioned_block, &eos, &block, + while (_dequeue_data(nullptr, source_info.local_state, partitioned_block, &eos, &block, 
source_info.channel_id)) { partitioned_block.first->unref( - source_info.local_state ? source_info.local_state->_shared_state : nullptr, + this, source_info.local_state ? source_info.local_state->_shared_state : nullptr, source_info.channel_id); } } @@ -537,9 +604,12 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo Profile&& profile, SourceInfo&& source_info) { BroadcastBlock partitioned_block; - if (_dequeue_data(source_info.local_state, partitioned_block, eos, block, - source_info.channel_id)) { + if (_dequeue_data(profile.dequeue_rows_counter, source_info.local_state, partitioned_block, eos, + block, source_info.channel_id)) { SCOPED_TIMER(profile.copy_data_timer); + if (block->columns() == 0) { + *block = partitioned_block.first->data_block.clone_empty(); + } vectorized::MutableBlock mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( block, partitioned_block.first->data_block); @@ -548,14 +618,14 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo partitioned_block.second.offset_start, partitioned_block.second.length)); block_wrapper->unref( - source_info.local_state ? source_info.local_state->_shared_state : nullptr, + this, source_info.local_state ? source_info.local_state->_shared_state : nullptr, source_info.channel_id); } return Status::OK(); } -Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, +Status AdaptivePassthroughExchanger::_passthrough_sink(Profile&& profile, RuntimeState* state, vectorized::Block* in_block, SinkInfo&& sink_info) { vectorized::Block new_block; @@ -564,14 +634,14 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, } new_block.swap(*in_block); auto channel_id = ((*sink_info.channel_id)++) % _num_partitions; - _enqueue_data_and_set_ready(channel_id, sink_info.local_state, + _enqueue_data_and_set_ready(profile.enqueue_rows_counter, channel_id, sink_info.local_state, BlockWrapper::create_shared(std::move(new_block))); return Status::OK(); } -Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectorized::Block* block, - SinkInfo&& sink_info) { +Status AdaptivePassthroughExchanger::_shuffle_sink(Profile&& profile, RuntimeState* state, + vectorized::Block* block, SinkInfo&& sink_info) { std::vector channel_ids; const auto num_rows = block->rows(); channel_ids.resize(num_rows, 0); @@ -586,10 +656,10 @@ Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectoriz std::iota(channel_ids.begin() + i, channel_ids.end(), 0); } } - return _split_rows(state, channel_ids.data(), block, std::move(sink_info)); + return _split_rows(std::move(profile), state, channel_ids.data(), block, std::move(sink_info)); } -Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, +Status AdaptivePassthroughExchanger::_split_rows(Profile&& profile, RuntimeState* state, const uint32_t* __restrict channel_ids, vectorized::Block* block, SinkInfo&& sink_info) { const auto rows = cast_set(block->rows()); @@ -618,7 +688,7 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, RETURN_IF_ERROR(mutable_block->add_rows(block, start, size)); auto new_block = mutable_block->to_block(); - _enqueue_data_and_set_ready(i, sink_info.local_state, + _enqueue_data_and_set_ready(profile.enqueue_rows_counter, i, sink_info.local_state, BlockWrapper::create_shared(std::move(new_block))); } } @@ -631,12 +701,12 @@ Status AdaptivePassthroughExchanger::sink(RuntimeState* state, vectorized::Block return 
Status::OK(); } if (_is_pass_through) { - return _passthrough_sink(state, in_block, std::move(sink_info)); + return _passthrough_sink(std::move(profile), state, in_block, std::move(sink_info)); } else { if (_total_block++ > _num_partitions) { _is_pass_through = true; } - return _shuffle_sink(state, in_block, std::move(sink_info)); + return _shuffle_sink(std::move(profile), state, in_block, std::move(sink_info)); } } @@ -644,7 +714,8 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized:: bool* eos, Profile&& profile, SourceInfo&& source_info) { BlockWrapperSPtr next_block; - _dequeue_data(source_info.local_state, next_block, eos, block, source_info.channel_id); + _dequeue_data(profile.dequeue_rows_counter, source_info.local_state, next_block, eos, block, + source_info.channel_id); return Status::OK(); } @@ -653,7 +724,7 @@ void AdaptivePassthroughExchanger::close(SourceInfo&& source_info) { bool eos; BlockWrapperSPtr wrapper; _data_queue[source_info.channel_id].set_eos(); - while (_dequeue_data(source_info.local_state, wrapper, &eos, &next_block, + while (_dequeue_data(nullptr, source_info.local_state, wrapper, &eos, &next_block, source_info.channel_id)) { // do nothing } diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index d6871b2ba97cc3..f3d417ca76e034 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -35,6 +35,8 @@ struct Profile { RuntimeProfile::Counter* compute_hash_value_timer = nullptr; RuntimeProfile::Counter* distribute_timer = nullptr; RuntimeProfile::Counter* copy_data_timer = nullptr; + RuntimeProfile::Counter* enqueue_rows_counter = nullptr; + RuntimeProfile::Counter* dequeue_rows_counter = nullptr; }; struct SinkInfo { @@ -90,6 +92,7 @@ class ExchangerBase { virtual void finalize(); virtual std::string data_queue_debug_string(int i) = 0; + std::string debug_string() const { return get_exchange_type_name(get_type()); } protected: friend struct LocalExchangeSharedState; @@ -174,13 +177,17 @@ class Exchanger : public ExchangerBase { protected: // Enqueue data block and set downstream source operator to read. 
- void _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState* local_state, + void _enqueue_data_and_set_ready(RuntimeProfile::Counter* enqueue_rows_counter, int channel_id, + LocalExchangeSinkLocalState* local_state, BlockType&& block); + bool _dequeue_data(RuntimeProfile::Counter* dequeue_rows_counter, + LocalExchangeSourceLocalState* local_state, BlockType& block, bool* eos, + vectorized::Block* data_block, int channel_id); + + void _enqueue_data_and_set_ready(RuntimeProfile::Counter* enqueue_rows_counter, int channel_id, BlockType&& block); - bool _dequeue_data(LocalExchangeSourceLocalState* local_state, BlockType& block, bool* eos, + bool _dequeue_data(RuntimeProfile::Counter* dequeue_rows_counter, BlockType& block, bool* eos, vectorized::Block* data_block, int channel_id); - void _enqueue_data_and_set_ready(int channel_id, BlockType&& block); - bool _dequeue_data(BlockType& block, bool* eos, vectorized::Block* data_block, int channel_id); std::vector> _data_queue; private: @@ -205,24 +212,28 @@ struct BlockWrapper { BlockWrapper(vectorized::Block&& data_block_) : data_block(std::move(data_block_)) {} ~BlockWrapper() { DCHECK_EQ(ref_count.load(), 0); } void ref(int delta) { ref_count += delta; } - void unref(LocalExchangeSharedState* shared_state, size_t allocated_bytes, int channel_id) { - if (ref_count.fetch_sub(1) == 1 && shared_state != nullptr) { + void unref(ExchangerBase* exchanger, LocalExchangeSharedState* shared_state, + size_t allocated_bytes, int channel_id) { + if (ref_count.fetch_sub(1) == 1) { DCHECK_GT(allocated_bytes, 0); - shared_state->sub_total_mem_usage(allocated_bytes, channel_id); - if (shared_state->exchanger->_free_block_limit == 0 || - shared_state->exchanger->_free_blocks.size_approx() < - shared_state->exchanger->_free_block_limit * - shared_state->exchanger->_num_sources) { + if (shared_state) { + shared_state->sub_total_mem_usage(allocated_bytes, channel_id); + } + if (data_block.mem_reuse() && + (exchanger->_free_block_limit == 0 || + exchanger->_free_blocks.size_approx() < + exchanger->_free_block_limit * exchanger->_num_senders)) { data_block.clear_column_data(); // Free blocks is used to improve memory efficiency. Failure during pushing back // free block will not incur any bad result so just ignore the return value. 
- shared_state->exchanger->_free_blocks.enqueue(std::move(data_block)); + exchanger->_free_blocks.enqueue(std::move(data_block)); } } } - void unref(LocalExchangeSharedState* shared_state = nullptr, int channel_id = 0) { - unref(shared_state, data_block.allocated_bytes(), channel_id); + void unref(ExchangerBase* exchanger, LocalExchangeSharedState* shared_state = nullptr, + int channel_id = 0) { + unref(exchanger, shared_state, data_block.allocated_bytes(), channel_id); } int ref_value() const { return ref_count.load(); } std::atomic ref_count = 0; @@ -251,11 +262,14 @@ class ShuffleExchanger : public Exchanger { ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; } protected: - Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, int channel_id, - LocalExchangeSinkLocalState* local_state); - Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, int channel_id); + template + Status _split_rows(Profile&& profile, RuntimeState* state, + const ChannelIdType* __restrict channel_ids, vectorized::Block* block, + int channel_id, LocalExchangeSinkLocalState* local_state); + template + Status _split_rows(Profile&& profile, RuntimeState* state, + const ChannelIdType* __restrict channel_ids, vectorized::Block* block, + int channel_id); std::vector> _partition_rows_histogram; }; @@ -324,7 +338,8 @@ class LocalMergeSortExchanger final : public Exchanger { SourceInfo&& source_info) override; ExchangeType get_type() const override { return ExchangeType::LOCAL_MERGE_SORT; } - Status build_merger(RuntimeState* statem, LocalExchangeSourceLocalState* local_state); + Status build_merger(Profile&& profile, RuntimeState* statem, + LocalExchangeSourceLocalState* local_state); void close(SourceInfo&& source_info) override {} void finalize() override; @@ -374,11 +389,13 @@ class AdaptivePassthroughExchanger : public Exchanger { void close(SourceInfo&& source_info) override; private: - Status _passthrough_sink(RuntimeState* state, vectorized::Block* in_block, + Status _passthrough_sink(Profile&& profile, RuntimeState* state, vectorized::Block* in_block, SinkInfo&& sink_info); - Status _shuffle_sink(RuntimeState* state, vectorized::Block* in_block, SinkInfo&& sink_info); - Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, SinkInfo&& sink_info); + Status _shuffle_sink(Profile&& profile, RuntimeState* state, vectorized::Block* in_block, + SinkInfo&& sink_info); + Status _split_rows(Profile&& profile, RuntimeState* state, + const uint32_t* __restrict channel_ids, vectorized::Block* block, + SinkInfo&& sink_info); std::atomic_bool _is_pass_through = false; std::atomic_int32_t _total_block = 0; diff --git a/be/src/pipeline/shuffle/writer.cpp b/be/src/pipeline/shuffle/writer.cpp index c27fd9a7aeb731..e0d8a87aa93833 100644 --- a/be/src/pipeline/shuffle/writer.cpp +++ b/be/src/pipeline/shuffle/writer.cpp @@ -17,98 +17,73 @@ #include "writer.h" +#include "pipeline/exec/exchange_sink_buffer.h" #include "pipeline/exec/exchange_sink_operator.h" +#include "pipeline/local_exchange/local_exchanger.h" #include "vec/core/block.h" +#include "vec/sink/vdata_stream_sender.h" namespace doris::pipeline { #include "common/compile_check_begin.h" -template -void Writer::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) const { - channel->set_receiver_eof(st); - // Chanel will not send RPC to the downstream when eof, so close 
chanel by OK status. - static_cast(channel->close(state)); -} - Status Writer::write(ExchangeSinkLocalState* local_state, RuntimeState* state, vectorized::Block* block, bool eos) const { - auto rows = block->rows(); - { - SCOPED_TIMER(local_state->split_block_hash_compute_timer()); - RETURN_IF_ERROR(local_state->partitioner()->do_partitioning(state, block)); - } - int64_t old_channel_mem_usage = 0; - for (const auto& channel : local_state->channels) { - old_channel_mem_usage += channel->mem_usage(); - } - { - SCOPED_TIMER(local_state->distribute_rows_into_channels_timer()); - const auto& channel_filed = local_state->partitioner()->get_channel_ids(); - if (channel_filed.len == sizeof(uint32_t)) { - RETURN_IF_ERROR(_channel_add_rows(state, local_state->channels, - local_state->channels.size(), - channel_filed.get(), rows, block, eos)); - } else { - RETURN_IF_ERROR(_channel_add_rows(state, local_state->channels, - local_state->channels.size(), - channel_filed.get(), rows, block, eos)); - } - } - int64_t new_channel_mem_usage = 0; - for (const auto& channel : local_state->channels) { - new_channel_mem_usage += channel->mem_usage(); - } - COUNTER_UPDATE(local_state->memory_used_counter(), - new_channel_mem_usage - old_channel_mem_usage); + RETURN_IF_ERROR(local_state->exchanger()->sink( + state, block, eos, + {local_state->split_block_hash_compute_timer(), + local_state->distribute_rows_into_channels_timer(), nullptr, + local_state->enqueue_blocks_counter(), nullptr}, + {local_state->next_channel_id(), local_state->partitioner(), nullptr})); + + DCHECK(block->empty()); return Status::OK(); } -template -Status Writer::_channel_add_rows(RuntimeState* state, - std::vector>& channels, - size_t partition_count, - const ChannelIdType* __restrict channel_ids, size_t rows, - vectorized::Block* block, bool eos) const { - std::vector partition_rows_histogram; - auto row_idx = vectorized::PODArray(rows); - { - partition_rows_histogram.assign(partition_count + 2, 0); - for (size_t i = 0; i < rows; ++i) { - partition_rows_histogram[channel_ids[i] + 1]++; - } - for (size_t i = 1; i <= partition_count + 1; ++i) { - partition_rows_histogram[i] += partition_rows_histogram[i - 1]; +Status Writer::send_to_channels( + ExchangeSinkLocalState* local_state, RuntimeState* state, vectorized::Channel* channel, + int channel_id, bool eos, + std::shared_ptr broadcasted_block) const { + if (!channel->is_receiver_eof()) { + Status status; + int64_t old_channel_mem_usage = channel->mem_usage(); + if (channel->serializer()->get_block() == nullptr) { + channel->serializer()->init_block(); } - for (int32_t i = cast_set(rows) - 1; i >= 0; --i) { - row_idx[partition_rows_histogram[channel_ids[i] + 1] - 1] = i; - partition_rows_histogram[channel_ids[i] + 1]--; + vectorized::Block output_block = channel->serializer()->get_block()->to_block(); + RETURN_IF_ERROR(local_state->exchanger()->get_block( + state, &output_block, &eos, + {nullptr, nullptr, local_state->copy_shuffled_data_timer(), nullptr, + local_state->dequeue_blocks_counter()}, + {channel_id, nullptr})); + bool reset = false; + if (broadcasted_block && !channel->is_local()) { + status = channel->send_broadcast_block(broadcasted_block, eos); + } else { + status = channel->send_block(&output_block, false, &reset); } - } -#define HANDLE_CHANNEL_STATUS(state, channel, status) \ - do { \ - if (status.is()) { \ - _handle_eof_channel(state, channel, status); \ - } else { \ - RETURN_IF_ERROR(status); \ - } \ - } while (0) - Status status = Status::OK(); - for (size_t i = 0; i < 
partition_count; ++i) { - uint32_t start = partition_rows_histogram[i + 1]; - uint32_t size = partition_rows_histogram[i + 2] - start; - if (!channels[i]->is_receiver_eof() && size > 0) { - status = channels[i]->add_rows(block, row_idx.data(), start, size, false); - HANDLE_CHANNEL_STATUS(state, channels[i], status); + + if (!reset) { + output_block.clear_column_data(); + channel->serializer()->swap_data(&output_block); } - } - if (eos) { - for (int i = 0; i < partition_count; ++i) { - if (!channels[i]->is_receiver_eof()) { - status = channels[i]->add_rows(block, row_idx.data(), 0, 0, true); - HANDLE_CHANNEL_STATUS(state, channels[i], status); - } + + if (status.is()) { + channel->set_receiver_eof(status); + RETURN_IF_ERROR(finish(local_state->exchanger(), state, channel, channel_id, status)); + } else { + RETURN_IF_ERROR(status); } + int64_t new_channel_mem_usage = channel->mem_usage(); + COUNTER_UPDATE(local_state->memory_used_counter(), + new_channel_mem_usage - old_channel_mem_usage); } return Status::OK(); } +Status Writer::finish(ExchangerBase* exchanger, RuntimeState* state, vectorized::Channel* channel, + int channel_id, Status status) const { + exchanger->close({channel_id, nullptr}); + RETURN_IF_ERROR(channel->close(state, status)); + return Status::OK(); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/shuffle/writer.h b/be/src/pipeline/shuffle/writer.h index 0eb772120293e5..8cbc4b05c967d7 100644 --- a/be/src/pipeline/shuffle/writer.h +++ b/be/src/pipeline/shuffle/writer.h @@ -17,6 +17,7 @@ #pragma once +#include "pipeline/local_exchange/local_exchanger.h" #include "vec/sink/vdata_stream_sender.h" namespace doris { @@ -37,17 +38,13 @@ class Writer { Status write(ExchangeSinkLocalState* local_state, RuntimeState* state, vectorized::Block* block, bool eos) const; - -private: - template - Status _channel_add_rows(RuntimeState* state, - std::vector>& channels, - size_t partition_count, const ChannelIdType* __restrict channel_ids, - size_t rows, vectorized::Block* block, bool eos) const; - - template - void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) const; + Status send_to_channels( + ExchangeSinkLocalState* local_state, RuntimeState* state, vectorized::Channel* channel, + int channel_id, bool eos, + std::shared_ptr broadcasted_block = nullptr) const; + Status finish(ExchangerBase* exchanger, RuntimeState* state, vectorized::Channel* channel, + int channel_id, Status status) const; }; #include "common/compile_check_end.h" } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 4dc553b1a5790f..f7a60ffbf00e0e 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -1059,7 +1059,7 @@ Status MutableBlock::add_rows(const Block* block, const uint32_t* row_begin, DCHECK_EQ(_data_types[i]->get_name(), src_col.type->get_name()); auto& dst = _columns[i]; const auto& src = *src_col.column.get(); - DCHECK_GE(src.size(), row_end - row_begin); + DCHECK_GE(src.size(), row_end - row_begin) << i << "-th column has unexpected length"; dst->insert_indices_from(src, row_begin, row_end); } }); diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h index 53d8b84d09c752..9f14455467c833 100644 --- a/be/src/vec/runtime/partitioner.h +++ b/be/src/vec/runtime/partitioner.h @@ -55,6 +55,8 @@ class PartitionerBase { size_t partition_count() const { return _partition_count; } + virtual std::string debug_string() 
const = 0; + protected: const size_t _partition_count; }; @@ -62,7 +64,8 @@ class PartitionerBase { template class Crc32HashPartitioner : public PartitionerBase { public: - Crc32HashPartitioner(int partition_count) : PartitionerBase(partition_count) {} + ENABLE_FACTORY_CREATOR(Crc32HashPartitioner); + Crc32HashPartitioner(size_t partition_count) : PartitionerBase(partition_count) {} ~Crc32HashPartitioner() override = default; Status init(const std::vector& texprs) override { @@ -83,6 +86,10 @@ class Crc32HashPartitioner : public PartitionerBase { Status clone(RuntimeState* state, std::unique_ptr& partitioner) override; + std::string debug_string() const override { + return fmt::format("Crc32HashPartitioner({})", _partition_count); + } + protected: Status _get_partition_column_result(Block* block, std::vector& result) const { int counter = 0; diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index c3277b0917e84c..50865788d03a45 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -203,7 +203,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr pblock, return Status::OK(); } -void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { +void VDataStreamRecvr::SenderQueue::add_block(Block* block) { if (block->rows() == 0) { return; } @@ -213,18 +213,8 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { return; } } - BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name()); - - // local exchange should copy the block contented if use move == false - if (use_move) { - block->clear(); - } else { - auto rows = block->rows(); - for (int i = 0; i < nblock->columns(); ++i) { - nblock->get_by_position(i).column = - nblock->get_by_position(i).column->clone_resized(rows); - } - } + BlockUPtr nblock = Block::create_unique(); + nblock->swap(*block); materialize_block_inplace(*nblock); auto block_mem_size = nblock->allocated_bytes(); @@ -395,9 +385,9 @@ Status VDataStreamRecvr::add_block(std::unique_ptr pblock, int sender_id wait_for_worker, time_to_find_recvr); } -void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) { +void VDataStreamRecvr::add_block(Block* block, int sender_id) { int use_sender_id = _is_merging ? 
sender_id : 0; - _sender_queues[use_sender_id]->add_block(block, use_move); + _sender_queues[use_sender_id]->add_block(block); } std::shared_ptr VDataStreamRecvr::get_local_channel_dependency( diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 1639366c8b83d6..d10e656541c14d 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -89,7 +89,7 @@ class VDataStreamRecvr : public HasTaskExecutionCtx { int64_t packet_seq, ::google::protobuf::Closure** done, const int64_t wait_for_worker, const uint64_t time_to_find_recvr); - void add_block(Block* block, int sender_id, bool use_move); + void add_block(Block* block, int sender_id); Status get_next(Block* block, bool* eos); @@ -184,7 +184,7 @@ class VDataStreamRecvr::SenderQueue { ::google::protobuf::Closure** done, const int64_t wait_for_worker, const uint64_t time_to_find_recvr); - void add_block(Block* block, bool use_move); + void add_block(Block* block); void decrement_senders(int sender_id); diff --git a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp index 92e52af4c67a33..7273f02936084c 100644 --- a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp +++ b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp @@ -30,6 +30,7 @@ namespace doris::vectorized { class ScaleWriterPartitioner final : public PartitionerBase { public: + ENABLE_FACTORY_CREATOR(ScaleWriterPartitioner); using HashValType = uint32_t; ScaleWriterPartitioner(int channel_size, int partition_count, int task_count, int task_bucket_count, @@ -49,7 +50,7 @@ class ScaleWriterPartitioner final : public PartitionerBase { min_partition_data_processed_rebalance_threshold), _min_data_processed_rebalance_threshold(min_data_processed_rebalance_threshold) { _crc_partitioner = - std::make_unique>( + vectorized::Crc32HashPartitioner::create_unique( _partition_count); } @@ -112,13 +113,18 @@ class ScaleWriterPartitioner final : public PartitionerBase { return Status::OK(); } + std::string debug_string() const override { + return fmt::format("ScaleWriterPartitioner(partition = {}, channel size = {})", + _partition_count, _channel_size); + } + private: int _get_next_writer_id(int partition_id) const { return _partition_rebalancer.get_task_id(partition_id, _partition_writer_indexes[partition_id]++); } - int _channel_size; + const int _channel_size; std::unique_ptr _crc_partitioner; mutable SkewedPartitionRebalancer _partition_rebalancer; mutable std::vector _partition_row_counts; diff --git a/be/src/vec/sink/tablet_sink_hash_partitioner.h b/be/src/vec/sink/tablet_sink_hash_partitioner.h index dbbc7c889267fb..466484349d7caa 100644 --- a/be/src/vec/sink/tablet_sink_hash_partitioner.h +++ b/be/src/vec/sink/tablet_sink_hash_partitioner.h @@ -31,6 +31,7 @@ namespace doris::vectorized { class TabletSinkHashPartitioner final : public PartitionerBase { public: using HashValType = int64_t; + ENABLE_FACTORY_CREATOR(TabletSinkHashPartitioner); TabletSinkHashPartitioner(size_t partition_count, int64_t txn_id, const TOlapTableSchemaParam& tablet_sink_schema, const TOlapTablePartitionParam& tablet_sink_partition, @@ -53,6 +54,10 @@ class TabletSinkHashPartitioner final : public PartitionerBase { Status close(RuntimeState* state) override; + std::string debug_string() const override { + return fmt::format("TabletSinkHashPartitioner({})", _partition_count); + } + private: static Status empty_callback_function(void* sender, 
TCreatePartitionResult* result) { return Status::OK(); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index abed61334583b8..6868f99bd264a6 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -129,18 +129,13 @@ int64_t Channel::mem_usage() const { return mem_usage; } -Status Channel::send_remote_block(std::unique_ptr&& block, bool eos) { +Status Channel::send_remote_block(Block* block, bool eos) { COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); - if (eos) { - if (_eos_send) { - return Status::OK(); - } else { - _eos_send = true; - } - } - if (eos || block->column_metas_size()) { - RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos, Status::OK()})); + auto pblock = std::make_unique(); + RETURN_IF_ERROR(_serializer.serialize_block(block, pblock.get())); + if (eos || pblock->column_metas_size()) { + RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock), eos, Status::OK()})); } return Status::OK(); } @@ -159,29 +154,7 @@ Status Channel::send_broadcast_block(std::shared_ptr& blo return Status::OK(); } -Status Channel::_send_current_block(bool eos) { - if (is_local()) { - return _send_local_block(eos); - } - return send_remote_block(std::move(_pblock), eos); -} - -Status Channel::_send_local_block(bool eos) { - Block block; - if (_serializer.get_block() != nullptr) { - block = _serializer.get_block()->to_block(); - _serializer.get_block()->set_mutable_columns(block.clone_empty_columns()); - } - - if (!block.empty() || eos) { - RETURN_IF_ERROR(send_local_block(&block, eos, true)); - } - return Status::OK(); -} - -Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) { - SCOPED_TIMER(_parent->local_send_timer()); - +Status Channel::send_block(Block* block, bool eos, bool* reset) { if (eos) { if (_eos_send) { return Status::OK(); @@ -189,10 +162,27 @@ Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) { _eos_send = true; } } - if (is_receiver_eof()) { return _receiver_status; } + if (block->rows() < _state->batch_size() && !eos) { + if (reset) { + *reset = true; + } + _serializer.reset_block(block); + return Status::OK(); + } + + if (is_local()) { + RETURN_IF_ERROR(send_local_block(block, eos)); + } else { + RETURN_IF_ERROR(send_remote_block(block, eos)); + } + return Status::OK(); +} + +Status Channel::send_local_block(Block* block, bool eos) { + SCOPED_TIMER(_parent->local_send_timer()); auto receiver_status = _recvr_status(); if (receiver_status.ok()) { @@ -202,7 +192,7 @@ Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) { const auto sender_id = _parent->sender_id(); if (!block->empty()) [[likely]] { - _local_recvr->add_block(block, sender_id, can_be_moved); + _local_recvr->add_block(block, sender_id); } if (eos) [[unlikely]] { @@ -217,7 +207,7 @@ Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) { } } -Status Channel::close(RuntimeState* state) { +Status Channel::close(RuntimeState* state, Status status) { if (_closed) { return Status::OK(); } @@ -230,37 +220,39 @@ Status Channel::close(RuntimeState* state) { if (is_receiver_eof()) { _serializer.reset_block(); return Status::OK(); - } else { - return _send_current_block(true); + } else if (status.ok()) { + Block block; + if (_serializer.get_block() != nullptr) { + block = _serializer.get_block()->to_block(); + } + return send_block(&block, true); } + return Status::OK(); } 
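// Editor's illustration (not part of this patch): how a caller is expected to use the new
// Channel::send_block() contract introduced above. When the block holds fewer rows than the
// batch size and eos is false, send_block() stashes the data into the channel's BlockSerializer
// and sets *reset, so the caller must not reuse the block; otherwise the block is delivered
// (locally or via RPC) and its now-empty structure can be swapped back into the serializer for
// reuse, mirroring Writer::send_to_channels() in writer.cpp earlier in this patch. The helper
// name below is hypothetical.
static Status flush_block_through_channel(vectorized::Channel* channel, vectorized::Block* block,
                                          bool eos) {
    bool reset = false;
    RETURN_IF_ERROR(channel->send_block(block, eos, &reset));
    if (!reset) {
        // The rows were handed off; keep only the column structure and give it back to the
        // serializer so the next call starts from a reusable, empty block.
        block->clear_column_data();
        channel->serializer()->swap_data(block);
    }
    return Status::OK();
}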
BlockSerializer::BlockSerializer(pipeline::ExchangeSinkLocalState* parent, bool is_local) : _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {} -Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t num_receivers, - bool* serialized, bool eos, const uint32_t* data, - const uint32_t offset, const uint32_t size) { - if (_mutable_block == nullptr) { +Status BlockSerializer::next_serialized_block(Block* block, bool* serialized, bool eos) { + if (_mutable_block == nullptr || _mutable_block->columns() == 0) { _mutable_block = MutableBlock::create_unique(block->clone_empty()); } { SCOPED_TIMER(_parent->merge_block_timer()); - if (data) { - if (size > 0) { - RETURN_IF_ERROR( - _mutable_block->add_rows(block, data + offset, data + offset + size)); + if (!block->empty()) { + if (_mutable_block->empty()) { + auto tmp_block = _mutable_block->to_block(); + _mutable_block = MutableBlock::create_unique(block); + *block = tmp_block; + } else { + RETURN_IF_ERROR(_mutable_block->merge(*block)); + block->clear_column_data(); } - } else if (!block->empty()) { - RETURN_IF_ERROR(_mutable_block->merge(*block)); } } if (_mutable_block->rows() >= _batch_size || eos) { - if (!_is_local) { - RETURN_IF_ERROR(serialize_block(dest, num_receivers)); - } *serialized = true; return Status::OK(); } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 0ff1f252d5441f..2517693f006667 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -79,15 +79,25 @@ class BlockSerializer { #ifdef BE_TEST BlockSerializer() : _batch_size(0) {}; #endif - Status next_serialized_block(Block* src, PBlock* dest, size_t num_receivers, bool* serialized, - bool eos, const uint32_t* data = nullptr, - const uint32_t offset = 0, const uint32_t size = 0); + Status next_serialized_block(Block* src, bool* serialized, bool eos); Status serialize_block(PBlock* dest, size_t num_receivers = 1); Status serialize_block(const Block* src, PBlock* dest, size_t num_receivers = 1); + void reset_block(Block* block) { + DCHECK_EQ(_mutable_block->rows(), 0); + _mutable_block = MutableBlock::create_unique(block); + block->clear(); + } + void swap_data(Block* block) { + DCHECK(block->empty()); + auto tmp_block = vectorized::MutableBlock::build_mutable_block(block); + _mutable_block->swap(tmp_block); + *block = tmp_block.to_block(); + } MutableBlock* get_block() const { return _mutable_block.get(); } void reset_block() { _mutable_block.reset(); } + void init_block() { _mutable_block = MutableBlock::create_unique(); } void set_is_local(bool is_local) { _is_local = is_local; } bool is_local() const { return _is_local; } @@ -124,17 +134,19 @@ class Channel { Status init(RuntimeState* state); Status open(RuntimeState* state); - Status send_local_block(Block* block, bool eos, bool can_be_moved); + Status send_block(Block* block, bool eos, bool* reset = nullptr); + Status send_local_block(Block* block, bool eos); + // Asynchronously sends a block + // Returns the status of the most recently finished transmit_data + // rpc (or OK if there wasn't one that hasn't been reported yet). + // if batch is nullptr, send the eof packet + Status send_remote_block(Block* block, bool eos = false); + Status send_broadcast_block(std::shared_ptr& block, bool eos = false); // Flush buffered rows and close channel. This function don't wait the response // of close operation, client should call close_wait() to finish channel's close. 
// We split one close operation into two phases in order to make multiple channels // can run parallel. - Status close(RuntimeState* state); - - std::string get_fragment_instance_id_str() { - UniqueId uid(_fragment_instance_id); - return uid.to_string(); - } + Status close(RuntimeState* state, Status status); bool is_local() const { return _is_local; } @@ -143,32 +155,7 @@ class Channel { void set_receiver_eof(Status st) { _receiver_status = st; } int64_t mem_usage() const; - - // Asynchronously sends a block - // Returns the status of the most recently finished transmit_data - // rpc (or OK if there wasn't one that hasn't been reported yet). - // if batch is nullptr, send the eof packet - Status send_remote_block(std::unique_ptr&& block, bool eos = false); - Status send_broadcast_block(std::shared_ptr& block, bool eos = false); - - Status add_rows(Block* block, const uint32_t* data, const uint32_t offset, const uint32_t size, - bool eos) { - if (_fragment_instance_id.lo == -1) { - return Status::OK(); - } - - bool serialized = false; - if (_pblock == nullptr) { - _pblock = std::make_unique(); - } - RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, eos, - data, offset, size)); - if (serialized) { - RETURN_IF_ERROR(_send_current_block(eos)); - } - - return Status::OK(); - } + BlockSerializer* serializer() { return &_serializer; } void set_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; } @@ -188,9 +175,6 @@ class Channel { std::shared_ptr get_local_channel_dependency(); protected: - Status _send_local_block(bool eos); - Status _send_current_block(bool eos); - Status _recvr_status() const { if (_local_recvr && !_local_recvr->is_closed()) { return Status::OK(); @@ -226,15 +210,6 @@ class Channel { std::unique_ptr _pblock; }; -#define HANDLE_CHANNEL_STATUS(state, channel, status) \ - do { \ - if (status.is()) { \ - _handle_eof_channel(state, channel, status); \ - } else { \ - RETURN_IF_ERROR(status); \ - } \ - } while (0) - } // namespace vectorized } // namespace doris
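Editor's note (a sketch, not part of this patch): after this refactor the exchange sink and the local exchange sink share one path. Writer::write() partitions the input block and enqueues the per-channel slices through the local exchanger's sink(), and Writer::send_to_channels() later dequeues each channel's slice via get_block() and hands it to Channel::send_block(), which batches it in the serializer, delivers it to a local receiver, or serializes it for RPC. A minimal, hypothetical driver showing that call order follows; only the functions shown in this patch are real, while the driver itself and its loop are assumptions about how the sink operator wires them together.

// Sketch only: push one block through the refactored shuffle path.
Status shuffle_one_block(ExchangeSinkLocalState* local_state, RuntimeState* state,
                         vectorized::Block* block, bool eos) {
    Writer writer;
    // 1. Partition the rows and enqueue them into the local exchanger's per-channel queues.
    RETURN_IF_ERROR(writer.write(local_state, state, block, eos));
    // 2. Drain every channel's queue and deliver the data locally or over brpc.
    for (size_t i = 0; i < local_state->channels.size(); ++i) {
        RETURN_IF_ERROR(writer.send_to_channels(local_state, state,
                                                local_state->channels[i].get(),
                                                static_cast<int>(i), eos));
    }
    return Status::OK();
}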