Skip to content

Commit

Permalink
[branch-3.0](pick) Pick 6 commits (#41715)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Oct 11, 2024
1 parent a1f8383 commit 8260737
Show file tree
Hide file tree
Showing 92 changed files with 462 additions and 661 deletions.
11 changes: 4 additions & 7 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -769,12 +769,13 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
return Status::OK();
}

Status AggSinkOperatorX::prepare(RuntimeState* state) {
Status AggSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::open(state));
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
RETURN_IF_ERROR(vectorized::VExpr::prepare(
_probe_expr_ctxs, state, DataSinkOperatorX<AggSinkLocalState>::_child_x->row_desc()));
_probe_expr_ctxs, state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc()));

int j = _probe_expr_ctxs.size();
for (int i = 0; i < j; ++i) {
Expand All @@ -789,7 +790,7 @@ Status AggSinkOperatorX::prepare(RuntimeState* state) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
state, DataSinkOperatorX<AggSinkLocalState>::_child_x->row_desc(),
state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc(),
intermediate_slot_desc, output_slot_desc));
_aggregate_evaluators[i]->set_version(state->be_exec_version());
}
Expand Down Expand Up @@ -824,10 +825,6 @@ Status AggSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::AggFnEvaluator::check_agg_fn_output(
_probe_expr_ctxs.size(), _aggregate_evaluators, _agg_fn_output_row_descriptor));
}
return Status::OK();
}

Status AggSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));

for (auto& _aggregate_evaluator : _aggregate_evaluators) {
Expand Down
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,18 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {

Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;

DataDistribution required_data_distribution() const override {
if (_probe_expr_ctxs.empty()) {
return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child_x
return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child
->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
}
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
11 changes: 4 additions & 7 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,14 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
return Status::OK();
}

Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
Status AnalyticSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AnalyticSinkLocalState>::open(state));
for (const auto& ctx : _agg_expr_ctxs) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child->row_desc()));
}
if (!_partition_by_eq_expr_ctxs.empty() || !_order_by_eq_expr_ctxs.empty()) {
vector<TTupleId> tuple_ids;
tuple_ids.push_back(_child_x->row_desc().tuple_descriptors()[0]->id());
tuple_ids.push_back(_child->row_desc().tuple_descriptors()[0]->id());
tuple_ids.push_back(_buffered_tuple_id);
RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, false));
if (!_partition_by_eq_expr_ctxs.empty()) {
Expand All @@ -249,10 +250,6 @@ Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
vectorized::VExpr::prepare(_order_by_eq_expr_ctxs, state, cmp_row_desc));
}
}
return Status::OK();
}

Status AnalyticSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::open(_partition_by_eq_expr_ctxs, state));
RETURN_IF_ERROR(vectorized::VExpr::open(_order_by_eq_expr_ctxs, state));
for (size_t i = 0; i < _agg_functions_size; ++i) {
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,14 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt

Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
DataDistribution required_data_distribution() const override {
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
13 changes: 4 additions & 9 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,15 +562,15 @@ Status AnalyticLocalState::close(RuntimeState* state) {
return PipelineXLocalState<AnalyticSharedState>::close(state);
}

Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::prepare(state));
DCHECK(_child_x->row_desc().is_prefix_of(_row_descriptor));
Status AnalyticSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
DCHECK(_child->row_desc().is_prefix_of(_row_descriptor));
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
for (size_t i = 0; i < _agg_functions.size(); ++i) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[i];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child_x->row_desc(),
RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(),
intermediate_slot_desc, output_slot_desc));
_agg_functions[i]->set_version(state->be_exec_version());
_change_to_nullable_flags.push_back(output_slot_desc->is_nullable() &&
Expand All @@ -597,11 +597,6 @@ Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
alignment_of_next_state * alignment_of_next_state;
}
}
return Status::OK();
}

Status AnalyticSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
for (auto* agg_function : _agg_functions) {
RETURN_IF_ERROR(agg_function->open(state));
}
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ class AnalyticSourceOperatorX final : public OperatorX<AnalyticLocalState> {
bool is_source() const override { return true; }

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

private:
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/datagen_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ Status DataGenSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
return Status::OK();
}

Status DataGenSourceOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<DataGenLocalState>::prepare(state));
Status DataGenSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<DataGenLocalState>::open(state));
// get tuple desc
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/datagen_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class DataGenSourceOperatorX final : public OperatorX<DataGenLocalState> {
const DescriptorTbl& descs);

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;

[[nodiscard]] bool is_source() const override { return true; }
Expand Down
14 changes: 4 additions & 10 deletions be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,12 @@ Status DistinctStreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState*
return Status::OK();
}

Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::prepare(state));
Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::open(state));
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc()));

