diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 8c96b4d744c83d..260a599a947a0d 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -775,7 +775,7 @@ Status AggSinkOperatorX::open(RuntimeState* state) { _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size()); RETURN_IF_ERROR(vectorized::VExpr::prepare( - _probe_expr_ctxs, state, DataSinkOperatorX::_child_x->row_desc())); + _probe_expr_ctxs, state, DataSinkOperatorX::_child->row_desc())); int j = _probe_expr_ctxs.size(); for (int i = 0; i < j; ++i) { @@ -790,7 +790,7 @@ Status AggSinkOperatorX::open(RuntimeState* state) { SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j]; SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j]; RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare( - state, DataSinkOperatorX::_child_x->row_desc(), + state, DataSinkOperatorX::_child->row_desc(), intermediate_slot_desc, output_slot_desc)); _aggregate_evaluators[i]->set_version(state->be_exec_version()); } diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index d55b382931d74b..97440de3f09e4c 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -143,7 +143,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX { DataDistribution required_data_distribution() const override { if (_probe_expr_ctxs.empty()) { - return _needs_finalize || DataSinkOperatorX::_child_x + return _needs_finalize || DataSinkOperatorX::_child ->ignore_data_distribution() ? DataDistribution(ExchangeType::PASSTHROUGH) : DataSinkOperatorX::required_data_distribution(); diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index cc219ecbe642f0..85d7773bdbd025 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -234,11 +234,11 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) Status AnalyticSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::open(state)); for (const auto& ctx : _agg_expr_ctxs) { - RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child->row_desc())); } if (!_partition_by_eq_expr_ctxs.empty() || !_order_by_eq_expr_ctxs.empty()) { vector tuple_ids; - tuple_ids.push_back(_child_x->row_desc().tuple_descriptors()[0]->id()); + tuple_ids.push_back(_child->row_desc().tuple_descriptors()[0]->id()); tuple_ids.push_back(_buffered_tuple_id); RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector(2, false)); if (!_partition_by_eq_expr_ctxs.empty()) { diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 1996b9af58d2c4..b521a9b583fa94 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -562,13 +562,13 @@ Status AnalyticLocalState::close(RuntimeState* state) { Status AnalyticSourceOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(OperatorX::open(state)); - DCHECK(_child_x->row_desc().is_prefix_of(_row_descriptor)); + DCHECK(_child->row_desc().is_prefix_of(_row_descriptor)); _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); for (size_t i = 0; i < _agg_functions.size(); ++i) { SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[i]; SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i]; - RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child_x->row_desc(), + RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(), intermediate_slot_desc, output_slot_desc)); _agg_functions[i]->set_version(state->be_exec_version()); _change_to_nullable_flags.push_back(output_slot_desc->is_nullable() && diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 96212f7fd2ff00..5127605097f4c5 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -374,7 +374,7 @@ Status DistinctStreamingAggOperatorX::open(RuntimeState* state) { _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size()); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc())); int j = _probe_expr_ctxs.size(); for (int i = 0; i < j; ++i) { @@ -389,7 +389,7 @@ Status DistinctStreamingAggOperatorX::open(RuntimeState* state) { SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j]; SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j]; RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare( - state, _child_x->row_desc(), intermediate_slot_desc, output_slot_desc)); + state, _child->row_desc(), intermediate_slot_desc, output_slot_desc)); _aggregate_evaluators[i]->set_version(state->be_exec_version()); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 71649aa21ec3d4..366b3c682f7dd5 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -648,10 +648,10 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { } DataDistribution ExchangeSinkOperatorX::required_data_distribution() const { - if (_child_x && _enable_local_merge_sort) { + if (_child && _enable_local_merge_sort) { // SORT_OPERATOR -> DATA_STREAM_SINK_OPERATOR // SORT_OPERATOR -> LOCAL_MERGE_SORT -> DATA_STREAM_SINK_OPERATOR - if (auto sort_source = std::dynamic_pointer_cast(_child_x); + if (auto sort_source = std::dynamic_pointer_cast(_child); sort_source && sort_source->use_local_merge()) { // Sort the data local return ExchangeType::LOCAL_MERGE_SORT; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index d4ca54da637673..0bee88ed537ea6 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -488,7 +488,7 @@ Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) { _shared_hash_table_context = _shared_hashtable_controller->get_context(node_id()); } } - RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child->row_desc())); return vectorized::VExpr::open(_build_expr_ctxs, state); } @@ -505,7 +505,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* if (local_state._build_side_mutable_block.empty()) { auto tmp_build_block = vectorized::VectorizedUtils::create_empty_columnswithtypename( - _child_x->row_desc()); + _child->row_desc()); tmp_build_block = *(tmp_build_block.create_same_struct_block(1, false)); local_state._build_col_ids.resize(_build_expr_ctxs.size()); RETURN_IF_ERROR(local_state._do_evaluate(tmp_build_block, local_state._build_expr_ctxs, diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index c373af5d6622ff..b7ae612510fcb4 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -132,7 +132,7 @@ class HashJoinBuildSinkOperatorX final if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; } else if (_is_broadcast_join) { - return _child_x->ignore_data_distribution() + return _child->ignore_data_distribution() ? DataDistribution(ExchangeType::PASS_TO_ONE) : DataDistribution(ExchangeType::NOOP); } diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 7008397db770ae..f91e1eaa2a1b17 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -276,7 +276,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc /// increase the output rows count(just same as `_probe_block`'s rows count). RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, eos, &local_state._probe_block, false)); - local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots()); + local_state._probe_block.clear_column_data(_child->row_desc().num_materialized_slots()); return Status::OK(); } @@ -597,7 +597,7 @@ Status HashJoinProbeOperatorX::open(RuntimeState* state) { } } }; - init_output_slots_flags(_child_x->row_desc().tuple_descriptors(), _left_output_slot_flags); + init_output_slots_flags(_child->row_desc().tuple_descriptors(), _left_output_slot_flags); init_output_slots_flags(_build_side_child->row_desc().tuple_descriptors(), _right_output_slot_flags); // _other_join_conjuncts are evaluated in the context of the rows produced by this node @@ -609,12 +609,12 @@ Status HashJoinProbeOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc)); } - RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc())); DCHECK(_build_side_child != nullptr); // right table data types _right_table_data_types = vectorized::VectorizedUtils::get_data_types(_build_side_child->row_desc()); - _left_table_data_types = vectorized::VectorizedUtils::get_data_types(_child_x->row_desc()); + _left_table_data_types = vectorized::VectorizedUtils::get_data_types(_child->row_desc()); _right_table_column_names = vectorized::VectorizedUtils::get_column_names(_build_side_child->row_desc()); diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index 65b7a2694e4b47..3f68c73d04b161 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -85,12 +85,12 @@ class JoinProbeOperatorX : public StatefulOperatorX { } Status set_child(OperatorPtr child) override { - if (OperatorX::_child_x && _build_side_child == nullptr) { + if (OperatorX::_child && _build_side_child == nullptr) { // when there already (probe) child, others is build child. set_build_side_child(child); } else { // first child which is probe side is in this pipeline - OperatorX::_child_x = std::move(child); + OperatorX::_child = std::move(child); } return Status::OK(); } diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index 515c151c3c1fa8..793a37c7396a61 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -109,14 +109,14 @@ Status NestedLoopJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta Status NestedLoopJoinBuildSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(JoinBuildSinkOperatorX::open(state)); - int num_build_tuples = _child_x->row_desc().tuple_descriptors().size(); + int num_build_tuples = _child->row_desc().tuple_descriptors().size(); for (int i = 0; i < num_build_tuples; ++i) { - TupleDescriptor* build_tuple_desc = _child_x->row_desc().tuple_descriptors()[i]; + TupleDescriptor* build_tuple_desc = _child->row_desc().tuple_descriptors()[i]; auto tuple_idx = _row_descriptor.get_tuple_idx(build_tuple_desc->id()); RETURN_IF_INVALID_TUPLE_IDX(build_tuple_desc->id(), tuple_idx); } - RETURN_IF_ERROR(vectorized::VExpr::prepare(_filter_src_expr_ctxs, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_filter_src_expr_ctxs, state, _child->row_desc())); return vectorized::VExpr::open(_filter_src_expr_ctxs, state); } diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index da7712e3e17685..f2ca259754b661 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -76,8 +76,8 @@ class NestedLoopJoinBuildSinkOperatorX final if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; } - return _child_x->ignore_data_distribution() ? DataDistribution(ExchangeType::BROADCAST) - : DataDistribution(ExchangeType::NOOP); + return _child->ignore_data_distribution() ? DataDistribution(ExchangeType::BROADCAST) + : DataDistribution(ExchangeType::NOOP); } private: diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 5a0b6680eee765..9546ed8df56671 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -450,7 +450,7 @@ Status NestedLoopJoinProbeOperatorX::open(RuntimeState* state) { for (auto& conjunct : _join_conjuncts) { RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc)); } - _num_probe_side_columns = _child_x->row_desc().num_materialized_slots(); + _num_probe_side_columns = _child->row_desc().num_materialized_slots(); _num_build_side_columns = _build_side_child->row_desc().num_materialized_slots(); return vectorized::VExpr::open(_join_conjuncts, state); } diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 217c3219d5c36d..d65769254b9dfc 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -116,12 +116,12 @@ std::string PipelineXSinkLocalState::name_suffix() { } DataDistribution DataSinkOperatorXBase::required_data_distribution() const { - return _child_x && _child_x->ignore_data_distribution() + return _child && _child->ignore_data_distribution() ? DataDistribution(ExchangeType::PASSTHROUGH) : DataDistribution(ExchangeType::NOOP); } const RowDescriptor& OperatorBase::row_desc() const { - return _child_x->row_desc(); + return _child->row_desc(); } template @@ -220,15 +220,15 @@ Status OperatorXBase::open(RuntimeState* state) { for (auto& projections : _intermediate_projections) { RETURN_IF_ERROR(vectorized::VExpr::open(projections, state)); } - if (_child_x && !is_source()) { - RETURN_IF_ERROR(_child_x->open(state)); + if (_child && !is_source()) { + RETURN_IF_ERROR(_child->open(state)); } return Status::OK(); } Status OperatorXBase::close(RuntimeState* state) { - if (_child_x && !is_source()) { - RETURN_IF_ERROR(_child_x->close(state)); + if (_child && !is_source()) { + RETURN_IF_ERROR(_child->close(state)); } auto result = state->get_local_state_result(operator_id()); if (!result) { @@ -572,8 +572,7 @@ Status PipelineXSinkLocalState::close(RuntimeState* state, Status e template Status StreamingOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { - RETURN_IF_ERROR( - OperatorX::_child_x->get_block_after_projects(state, block, eos)); + RETURN_IF_ERROR(OperatorX::_child->get_block_after_projects(state, block, eos)); return pull(state, block, eos); } @@ -583,8 +582,8 @@ Status StatefulOperatorX::get_block(RuntimeState* state, vectori auto& local_state = get_local_state(state); if (need_more_input_data(state)) { local_state._child_block->clear_column_data( - OperatorX::_child_x->row_desc().num_materialized_slots()); - RETURN_IF_ERROR(OperatorX::_child_x->get_block_after_projects( + OperatorX::_child->row_desc().num_materialized_slots()); + RETURN_IF_ERROR(OperatorX::_child->get_block_after_projects( state, local_state._child_block.get(), &local_state._child_eos)); *eos = local_state._child_eos; if (local_state._child_block->rows() == 0 && !local_state._child_eos) { @@ -668,66 +667,66 @@ Status AsyncWriterSink::close(RuntimeState* state, Status exec_s return Base::close(state, exec_status); } -#define DECLARE_OPERATOR_X(LOCAL_STATE) template class DataSinkOperatorX; -DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState) -DECLARE_OPERATOR_X(ResultSinkLocalState) -DECLARE_OPERATOR_X(JdbcTableSinkLocalState) -DECLARE_OPERATOR_X(MemoryScratchSinkLocalState) -DECLARE_OPERATOR_X(ResultFileSinkLocalState) -DECLARE_OPERATOR_X(OlapTableSinkLocalState) -DECLARE_OPERATOR_X(OlapTableSinkV2LocalState) -DECLARE_OPERATOR_X(HiveTableSinkLocalState) -DECLARE_OPERATOR_X(IcebergTableSinkLocalState) -DECLARE_OPERATOR_X(AnalyticSinkLocalState) -DECLARE_OPERATOR_X(SortSinkLocalState) -DECLARE_OPERATOR_X(SpillSortSinkLocalState) -DECLARE_OPERATOR_X(LocalExchangeSinkLocalState) -DECLARE_OPERATOR_X(AggSinkLocalState) -DECLARE_OPERATOR_X(PartitionedAggSinkLocalState) -DECLARE_OPERATOR_X(ExchangeSinkLocalState) -DECLARE_OPERATOR_X(NestedLoopJoinBuildSinkLocalState) -DECLARE_OPERATOR_X(UnionSinkLocalState) -DECLARE_OPERATOR_X(MultiCastDataStreamSinkLocalState) -DECLARE_OPERATOR_X(PartitionSortSinkLocalState) -DECLARE_OPERATOR_X(SetProbeSinkLocalState) -DECLARE_OPERATOR_X(SetProbeSinkLocalState) -DECLARE_OPERATOR_X(SetSinkLocalState) -DECLARE_OPERATOR_X(SetSinkLocalState) -DECLARE_OPERATOR_X(PartitionedHashJoinSinkLocalState) -DECLARE_OPERATOR_X(GroupCommitBlockSinkLocalState) - -#undef DECLARE_OPERATOR_X - -#define DECLARE_OPERATOR_X(LOCAL_STATE) template class OperatorX; -DECLARE_OPERATOR_X(HashJoinProbeLocalState) -DECLARE_OPERATOR_X(OlapScanLocalState) -DECLARE_OPERATOR_X(GroupCommitLocalState) -DECLARE_OPERATOR_X(JDBCScanLocalState) -DECLARE_OPERATOR_X(FileScanLocalState) -DECLARE_OPERATOR_X(EsScanLocalState) -DECLARE_OPERATOR_X(AnalyticLocalState) -DECLARE_OPERATOR_X(SortLocalState) -DECLARE_OPERATOR_X(SpillSortLocalState) -DECLARE_OPERATOR_X(AggLocalState) -DECLARE_OPERATOR_X(PartitionedAggLocalState) -DECLARE_OPERATOR_X(TableFunctionLocalState) -DECLARE_OPERATOR_X(ExchangeLocalState) -DECLARE_OPERATOR_X(RepeatLocalState) -DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState) -DECLARE_OPERATOR_X(AssertNumRowsLocalState) -DECLARE_OPERATOR_X(EmptySetLocalState) -DECLARE_OPERATOR_X(UnionSourceLocalState) -DECLARE_OPERATOR_X(MultiCastDataStreamSourceLocalState) -DECLARE_OPERATOR_X(PartitionSortSourceLocalState) -DECLARE_OPERATOR_X(SetSourceLocalState) -DECLARE_OPERATOR_X(SetSourceLocalState) -DECLARE_OPERATOR_X(DataGenLocalState) -DECLARE_OPERATOR_X(SchemaScanLocalState) -DECLARE_OPERATOR_X(MetaScanLocalState) -DECLARE_OPERATOR_X(LocalExchangeSourceLocalState) -DECLARE_OPERATOR_X(PartitionedHashJoinProbeLocalState) - -#undef DECLARE_OPERATOR_X +#define DECLARE_OPERATOR(LOCAL_STATE) template class DataSinkOperatorX; +DECLARE_OPERATOR(HashJoinBuildSinkLocalState) +DECLARE_OPERATOR(ResultSinkLocalState) +DECLARE_OPERATOR(JdbcTableSinkLocalState) +DECLARE_OPERATOR(MemoryScratchSinkLocalState) +DECLARE_OPERATOR(ResultFileSinkLocalState) +DECLARE_OPERATOR(OlapTableSinkLocalState) +DECLARE_OPERATOR(OlapTableSinkV2LocalState) +DECLARE_OPERATOR(HiveTableSinkLocalState) +DECLARE_OPERATOR(IcebergTableSinkLocalState) +DECLARE_OPERATOR(AnalyticSinkLocalState) +DECLARE_OPERATOR(SortSinkLocalState) +DECLARE_OPERATOR(SpillSortSinkLocalState) +DECLARE_OPERATOR(LocalExchangeSinkLocalState) +DECLARE_OPERATOR(AggSinkLocalState) +DECLARE_OPERATOR(PartitionedAggSinkLocalState) +DECLARE_OPERATOR(ExchangeSinkLocalState) +DECLARE_OPERATOR(NestedLoopJoinBuildSinkLocalState) +DECLARE_OPERATOR(UnionSinkLocalState) +DECLARE_OPERATOR(MultiCastDataStreamSinkLocalState) +DECLARE_OPERATOR(PartitionSortSinkLocalState) +DECLARE_OPERATOR(SetProbeSinkLocalState) +DECLARE_OPERATOR(SetProbeSinkLocalState) +DECLARE_OPERATOR(SetSinkLocalState) +DECLARE_OPERATOR(SetSinkLocalState) +DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState) +DECLARE_OPERATOR(GroupCommitBlockSinkLocalState) + +#undef DECLARE_OPERATOR + +#define DECLARE_OPERATOR(LOCAL_STATE) template class OperatorX; +DECLARE_OPERATOR(HashJoinProbeLocalState) +DECLARE_OPERATOR(OlapScanLocalState) +DECLARE_OPERATOR(GroupCommitLocalState) +DECLARE_OPERATOR(JDBCScanLocalState) +DECLARE_OPERATOR(FileScanLocalState) +DECLARE_OPERATOR(EsScanLocalState) +DECLARE_OPERATOR(AnalyticLocalState) +DECLARE_OPERATOR(SortLocalState) +DECLARE_OPERATOR(SpillSortLocalState) +DECLARE_OPERATOR(AggLocalState) +DECLARE_OPERATOR(PartitionedAggLocalState) +DECLARE_OPERATOR(TableFunctionLocalState) +DECLARE_OPERATOR(ExchangeLocalState) +DECLARE_OPERATOR(RepeatLocalState) +DECLARE_OPERATOR(NestedLoopJoinProbeLocalState) +DECLARE_OPERATOR(AssertNumRowsLocalState) +DECLARE_OPERATOR(EmptySetLocalState) +DECLARE_OPERATOR(UnionSourceLocalState) +DECLARE_OPERATOR(MultiCastDataStreamSourceLocalState) +DECLARE_OPERATOR(PartitionSortSourceLocalState) +DECLARE_OPERATOR(SetSourceLocalState) +DECLARE_OPERATOR(SetSourceLocalState) +DECLARE_OPERATOR(DataGenLocalState) +DECLARE_OPERATOR(SchemaScanLocalState) +DECLARE_OPERATOR(MetaScanLocalState) +DECLARE_OPERATOR(LocalExchangeSourceLocalState) +DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState) + +#undef DECLARE_OPERATOR template class StreamingOperatorX; template class StreamingOperatorX; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 0863550dc192e4..48f8a2d1836574 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -39,7 +39,6 @@ #include "vec/runtime/vdata_stream_recvr.h" namespace doris { -class DataSink; class RowDescriptor; class RuntimeState; class TDataSink; @@ -82,7 +81,7 @@ struct LocalSinkStateInfo { class OperatorBase { public: - explicit OperatorBase() : _child_x(nullptr), _is_closed(false) {} + explicit OperatorBase() : _child(nullptr), _is_closed(false) {} virtual ~OperatorBase() = default; virtual bool is_sink() const { return false; } @@ -98,7 +97,7 @@ class OperatorBase { [[nodiscard]] virtual Status close(RuntimeState* state); [[nodiscard]] virtual Status set_child(OperatorPtr child) { - _child_x = std::move(child); + _child = std::move(child); return Status::OK(); } @@ -108,7 +107,7 @@ class OperatorBase { virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); } [[nodiscard]] virtual bool require_data_distribution() const { return false; } - OperatorPtr child_x() { return _child_x; } + OperatorPtr child() { return _child; } [[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; } void set_followed_by_shuffled_join(bool followed_by_shuffled_join) { _followed_by_shuffled_join = followed_by_shuffled_join; @@ -116,7 +115,7 @@ class OperatorBase { [[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; } protected: - OperatorPtr _child_x = nullptr; + OperatorPtr _child = nullptr; bool _is_closed; bool _followed_by_shuffled_join = false; @@ -645,15 +644,15 @@ class OperatorXBase : public OperatorBase { } [[nodiscard]] std::string get_name() const override { return _op_name; } [[nodiscard]] virtual DataDistribution required_data_distribution() const { - return _child_x && _child_x->ignore_data_distribution() && !is_source() + return _child && _child->ignore_data_distribution() && !is_source() ? DataDistribution(ExchangeType::PASSTHROUGH) : DataDistribution(ExchangeType::NOOP); } [[nodiscard]] virtual bool ignore_data_distribution() const { - return _child_x ? _child_x->ignore_data_distribution() : _ignore_data_distribution; + return _child ? _child->ignore_data_distribution() : _ignore_data_distribution; } [[nodiscard]] bool ignore_data_hash_distribution() const { - return _child_x ? _child_x->ignore_data_hash_distribution() : _ignore_data_distribution; + return _child ? _child->ignore_data_hash_distribution() : _ignore_data_distribution; } [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; } void set_ignore_data_distribution() { _ignore_data_distribution = true; } @@ -708,7 +707,7 @@ class OperatorXBase : public OperatorBase { return reinterpret_cast(*this); } - [[nodiscard]] OperatorPtr get_child() { return _child_x; } + [[nodiscard]] OperatorPtr get_child() { return _child; } [[nodiscard]] vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; } [[nodiscard]] virtual RowDescriptor& row_descriptor() { return _row_descriptor; } diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 80808185fa8980..94c51e160da2a2 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -117,7 +117,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo ADD_COUNTER(_profile, "SortedPartitionInputRows", TUnit::UNIT); _partition_sort_info = std::make_shared( &_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first, - p._child_x->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit, + p._child->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit, p._top_n_algorithm, p._topn_phase); RETURN_IF_ERROR(_init_hash_method()); return Status::OK(); @@ -156,8 +156,8 @@ Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st Status PartitionSortSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::open(state)); - RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor)); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_partition_expr_ctxs, state, _child_x->row_desc())); + RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_partition_expr_ctxs, state, _child->row_desc())); RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); RETURN_IF_ERROR(vectorized::VExpr::open(_partition_expr_ctxs, state)); return Status::OK(); @@ -175,7 +175,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._value_places.push_back(_pool->add(new PartitionBlocks( local_state._partition_sort_info, local_state._value_places.empty()))); } - local_state._value_places[0]->append_whole_block(input_block, _child_x->row_desc()); + local_state._value_places[0]->append_whole_block(input_block, _child->row_desc()); } else { //just simply use partition num to check //if is TWO_PHASE_GLOBAL, must be sort all data thought partition num threshold have been exceeded. diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 448d3239949a8a..469716b7a22182 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -141,8 +141,8 @@ Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* s } _agg_sink_operator->set_dests_id(DataSinkOperatorX::dests_id()); - RETURN_IF_ERROR(_agg_sink_operator->set_child( - DataSinkOperatorX::_child_x)); + RETURN_IF_ERROR( + _agg_sink_operator->set_child(DataSinkOperatorX::_child)); return _agg_sink_operator->init(tnode, state); } diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 6529d1eb6540c5..018d63a6deebb1 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -525,15 +525,15 @@ Status PartitionedHashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeSt } Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) { - // to avoid open _child_x twice - auto child_x = std::move(_child_x); + // to avoid open _child twice + auto child = std::move(_child); RETURN_IF_ERROR(JoinProbeOperatorX::open(state)); - RETURN_IF_ERROR(_inner_probe_operator->set_child(child_x)); + RETURN_IF_ERROR(_inner_probe_operator->set_child(child)); DCHECK(_build_side_child != nullptr); _inner_probe_operator->set_build_side_child(_build_side_child); RETURN_IF_ERROR(_inner_probe_operator->open(state)); - _child_x = std::move(child_x); - RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc())); + _child = std::move(child); + RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc())); RETURN_IF_ERROR(_partitioner->open(state)); return Status::OK(); } @@ -820,8 +820,8 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori return _revoke_memory(state); } - RETURN_IF_ERROR(_child_x->get_block_after_projects(state, local_state._child_block.get(), - &local_state._child_eos)); + RETURN_IF_ERROR(_child->get_block_after_projects(state, local_state._child_block.get(), + &local_state._child_eos)); if (need_to_spill && local_state._child_eos) { RETURN_IF_ERROR(local_state.finish_spilling(0)); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 7c29fdc6ed08dd..a7297be493f804 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -102,7 +102,7 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) { auto& p = _parent->cast(); _shared_state->inner_shared_state->hash_table_variants.reset(); - auto row_desc = p._child_x->row_desc(); + auto row_desc = p._child->row_desc(); const auto num_slots = row_desc.num_slots(); vectorized::Block build_block; auto inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state(); @@ -426,8 +426,8 @@ Status PartitionedHashJoinSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(JoinBuildSinkOperatorX::open(state)); - RETURN_IF_ERROR(_inner_sink_operator->set_child(_child_x)); - RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc())); + RETURN_IF_ERROR(_inner_sink_operator->set_child(_child)); + RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc())); RETURN_IF_ERROR(_partitioner->open(state)); return _inner_sink_operator->open(state); } diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index b6761186c82fb4..d355d99c2e352f 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -59,7 +59,7 @@ Status RepeatOperatorX::open(RuntimeState* state) { if (_output_tuple_desc == nullptr) { return Status::InternalError("Failed to get tuple descriptor."); } - RETURN_IF_ERROR(vectorized::VExpr::prepare(_expr_ctxs, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_expr_ctxs, state, _child->row_desc())); for (const auto& slot_desc : _output_tuple_desc->slots()) { _output_slots.push_back(slot_desc); } @@ -211,7 +211,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp int size = _repeat_id_list.size(); if (_repeat_id_idx >= size) { _intermediate_block->clear(); - _child_block.clear_column_data(_child_x->row_desc().num_materialized_slots()); + _child_block.clear_column_data(_child->row_desc().num_materialized_slots()); _repeat_id_idx = 0; } } else if (local_state._expr_ctxs.empty()) { @@ -225,7 +225,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp RETURN_IF_ERROR( local_state.add_grouping_id_column(rows, cur_col, columns, repeat_id_idx)); } - _child_block.clear_column_data(_child_x->row_desc().num_materialized_slots()); + _child_block.clear_column_data(_child->row_desc().num_materialized_slots()); } RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, output_block->columns())); diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index bd4b7481aac240..955f956f60d6fe 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -57,7 +57,7 @@ Status SetProbeSinkOperatorX::init(const TPlanNode& tnode, Runtime template Status SetProbeSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX>::open(state)); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, _child->row_desc())); return vectorized::VExpr::open(_child_exprs, state); } diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index 3b3ed2f6a2cabd..ab53f5358c2a91 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -111,7 +111,7 @@ class SetProbeSinkOperatorX final : public DataSinkOperatorX _partition_exprs; - using OperatorBase::_child_x; + using OperatorBase::_child; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 9cebcf8611edc0..38667293d4854b 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -209,7 +209,7 @@ Status SetSinkOperatorX::init(const TPlanNode& tnode, RuntimeState template Status SetSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, _child->row_desc())); return vectorized::VExpr::open(_child_exprs, state); } diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 48fd7f400dd5b7..1c08eddc141f2e 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -111,7 +111,7 @@ class SetSinkOperatorX final : public DataSinkOperatorX _partition_exprs; - using OperatorBase::_child_x; + using OperatorBase::_child; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index bb7c38d2b709bb..b07942b9ab1c05 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -46,19 +46,19 @@ Status SortSinkLocalState::open(RuntimeState* state) { case TSortAlgorithm::HEAP_SORT: { _shared_state->sorter = vectorized::HeapSorter::create_unique( _vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, - p._child_x->row_desc()); + p._child->row_desc()); break; } case TSortAlgorithm::TOPN_SORT: { _shared_state->sorter = vectorized::TopNSorter::create_unique( _vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, - p._child_x->row_desc(), state, _profile); + p._child->row_desc(), state, _profile); break; } case TSortAlgorithm::FULL_SORT: { _shared_state->sorter = vectorized::FullSorter::create_unique( _vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, - p._child_x->row_desc(), state, _profile); + p._child->row_desc(), state, _profile); break; } default: { @@ -108,7 +108,7 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { Status SortSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::open(state)); - RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor)); + RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor)); return _vsort_exec_exprs.open(state); } diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index 17c936846e5c56..02a99e183c852e 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -42,9 +42,9 @@ Status SortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { Status SortSourceOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); - // spill sort _child_x may be nullptr. - if (_child_x) { - RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor)); + // spill sort _child may be nullptr. + if (_child) { + RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor)); RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); } return Status::OK(); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 5f767f2b6e3ab8..4bf1ab04efb628 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -120,7 +120,7 @@ Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) _name = "SPILL_SORT_SINK_OPERATOR"; _sort_sink_operator->set_dests_id(DataSinkOperatorX::dests_id()); - RETURN_IF_ERROR(_sort_sink_operator->set_child(DataSinkOperatorX::_child_x)); + RETURN_IF_ERROR(_sort_sink_operator->set_child(DataSinkOperatorX::_child)); return _sort_sink_operator->init(tnode, state); } diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index 59e11583f003c2..dfbe42c637ea56 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -1182,7 +1182,7 @@ Status StreamingAggOperatorX::open(RuntimeState* state) { _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size()); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc())); int j = _probe_expr_ctxs.size(); for (int i = 0; i < j; ++i) { @@ -1197,7 +1197,7 @@ Status StreamingAggOperatorX::open(RuntimeState* state) { SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j]; SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j]; RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare( - state, _child_x->row_desc(), intermediate_slot_desc, output_slot_desc)); + state, _child->row_desc(), intermediate_slot_desc, output_slot_desc)); _aggregate_evaluators[i]->set_version(state->be_exec_version()); } @@ -1295,7 +1295,7 @@ Status StreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_bl if (in_block->rows() > 0) { RETURN_IF_ERROR(local_state.do_pre_agg(in_block, local_state._pre_aggregated_block.get())); } - in_block->clear_column_data(_child_x->row_desc().num_materialized_slots()); + in_block->clear_column_data(_child->row_desc().num_materialized_slots()); return Status::OK(); } diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp index 02f61aa8fa94ea..ff9dfe632faec6 100644 --- a/be/src/pipeline/exec/table_function_operator.cpp +++ b/be/src/pipeline/exec/table_function_operator.cpp @@ -215,7 +215,7 @@ void TableFunctionLocalState::process_next_child_row() { } _child_block->clear_column_data(_parent->cast() - ._child_x->row_desc() + ._child->row_desc() .num_materialized_slots()); _cur_child_offset = -1; return; @@ -285,7 +285,7 @@ Status TableFunctionOperatorX::open(doris::RuntimeState* state) { } // get all input slots - for (const auto& child_tuple_desc : _child_x->row_desc().tuple_descriptors()) { + for (const auto& child_tuple_desc : _child->row_desc().tuple_descriptors()) { for (const auto& child_slot_desc : child_tuple_desc->slots()) { _child_slots.push_back(child_slot_desc); } diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp index 06f301bc75ba40..288fc131037fab 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -74,7 +74,7 @@ Status UnionSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { Status UnionSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::open(state)); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child_x->row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc())); RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _row_descriptor)); // open const expr lists. RETURN_IF_ERROR(vectorized::VExpr::open(_const_expr, state)); diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index f5f3155b2d3d4d..19c37f3649bcc7 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -71,7 +71,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets Status LocalExchangeSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::open(state)); if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) { - RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc())); + RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc())); RETURN_IF_ERROR(_partitioner->open(state)); } diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h index ad23cb96aef6fe..c0da5c8120c1e9 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h @@ -72,10 +72,10 @@ class LocalExchangeSourceOperatorX final : public OperatorXintermediate_row_desc(); + return _child->intermediate_row_desc(); } - RowDescriptor& row_descriptor() override { return _child_x->row_descriptor(); } - const RowDescriptor& row_desc() const override { return _child_x->row_desc(); } + RowDescriptor& row_descriptor() override { return _child->row_descriptor(); } + const RowDescriptor& row_desc() const override { return _child->row_desc(); } Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 38a99dd66d421f..a489273b68d129 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -706,7 +706,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( const std::map& bucket_seq_to_instance_idx, const std::map& shuffle_idx_to_instance_idx, const bool ignore_data_hash_distribution) { - auto& operator_xs = cur_pipe->operators(); + auto& operators = cur_pipe->operators(); const auto downstream_pipeline_id = cur_pipe->id(); auto local_exchange_id = next_operator_id(); // 1. Create a new pipeline with local exchange sink. @@ -717,8 +717,8 @@ Status PipelineFragmentContext::_add_local_exchange_impl( * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment. * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`. */ - const bool followed_by_shuffled_join = operator_xs.size() > idx - ? operator_xs[idx]->followed_by_shuffled_join() + const bool followed_by_shuffled_join = operators.size() > idx + ? operators[idx]->followed_by_shuffled_join() : cur_pipe->sink()->followed_by_shuffled_join(); const bool should_disable_bucket_shuffle = bucket_seq_to_instance_idx.empty() && @@ -790,7 +790,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( } break; case ExchangeType::LOCAL_MERGE_SORT: { - auto child_op = cur_pipe->sink()->child_x(); + auto child_op = cur_pipe->sink()->child(); auto sort_source = std::dynamic_pointer_cast(child_op); if (!sort_source) { return Status::InternalError( @@ -825,21 +825,21 @@ Status PipelineFragmentContext::_add_local_exchange_impl( // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink]. // 3.1 Initialize new pipeline's operator list. - std::copy(operator_xs.begin(), operator_xs.begin() + idx, + std::copy(operators.begin(), operators.begin() + idx, std::inserter(new_pip->operators(), new_pip->operators().end())); // 3.2 Erase unused operators in previous pipeline. - operator_xs.erase(operator_xs.begin(), operator_xs.begin() + idx); + operators.erase(operators.begin(), operators.begin() + idx); // 4. Initialize LocalExchangeSource and insert it into this pipeline. OperatorPtr source_op; source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id)); RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back())); RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type)); - if (!operator_xs.empty()) { - RETURN_IF_ERROR(operator_xs.front()->set_child(source_op)); + if (!operators.empty()) { + RETURN_IF_ERROR(operators.front()->set_child(source_op)); } - operator_xs.insert(operator_xs.begin(), source_op); + operators.insert(operators.begin(), source_op); shared_state->create_dependencies(local_exchange_id); @@ -896,8 +896,8 @@ Status PipelineFragmentContext::_add_local_exchange( } *do_local_exchange = true; - auto& operator_xs = cur_pipe->operators(); - auto total_op_num = operator_xs.size(); + auto& operators = cur_pipe->operators(); + auto total_op_num = operators.size(); auto new_pip = add_pipeline(cur_pipe, pip_idx + 1); RETURN_IF_ERROR(_add_local_exchange_impl( idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets, @@ -1653,8 +1653,8 @@ void PipelineFragmentContext::_close_fragment_instance() { } if (_query_ctx->enable_profile()) { - _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile_x(), - collect_realtime_load_channel_profile_x()); + _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(), + collect_realtime_load_channel_profile()); } // all submitted tasks done @@ -1724,7 +1724,7 @@ std::string PipelineFragmentContext::debug_string() { } std::vector> -PipelineFragmentContext::collect_realtime_profile_x() const { +PipelineFragmentContext::collect_realtime_profile() const { std::vector> res; // we do not have mutex to protect pipeline_id_to_profile @@ -1749,7 +1749,7 @@ PipelineFragmentContext::collect_realtime_profile_x() const { } std::shared_ptr -PipelineFragmentContext::collect_realtime_load_channel_profile_x() const { +PipelineFragmentContext::collect_realtime_load_channel_profile() const { // we do not have mutex to protect pipeline_id_to_profile // so we need to make sure this funciton is invoked after fragment context // has already been prepared. diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 75f3f22c68131c..f46835e95e0647 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -69,8 +69,8 @@ class PipelineFragmentContext : public TaskExecutionContext { ~PipelineFragmentContext(); - std::vector> collect_realtime_profile_x() const; - std::shared_ptr collect_realtime_load_channel_profile_x() const; + std::vector> collect_realtime_profile() const; + std::shared_ptr collect_realtime_load_channel_profile() const; bool is_timeout(timespec now) const; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 97aba2cae286c8..b9430d3899b8d3 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -401,7 +401,7 @@ QueryContext::_collect_realtime_query_profile() const { continue; } - auto profile = fragment_ctx->collect_realtime_profile_x(); + auto profile = fragment_ctx->collect_realtime_profile(); if (profile.empty()) { std::string err_msg = fmt::format(