Skip to content

Commit

Permalink
[pipelineX](fix) Fix TPCH Q2 (apache#28783)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored and stephen committed Dec 28, 2023
1 parent 1abc9aa commit 7af8592
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 30 deletions.
8 changes: 3 additions & 5 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,9 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
: OperatorX<ExchangeLocalState>(pool, tnode, operator_id, descs),
_num_senders(num_senders),
_is_merging(tnode.exchange_node.__isset.sort_info),
_is_hash_partition(
tnode.exchange_node.__isset.partition_type &&
(tnode.exchange_node.partition_type == TPartitionType::HASH_PARTITIONED ||
tnode.exchange_node.partition_type ==
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED)),
_partition_type(tnode.exchange_node.__isset.partition_type
? tnode.exchange_node.partition_type
: TPartitionType::UNPARTITIONED),
_input_row_desc(descs, tnode.exchange_node.input_row_tuples,
std::vector<bool>(tnode.nullable_tuples.begin(),
tnode.nullable_tuples.begin() +
Expand Down
10 changes: 7 additions & 3 deletions be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,21 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
}

DataDistribution get_local_exchange_type() const override {
if (!_is_hash_partition || OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
if (OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
return {ExchangeType::NOOP};
}
return {ExchangeType::HASH_SHUFFLE};
return _partition_type == TPartitionType::HASH_PARTITIONED
? DataDistribution(ExchangeType::HASH_SHUFFLE)
: _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE)
: DataDistribution(ExchangeType::NOOP);
}

private:
friend class ExchangeLocalState;
const int _num_senders;
const bool _is_merging;
const bool _is_hash_partition;
const TPartitionType::type _partition_type;
RowDescriptor _input_row_desc;
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;

Expand Down
30 changes: 23 additions & 7 deletions be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,30 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
return _collect_query_statistics_with_every_batch;
}

bool need_to_local_shuffle(const DataDistribution target_data_distribution) const {
if (target_data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE ||
target_data_distribution.distribution_type == ExchangeType::HASH_SHUFFLE) {
// If `_data_distribution` of this pipeline does not match the `target_data_distribution`,
// we should do local shuffle.
return target_data_distribution.operator!=(_data_distribution);
// Tells whether `idx` names one of the two hash-based exchange types
// (HASH_SHUFFLE or BUCKET_HASH_SHUFFLE); every other value yields false.
static bool is_hash_exchange(ExchangeType idx) {
    switch (idx) {
    case ExchangeType::HASH_SHUFFLE:
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        return true;
    default:
        return false;
    }
}

bool need_to_local_exchange(const DataDistribution target_data_distribution) const {
if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE &&
target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) {
return true;
} else if (operatorXs.front()->ignore_data_hash_distribution()) {
if (_data_distribution.distribution_type ==
target_data_distribution.distribution_type &&
(_data_distribution.partition_exprs.empty() ||
target_data_distribution.partition_exprs.empty())) {
return true;
}
return _data_distribution.distribution_type !=
target_data_distribution.distribution_type ||
_data_distribution.partition_exprs != target_data_distribution.partition_exprs;
} else {
return _data_distribution.distribution_type !=
target_data_distribution.distribution_type &&
!(is_hash_exchange(_data_distribution.distribution_type) &&
is_hash_exchange(target_data_distribution.distribution_type));
}
return true;
}
void init_data_distribution() {
set_data_distribution(operatorXs.front()->get_local_exchange_type());
Expand Down
19 changes: 5 additions & 14 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -605,22 +605,13 @@ struct DataDistribution {
DataDistribution(const DataDistribution& other)
: distribution_type(other.distribution_type), partition_exprs(other.partition_exprs) {}
bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; }
bool operator==(const DataDistribution& other) const {
if (distribution_type == other.distribution_type &&
(distribution_type == ExchangeType::HASH_SHUFFLE ||
distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) &&
(partition_exprs.empty() || other.partition_exprs.empty())) {
return true;
}
return distribution_type == other.distribution_type &&
partition_exprs == other.partition_exprs;
}
DataDistribution operator=(const DataDistribution& other) const {
return DataDistribution(other.distribution_type, other.partition_exprs);
// Copy assignment: takes over the distribution type and the partition
// expressions of `other`, then returns this object so assignments can chain.
DataDistribution& operator=(const DataDistribution& other) {
    if (this != &other) {
        // Self-assignment would leave both fields unchanged, so it is skipped.
        partition_exprs = other.partition_exprs;
        distribution_type = other.distribution_type;
    }
    return *this;
}
bool operator!=(const DataDistribution& other) const { return !operator==(other); }
ExchangeType distribution_type;
const std::vector<TExpr> partition_exprs;
std::vector<TExpr> partition_exprs;
};

class Exchanger;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ Status PipelineXFragmentContext::_add_local_exchange(
return Status::OK();
}

if (!cur_pipe->need_to_local_shuffle(data_distribution)) {
if (!cur_pipe->need_to_local_exchange(data_distribution)) {
return Status::OK();
}
*do_local_exchange = true;
Expand Down

0 comments on commit 7af8592

Please sign in to comment.