int j = _probe_expr_ctxs.size();
for (int i = 0; i < j; ++i) {
Expand All @@ -389,7 +389,7 @@ Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
state, _child_x->row_desc(), intermediate_slot_desc, output_slot_desc));
state, _child->row_desc(), intermediate_slot_desc, output_slot_desc));
_aggregate_evaluators[i]->set_version(state->be_exec_version());
}

Expand All @@ -412,12 +412,6 @@ Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
alignment_of_next_state * alignment_of_next_state;
}
}

return Status::OK();
}

Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::open(state));
RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));

for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,14 @@ class DistinctStreamingAggOperatorX final
DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) const override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override;
bool need_more_input_data(RuntimeState* state) const override;

DataDistribution required_data_distribution() const override {
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/es_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ Status EsScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
return Status::OK();
}

Status EsScanOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::prepare(state));
Status EsScanOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::open(state));

_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (_tuple_desc == nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/es_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class EsScanOperatorX final : public ScanOperatorX<EsScanLocalState> {
const DescriptorTbl& descs, int parallel_tasks);

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

private:
friend class EsScanLocalState;
Expand Down
14 changes: 5 additions & 9 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,19 +335,15 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
return Status::OK();
}

Status ExchangeSinkOperatorX::prepare(RuntimeState* state) {
Status ExchangeSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::open(state));
_state = state;
_mem_tracker = std::make_unique<MemTracker>("ExchangeSinkOperatorX:");
return Status::OK();
}

Status ExchangeSinkOperatorX::open(RuntimeState* state) {
DCHECK(state != nullptr);
_compression_type = state->fragement_transmission_compression_type();
if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
if (_output_tuple_id == -1) {
RETURN_IF_ERROR(
vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child_x->row_desc()));
vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child->row_desc()));
} else {
auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
auto* output_row_desc = _pool->add(new RowDescriptor(output_tuple_desc, false));
Expand Down Expand Up @@ -686,10 +682,10 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
}

DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
if (_child_x && _enable_local_merge_sort) {
if (_child && _enable_local_merge_sort) {
// SORT_OPERATOR -> DATA_STREAM_SINK_OPERATOR
// SORT_OPERATOR -> LOCAL_MERGE_SORT -> DATA_STREAM_SINK_OPERATOR
if (auto sort_source = std::dynamic_pointer_cast<SortSourceOperatorX>(_child_x);
if (auto sort_source = std::dynamic_pointer_cast<SortSourceOperatorX>(_child);
sort_source && sort_source->use_local_merge()) {
// Sort the data local
return ExchangeType::LOCAL_MERGE_SORT;
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt

RuntimeState* state() { return _state; }

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
Expand Down
10 changes: 2 additions & 8 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,13 @@ Status ExchangeSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state
return Status::OK();
}

Status ExchangeSourceOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::prepare(state));
Status ExchangeSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::open(state));
DCHECK_GT(_num_senders, 0);

if (_is_merging) {
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _row_descriptor, _row_descriptor));
}

return Status::OK();
}

Status ExchangeSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::open(state));
if (_is_merging) {
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
}
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs, int num_senders);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ Status FileScanLocalState::_process_conjuncts(RuntimeState* state) {
return Status::OK();
}

Status FileScanOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::prepare(state));
Status FileScanOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::open(state));
if (state->get_query_ctx() != nullptr &&
state->get_query_ctx()->file_scan_range_params_map.contains(node_id())) {
TFileScanRangeParams& params =
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/file_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
_output_tuple_id = tnode.file_scan_node.tuple_id;
}

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

bool is_file_scan_operator() const override { return true; }

Expand Down
10 changes: 3 additions & 7 deletions be/src/pipeline/exec/group_commit_block_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,19 +259,15 @@ Status GroupCommitBlockSinkOperatorX::init(const TDataSink& t_sink) {
return Status::OK();
}

Status GroupCommitBlockSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Base::prepare(state));
Status GroupCommitBlockSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
// get table's tuple descriptor
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
if (_output_tuple_desc == nullptr) {
LOG(WARNING) << "unknown destination tuple descriptor, id=" << _tuple_desc_id;
return Status::InternalError("unknown destination tuple descriptor");
}
return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc);
}

Status GroupCommitBlockSinkOperatorX::open(RuntimeState* state) {
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/group_commit_block_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ class GroupCommitBlockSinkOperatorX final

Status init(const TDataSink& sink) override;

Status prepare(RuntimeState* state) override;

Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* block, bool eos) override;
Expand Down
Loading

0 comments on commit 8260737

Please sign in to comment.