Skip to content

Commit

Permalink
[refactor](shuffle) Use local exchanger for both exchange sink and lo…
Browse files Browse the repository at this point in the history
…cal exchange sink
  • Loading branch information
Gabriel39 committed Dec 13, 2024
1 parent ff090ff commit f268db5
Show file tree
Hide file tree
Showing 21 changed files with 587 additions and 503 deletions.
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ void BroadcastPBlockHolderMemLimiter::acquire(BroadcastPBlockHolder& holder) {
DCHECK(_broadcast_dependency != nullptr);
holder.set_parent_creator(shared_from_this());
auto size = holder._pblock->column_values().size();
if (size == 0) {
return;
}
_total_queue_buffer_size += size;
_total_queue_blocks_count++;
if (_total_queue_buffer_size >= config::exchg_node_buffer_size_bytes ||
Expand All @@ -77,6 +80,9 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde
std::unique_lock l(_holders_lock);
DCHECK(_broadcast_dependency != nullptr);
auto size = holder._pblock->column_values().size();
if (size == 0) {
return;
}
_total_queue_buffer_size -= size;
_total_queue_blocks_count--;
if (_total_queue_buffer_size <= 0) {
Expand Down
304 changes: 138 additions & 166 deletions be/src/pipeline/exec/exchange_sink_operator.cpp

Large diffs are not rendered by default.

96 changes: 79 additions & 17 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,69 @@ class TDataSink;

namespace pipeline {

using ChannelId = int;
using ChannelIds = std::vector<ChannelId>;
class ChannelSelector {
public:
virtual ~ChannelSelector() = default;
virtual ChannelIds& next_channel_ids() = 0;
virtual void process_next_block(size_t data_size) {}
};

class AllChannelsSelector final : public ChannelSelector {
public:
AllChannelsSelector(size_t num_channels) : ChannelSelector(), _all_channel_ids(num_channels) {
for (int i = 0; i < num_channels; i++) {
_all_channel_ids[i] = i;
}
}
~AllChannelsSelector() override = default;
ChannelIds& next_channel_ids() override { return _all_channel_ids; }

private:
ChannelIds _all_channel_ids;
};

class RoundRobinSelector final : public ChannelSelector {
public:
RoundRobinSelector(const size_t num_channels)
: ChannelSelector(), _next_channel_ids(1, -1), _num_channels(num_channels) {}
~RoundRobinSelector() override = default;
ChannelIds& next_channel_ids() override {
_next_channel_ids[0] = ++_next_channel_ids[0] % _num_channels;
return _next_channel_ids;
}

private:
ChannelIds _next_channel_ids;
const size_t _num_channels;
};

class TableSinkRandomSelector final : public ChannelSelector {
public:
TableSinkRandomSelector(const size_t num_channels)
: ChannelSelector(), _next_channel_ids(1, -1), _num_channels(num_channels) {}
~TableSinkRandomSelector() override = default;
ChannelIds& next_channel_ids() override {
_next_channel_ids[0] = ++_next_channel_ids[0] % _writer_count;
if (_writer_count < _num_channels) {
if (_data_processed >=
_writer_count *
config::table_sink_non_partition_write_scaling_data_processed_threshold) {
_writer_count++;
}
}
return _next_channel_ids;
}
void process_next_block(size_t data_size) override { _data_processed += data_size; }

private:
ChannelIds _next_channel_ids;
const size_t _num_channels;
int _writer_count = 1;
size_t _data_processed = 0;
};

class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
using Base = PipelineXSinkLocalState<>;
Expand Down Expand Up @@ -102,11 +165,15 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
return _distribute_rows_into_channels_timer;
}
std::vector<std::shared_ptr<vectorized::Channel>> channels;
int current_channel_idx {0}; // index of current channel to send to if _random == true
bool only_local_exchange {false};
int* next_channel_id() { return &_channel_id; }
RuntimeProfile::Counter* copy_shuffled_data_timer() { return _copy_shuffled_data_timer; }

void on_channel_finished(InstanceLoId channel_id);
vectorized::PartitionerBase* partitioner() const { return _partitioner.get(); }
ExchangerBase* exchanger() { return _exchanger.get(); }
RuntimeProfile::Counter* enqueue_blocks_counter() { return _enqueue_blocks_counter; }
RuntimeProfile::Counter* dequeue_blocks_counter() { return _dequeue_blocks_counter; }

private:
friend class ExchangeSinkOperatorX;
Expand All @@ -133,6 +200,9 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
RuntimeProfile::Counter* _wait_queue_timer = nullptr;
RuntimeProfile::Counter* _wait_broadcast_buffer_timer = nullptr;
std::vector<RuntimeProfile::Counter*> _wait_channel_timer;
RuntimeProfile::Counter* _copy_shuffled_data_timer = nullptr;
RuntimeProfile::Counter* _enqueue_blocks_counter = nullptr;
RuntimeProfile::Counter* _dequeue_blocks_counter = nullptr;

// Sender instance id, unique within a fragment.
int _sender_id;
Expand Down Expand Up @@ -164,7 +234,6 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
*/
std::vector<std::shared_ptr<Dependency>> _local_channels_dependency;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
std::unique_ptr<Writer> _writer;
size_t _partition_count;

std::shared_ptr<Dependency> _finish_dependency;
Expand All @@ -175,11 +244,13 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
TPartitionType::type _part_type;

std::atomic<bool> _reach_limit = false;
int _last_local_channel_idx = -1;

std::atomic_int _working_channels_count = 0;
std::set<InstanceLoId> _finished_channels;
std::mutex _finished_channels_mutex;
std::unique_ptr<ChannelSelector> _channel_selector;
int _channel_id = 0;
std::shared_ptr<ExchangerBase> _exchanger = nullptr;
};

class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
Expand Down Expand Up @@ -207,13 +278,12 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
/// TODO: Modify this to let FE handle the judgment instead of BE.
std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(InstanceLoId sender_ins_id);
vectorized::VExprContextSPtrs& tablet_sink_expr_ctxs() { return _tablet_sink_expr_ctxs; }
bool is_broadcast() const {
return _part_type == TPartitionType::UNPARTITIONED || _dests.size() == 1;
}

private:
friend class ExchangeSinkLocalState;

template <typename ChannelPtrType>
void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st);

// Use ExchangeSinkOperatorX to create a sink buffer.
// The sink buffer can be shared among multiple ExchangeSinkLocalState instances,
// or each ExchangeSinkLocalState can have its own sink buffer.
Expand All @@ -229,11 +299,6 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt

TPartitionType::type _part_type;

// serialized batches for broadcasting; we need two so we can write
// one while the other one is still being sent
PBlock _pb_block1;
PBlock _pb_block2;

const std::vector<TPlanFragmentDestination> _dests;

// Identifier of the destination plan node.
Expand All @@ -249,17 +314,14 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
const TOlapTablePartitionParam _tablet_sink_partition;
const TOlapTableLocationParam _tablet_sink_location;
const TTupleId _tablet_sink_tuple_id;
int64_t _tablet_sink_txn_id = -1;
const int64_t _tablet_sink_txn_id = -1;
std::shared_ptr<ObjectPool> _pool;
vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
const std::vector<TExpr>* _t_tablet_sink_exprs = nullptr;

// for external table sink random partition
// Control the number of channels according to the flow, thereby controlling the number of table sink writers.
size_t _data_processed = 0;
int _writer_count = 1;
const bool _enable_local_merge_sort;
const std::vector<TUniqueId>& _fragment_instance_ids;
std::unique_ptr<Writer> _writer = nullptr;
};

} // namespace pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ Status PartitionedHashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeSt
for (auto& conjunct : tnode.hash_join_node.eq_join_conjuncts) {
_probe_exprs.emplace_back(conjunct.left);
}
_partitioner = std::make_unique<SpillPartitionerType>(_partition_count);
_partitioner = SpillPartitionerType::create_unique(_partition_count);
RETURN_IF_ERROR(_partitioner->init(_probe_exprs));

return Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ Status PartitionedHashJoinSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta
_build_exprs.emplace_back(eq_join_conjunct.right);
partition_exprs.emplace_back(eq_join_conjunct.right);
}
_partitioner = std::make_unique<SpillPartitionerType>(_partition_count);
_partitioner = SpillPartitionerType::create_unique(_partition_count);
RETURN_IF_ERROR(_partitioner->init(_build_exprs));

return Status::OK();
Expand Down
14 changes: 9 additions & 5 deletions be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets
_shuffle_idx_to_instance_idx[i] = {i, i};
}
}
_partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
_num_partitions));
_partitioner =
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>::create_unique(
_num_partitions);
RETURN_IF_ERROR(_partitioner->init(_texprs));
} else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
DCHECK_GT(num_buckets, 0);
_partitioner.reset(
new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(num_buckets));
_partitioner =
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>::create_unique(
num_buckets);
RETURN_IF_ERROR(_partitioner->init(_texprs));
}
return Status::OK();
Expand All @@ -86,6 +88,7 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
SCOPED_TIMER(_init_timer);
_compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
_distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
_enqueue_blocks_counter = ADD_COUNTER(profile(), "EnqueueRows", TUnit::UNIT);
if (_parent->cast<LocalExchangeSinkOperatorX>()._type == ExchangeType::HASH_SHUFFLE) {
_profile->add_info_string(
"UseGlobalShuffle",
Expand Down Expand Up @@ -146,7 +149,8 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
RETURN_IF_ERROR(local_state._exchanger->sink(
state, in_block, eos,
{local_state._compute_hash_value_timer, local_state._distribute_timer, nullptr},
{local_state._compute_hash_value_timer, local_state._distribute_timer, nullptr,
local_state._enqueue_blocks_counter, nullptr},
{&local_state._channel_id, local_state._partitioner.get(), &local_state}));

// If all exchange sources ended due to limit reached, current task should also finish
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState<LocalEx
RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
RuntimeProfile::Counter* _distribute_timer = nullptr;
std::unique_ptr<vectorized::PartitionerBase> _partitioner = nullptr;
RuntimeProfile::Counter* _enqueue_blocks_counter = nullptr;

// Used by random passthrough exchanger
int _channel_id = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo&
DCHECK(_exchanger != nullptr);
_get_block_failed_counter =
ADD_COUNTER_WITH_LEVEL(profile(), "GetBlockFailedTime", TUnit::UNIT, 1);
_dequeue_blocks_counter = ADD_COUNTER(profile(), "DequeueRows", TUnit::UNIT);
if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE ||
_exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) {
_copy_data_timer = ADD_TIMER(profile(), "CopyDataTime");
Expand Down Expand Up @@ -116,9 +117,11 @@ Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::
bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._exchanger->get_block(
state, block, eos, {nullptr, nullptr, local_state._copy_data_timer},
{local_state._channel_id, &local_state}));
RETURN_IF_ERROR(
local_state._exchanger->get_block(state, block, eos,
{nullptr, nullptr, local_state._copy_data_timer,
nullptr, local_state._dequeue_blocks_counter},
{local_state._channel_id, &local_state}));
local_state.reached_limit(block, eos);
return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class LocalExchangeSourceLocalState final : public PipelineXLocalState<LocalExch
int _channel_id;
RuntimeProfile::Counter* _get_block_failed_counter = nullptr;
RuntimeProfile::Counter* _copy_data_timer = nullptr;
RuntimeProfile::Counter* _dequeue_blocks_counter = nullptr;
std::vector<RuntimeProfile::Counter*> _deps_counter;
std::vector<DependencySPtr> _local_merge_deps;
};
Expand Down
Loading

0 comments on commit f268db5

Please sign in to comment.