Skip to content

Commit

Permalink
[refactor](pipeline) Refactor logics (#40576)
Browse files Browse the repository at this point in the history
Gabriel39 authored Sep 10, 2024
1 parent ebc4600 commit cd902b6
Showing 35 changed files with 162 additions and 164 deletions.
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -775,7 +775,7 @@ Status AggSinkOperatorX::open(RuntimeState* state) {
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
RETURN_IF_ERROR(vectorized::VExpr::prepare(
_probe_expr_ctxs, state, DataSinkOperatorX<AggSinkLocalState>::_child_x->row_desc()));
_probe_expr_ctxs, state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc()));

int j = _probe_expr_ctxs.size();
for (int i = 0; i < j; ++i) {
@@ -790,7 +790,7 @@ Status AggSinkOperatorX::open(RuntimeState* state) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
state, DataSinkOperatorX<AggSinkLocalState>::_child_x->row_desc(),
state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc(),
intermediate_slot_desc, output_slot_desc));
_aggregate_evaluators[i]->set_version(state->be_exec_version());
}
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
@@ -143,7 +143,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {

DataDistribution required_data_distribution() const override {
if (_probe_expr_ctxs.empty()) {
return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child_x
return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child
->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -234,11 +234,11 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
Status AnalyticSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AnalyticSinkLocalState>::open(state));
for (const auto& ctx : _agg_expr_ctxs) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child->row_desc()));
}
if (!_partition_by_eq_expr_ctxs.empty() || !_order_by_eq_expr_ctxs.empty()) {
vector<TTupleId> tuple_ids;
tuple_ids.push_back(_child_x->row_desc().tuple_descriptors()[0]->id());
tuple_ids.push_back(_child->row_desc().tuple_descriptors()[0]->id());
tuple_ids.push_back(_buffered_tuple_id);
RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, false));
if (!_partition_by_eq_expr_ctxs.empty()) {
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
@@ -562,13 +562,13 @@ Status AnalyticLocalState::close(RuntimeState* state) {

Status AnalyticSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
DCHECK(_child_x->row_desc().is_prefix_of(_row_descriptor));
DCHECK(_child->row_desc().is_prefix_of(_row_descriptor));
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
for (size_t i = 0; i < _agg_functions.size(); ++i) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[i];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child_x->row_desc(),
RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(),
intermediate_slot_desc, output_slot_desc));
_agg_functions[i]->set_version(state->be_exec_version());
_change_to_nullable_flags.push_back(output_slot_desc->is_nullable() &&
Original file line number Diff line number Diff line change
@@ -374,7 +374,7 @@ Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc()));

int j = _probe_expr_ctxs.size();
for (int i = 0; i < j; ++i) {
@@ -389,7 +389,7 @@ Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
state, _child_x->row_desc(), intermediate_slot_desc, output_slot_desc));
state, _child->row_desc(), intermediate_slot_desc, output_slot_desc));
_aggregate_evaluators[i]->set_version(state->be_exec_version());
}

4 changes: 2 additions & 2 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -648,10 +648,10 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
}

DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
if (_child_x && _enable_local_merge_sort) {
if (_child && _enable_local_merge_sort) {
// SORT_OPERATOR -> DATA_STREAM_SINK_OPERATOR
// SORT_OPERATOR -> LOCAL_MERGE_SORT -> DATA_STREAM_SINK_OPERATOR
if (auto sort_source = std::dynamic_pointer_cast<SortSourceOperatorX>(_child_x);
if (auto sort_source = std::dynamic_pointer_cast<SortSourceOperatorX>(_child);
sort_source && sort_source->use_local_merge()) {
// Sort the data local
return ExchangeType::LOCAL_MERGE_SORT;
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
@@ -488,7 +488,7 @@ Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) {
_shared_hash_table_context = _shared_hashtable_controller->get_context(node_id());
}
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child->row_desc()));
return vectorized::VExpr::open(_build_expr_ctxs, state);
}

@@ -505,7 +505,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*

if (local_state._build_side_mutable_block.empty()) {
auto tmp_build_block = vectorized::VectorizedUtils::create_empty_columnswithtypename(
_child_x->row_desc());
_child->row_desc());
tmp_build_block = *(tmp_build_block.create_same_struct_block(1, false));
local_state._build_col_ids.resize(_build_expr_ctxs.size());
RETURN_IF_ERROR(local_state._do_evaluate(tmp_build_block, local_state._build_expr_ctxs,
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
@@ -132,7 +132,7 @@ class HashJoinBuildSinkOperatorX final
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
} else if (_is_broadcast_join) {
return _child_x->ignore_data_distribution()
return _child->ignore_data_distribution()
? DataDistribution(ExchangeType::PASS_TO_ONE)
: DataDistribution(ExchangeType::NOOP);
}
8 changes: 4 additions & 4 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
@@ -276,7 +276,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
/// increase the output rows count(just same as `_probe_block`'s rows count).
RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, eos,
&local_state._probe_block, false));
local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
local_state._probe_block.clear_column_data(_child->row_desc().num_materialized_slots());
return Status::OK();
}

@@ -597,7 +597,7 @@ Status HashJoinProbeOperatorX::open(RuntimeState* state) {
}
}
};
init_output_slots_flags(_child_x->row_desc().tuple_descriptors(), _left_output_slot_flags);
init_output_slots_flags(_child->row_desc().tuple_descriptors(), _left_output_slot_flags);
init_output_slots_flags(_build_side_child->row_desc().tuple_descriptors(),
_right_output_slot_flags);
// _other_join_conjuncts are evaluated in the context of the rows produced by this node
@@ -609,12 +609,12 @@ Status HashJoinProbeOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
}

RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc()));
DCHECK(_build_side_child != nullptr);
// right table data types
_right_table_data_types =
vectorized::VectorizedUtils::get_data_types(_build_side_child->row_desc());
_left_table_data_types = vectorized::VectorizedUtils::get_data_types(_child_x->row_desc());
_left_table_data_types = vectorized::VectorizedUtils::get_data_types(_child->row_desc());
_right_table_column_names =
vectorized::VectorizedUtils::get_column_names(_build_side_child->row_desc());

4 changes: 2 additions & 2 deletions be/src/pipeline/exec/join_probe_operator.h
Original file line number Diff line number Diff line change
@@ -85,12 +85,12 @@ class JoinProbeOperatorX : public StatefulOperatorX<LocalStateType> {
}

Status set_child(OperatorPtr child) override {
if (OperatorX<LocalStateType>::_child_x && _build_side_child == nullptr) {
if (OperatorX<LocalStateType>::_child && _build_side_child == nullptr) {
// when there already (probe) child, others is build child.
set_build_side_child(child);
} else {
// first child which is probe side is in this pipeline
OperatorX<LocalStateType>::_child_x = std::move(child);
OperatorX<LocalStateType>::_child = std::move(child);
}
return Status::OK();
}
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/nested_loop_join_build_operator.cpp
Original file line number Diff line number Diff line change
@@ -109,14 +109,14 @@ Status NestedLoopJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta

Status NestedLoopJoinBuildSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>::open(state));
int num_build_tuples = _child_x->row_desc().tuple_descriptors().size();
int num_build_tuples = _child->row_desc().tuple_descriptors().size();

for (int i = 0; i < num_build_tuples; ++i) {
TupleDescriptor* build_tuple_desc = _child_x->row_desc().tuple_descriptors()[i];
TupleDescriptor* build_tuple_desc = _child->row_desc().tuple_descriptors()[i];
auto tuple_idx = _row_descriptor.get_tuple_idx(build_tuple_desc->id());
RETURN_IF_INVALID_TUPLE_IDX(build_tuple_desc->id(), tuple_idx);
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_filter_src_expr_ctxs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_filter_src_expr_ctxs, state, _child->row_desc()));
return vectorized::VExpr::open(_filter_src_expr_ctxs, state);
}

4 changes: 2 additions & 2 deletions be/src/pipeline/exec/nested_loop_join_build_operator.h
Original file line number Diff line number Diff line change
@@ -76,8 +76,8 @@ class NestedLoopJoinBuildSinkOperatorX final
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
}
return _child_x->ignore_data_distribution() ? DataDistribution(ExchangeType::BROADCAST)
: DataDistribution(ExchangeType::NOOP);
return _child->ignore_data_distribution() ? DataDistribution(ExchangeType::BROADCAST)
: DataDistribution(ExchangeType::NOOP);
}

private:
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
@@ -450,7 +450,7 @@ Status NestedLoopJoinProbeOperatorX::open(RuntimeState* state) {
for (auto& conjunct : _join_conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
}
_num_probe_side_columns = _child_x->row_desc().num_materialized_slots();
_num_probe_side_columns = _child->row_desc().num_materialized_slots();
_num_build_side_columns = _build_side_child->row_desc().num_materialized_slots();
return vectorized::VExpr::open(_join_conjuncts, state);
}
139 changes: 69 additions & 70 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
@@ -116,12 +116,12 @@ std::string PipelineXSinkLocalState<SharedStateArg>::name_suffix() {
}

DataDistribution DataSinkOperatorXBase::required_data_distribution() const {
return _child_x && _child_x->ignore_data_distribution()
return _child && _child->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataDistribution(ExchangeType::NOOP);
}
const RowDescriptor& OperatorBase::row_desc() const {
return _child_x->row_desc();
return _child->row_desc();
}

template <typename SharedStateArg>
@@ -220,15 +220,15 @@ Status OperatorXBase::open(RuntimeState* state) {
for (auto& projections : _intermediate_projections) {
RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
}
if (_child_x && !is_source()) {
RETURN_IF_ERROR(_child_x->open(state));
if (_child && !is_source()) {
RETURN_IF_ERROR(_child->open(state));
}
return Status::OK();
}

Status OperatorXBase::close(RuntimeState* state) {
if (_child_x && !is_source()) {
RETURN_IF_ERROR(_child_x->close(state));
if (_child && !is_source()) {
RETURN_IF_ERROR(_child->close(state));
}
auto result = state->get_local_state_result(operator_id());
if (!result) {
@@ -572,8 +572,7 @@ Status PipelineXSinkLocalState<SharedState>::close(RuntimeState* state, Status e
template <typename LocalStateType>
Status StreamingOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block,
bool* eos) {
RETURN_IF_ERROR(
OperatorX<LocalStateType>::_child_x->get_block_after_projects(state, block, eos));
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child->get_block_after_projects(state, block, eos));
return pull(state, block, eos);
}

@@ -583,8 +582,8 @@ Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori
auto& local_state = get_local_state(state);
if (need_more_input_data(state)) {
local_state._child_block->clear_column_data(
OperatorX<LocalStateType>::_child_x->row_desc().num_materialized_slots());
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_block_after_projects(
OperatorX<LocalStateType>::_child->row_desc().num_materialized_slots());
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child->get_block_after_projects(
state, local_state._child_block.get(), &local_state._child_eos));
*eos = local_state._child_eos;
if (local_state._child_block->rows() == 0 && !local_state._child_eos) {
@@ -668,66 +667,66 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s
return Base::close(state, exec_status);
}

#define DECLARE_OPERATOR_X(LOCAL_STATE) template class DataSinkOperatorX<LOCAL_STATE>;
DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
DECLARE_OPERATOR_X(ResultSinkLocalState)
DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
DECLARE_OPERATOR_X(MemoryScratchSinkLocalState)
DECLARE_OPERATOR_X(ResultFileSinkLocalState)
DECLARE_OPERATOR_X(OlapTableSinkLocalState)
DECLARE_OPERATOR_X(OlapTableSinkV2LocalState)
DECLARE_OPERATOR_X(HiveTableSinkLocalState)
DECLARE_OPERATOR_X(IcebergTableSinkLocalState)
DECLARE_OPERATOR_X(AnalyticSinkLocalState)
DECLARE_OPERATOR_X(SortSinkLocalState)
DECLARE_OPERATOR_X(SpillSortSinkLocalState)
DECLARE_OPERATOR_X(LocalExchangeSinkLocalState)
DECLARE_OPERATOR_X(AggSinkLocalState)
DECLARE_OPERATOR_X(PartitionedAggSinkLocalState)
DECLARE_OPERATOR_X(ExchangeSinkLocalState)
DECLARE_OPERATOR_X(NestedLoopJoinBuildSinkLocalState)
DECLARE_OPERATOR_X(UnionSinkLocalState)
DECLARE_OPERATOR_X(MultiCastDataStreamSinkLocalState)
DECLARE_OPERATOR_X(PartitionSortSinkLocalState)
DECLARE_OPERATOR_X(SetProbeSinkLocalState<true>)
DECLARE_OPERATOR_X(SetProbeSinkLocalState<false>)
DECLARE_OPERATOR_X(SetSinkLocalState<true>)
DECLARE_OPERATOR_X(SetSinkLocalState<false>)
DECLARE_OPERATOR_X(PartitionedHashJoinSinkLocalState)
DECLARE_OPERATOR_X(GroupCommitBlockSinkLocalState)

#undef DECLARE_OPERATOR_X

#define DECLARE_OPERATOR_X(LOCAL_STATE) template class OperatorX<LOCAL_STATE>;
DECLARE_OPERATOR_X(HashJoinProbeLocalState)
DECLARE_OPERATOR_X(OlapScanLocalState)
DECLARE_OPERATOR_X(GroupCommitLocalState)
DECLARE_OPERATOR_X(JDBCScanLocalState)
DECLARE_OPERATOR_X(FileScanLocalState)
DECLARE_OPERATOR_X(EsScanLocalState)
DECLARE_OPERATOR_X(AnalyticLocalState)
DECLARE_OPERATOR_X(SortLocalState)
DECLARE_OPERATOR_X(SpillSortLocalState)
DECLARE_OPERATOR_X(AggLocalState)
DECLARE_OPERATOR_X(PartitionedAggLocalState)
DECLARE_OPERATOR_X(TableFunctionLocalState)
DECLARE_OPERATOR_X(ExchangeLocalState)
DECLARE_OPERATOR_X(RepeatLocalState)
DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState)
DECLARE_OPERATOR_X(AssertNumRowsLocalState)
DECLARE_OPERATOR_X(EmptySetLocalState)
DECLARE_OPERATOR_X(UnionSourceLocalState)
DECLARE_OPERATOR_X(MultiCastDataStreamSourceLocalState)
DECLARE_OPERATOR_X(PartitionSortSourceLocalState)
DECLARE_OPERATOR_X(SetSourceLocalState<true>)
DECLARE_OPERATOR_X(SetSourceLocalState<false>)
DECLARE_OPERATOR_X(DataGenLocalState)
DECLARE_OPERATOR_X(SchemaScanLocalState)
DECLARE_OPERATOR_X(MetaScanLocalState)
DECLARE_OPERATOR_X(LocalExchangeSourceLocalState)
DECLARE_OPERATOR_X(PartitionedHashJoinProbeLocalState)

#undef DECLARE_OPERATOR_X
#define DECLARE_OPERATOR(LOCAL_STATE) template class DataSinkOperatorX<LOCAL_STATE>;
DECLARE_OPERATOR(HashJoinBuildSinkLocalState)
DECLARE_OPERATOR(ResultSinkLocalState)
DECLARE_OPERATOR(JdbcTableSinkLocalState)
DECLARE_OPERATOR(MemoryScratchSinkLocalState)
DECLARE_OPERATOR(ResultFileSinkLocalState)
DECLARE_OPERATOR(OlapTableSinkLocalState)
DECLARE_OPERATOR(OlapTableSinkV2LocalState)
DECLARE_OPERATOR(HiveTableSinkLocalState)
DECLARE_OPERATOR(IcebergTableSinkLocalState)
DECLARE_OPERATOR(AnalyticSinkLocalState)
DECLARE_OPERATOR(SortSinkLocalState)
DECLARE_OPERATOR(SpillSortSinkLocalState)
DECLARE_OPERATOR(LocalExchangeSinkLocalState)
DECLARE_OPERATOR(AggSinkLocalState)
DECLARE_OPERATOR(PartitionedAggSinkLocalState)
DECLARE_OPERATOR(ExchangeSinkLocalState)
DECLARE_OPERATOR(NestedLoopJoinBuildSinkLocalState)
DECLARE_OPERATOR(UnionSinkLocalState)
DECLARE_OPERATOR(MultiCastDataStreamSinkLocalState)
DECLARE_OPERATOR(PartitionSortSinkLocalState)
DECLARE_OPERATOR(SetProbeSinkLocalState<true>)
DECLARE_OPERATOR(SetProbeSinkLocalState<false>)
DECLARE_OPERATOR(SetSinkLocalState<true>)
DECLARE_OPERATOR(SetSinkLocalState<false>)
DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)

#undef DECLARE_OPERATOR

#define DECLARE_OPERATOR(LOCAL_STATE) template class OperatorX<LOCAL_STATE>;
DECLARE_OPERATOR(HashJoinProbeLocalState)
DECLARE_OPERATOR(OlapScanLocalState)
DECLARE_OPERATOR(GroupCommitLocalState)
DECLARE_OPERATOR(JDBCScanLocalState)
DECLARE_OPERATOR(FileScanLocalState)
DECLARE_OPERATOR(EsScanLocalState)
DECLARE_OPERATOR(AnalyticLocalState)
DECLARE_OPERATOR(SortLocalState)
DECLARE_OPERATOR(SpillSortLocalState)
DECLARE_OPERATOR(AggLocalState)
DECLARE_OPERATOR(PartitionedAggLocalState)
DECLARE_OPERATOR(TableFunctionLocalState)
DECLARE_OPERATOR(ExchangeLocalState)
DECLARE_OPERATOR(RepeatLocalState)
DECLARE_OPERATOR(NestedLoopJoinProbeLocalState)
DECLARE_OPERATOR(AssertNumRowsLocalState)
DECLARE_OPERATOR(EmptySetLocalState)
DECLARE_OPERATOR(UnionSourceLocalState)
DECLARE_OPERATOR(MultiCastDataStreamSourceLocalState)
DECLARE_OPERATOR(PartitionSortSourceLocalState)
DECLARE_OPERATOR(SetSourceLocalState<true>)
DECLARE_OPERATOR(SetSourceLocalState<false>)
DECLARE_OPERATOR(DataGenLocalState)
DECLARE_OPERATOR(SchemaScanLocalState)
DECLARE_OPERATOR(MetaScanLocalState)
DECLARE_OPERATOR(LocalExchangeSourceLocalState)
DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)

#undef DECLARE_OPERATOR

template class StreamingOperatorX<AssertNumRowsLocalState>;
template class StreamingOperatorX<SelectLocalState>;
17 changes: 8 additions & 9 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
@@ -39,7 +39,6 @@
#include "vec/runtime/vdata_stream_recvr.h"

namespace doris {
class DataSink;
class RowDescriptor;
class RuntimeState;
class TDataSink;
@@ -82,7 +81,7 @@ struct LocalSinkStateInfo {

class OperatorBase {
public:
explicit OperatorBase() : _child_x(nullptr), _is_closed(false) {}
explicit OperatorBase() : _child(nullptr), _is_closed(false) {}
virtual ~OperatorBase() = default;

virtual bool is_sink() const { return false; }
@@ -98,7 +97,7 @@ class OperatorBase {
[[nodiscard]] virtual Status close(RuntimeState* state);

[[nodiscard]] virtual Status set_child(OperatorPtr child) {
_child_x = std::move(child);
_child = std::move(child);
return Status::OK();
}

@@ -108,15 +107,15 @@ class OperatorBase {

virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
[[nodiscard]] virtual bool require_data_distribution() const { return false; }
OperatorPtr child_x() { return _child_x; }
OperatorPtr child() { return _child; }
[[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
void set_followed_by_shuffled_join(bool followed_by_shuffled_join) {
_followed_by_shuffled_join = followed_by_shuffled_join;
}
[[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }

protected:
OperatorPtr _child_x = nullptr;
OperatorPtr _child = nullptr;

bool _is_closed;
bool _followed_by_shuffled_join = false;
@@ -645,15 +644,15 @@ class OperatorXBase : public OperatorBase {
}
[[nodiscard]] std::string get_name() const override { return _op_name; }
[[nodiscard]] virtual DataDistribution required_data_distribution() const {
return _child_x && _child_x->ignore_data_distribution() && !is_source()
return _child && _child->ignore_data_distribution() && !is_source()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataDistribution(ExchangeType::NOOP);
}
[[nodiscard]] virtual bool ignore_data_distribution() const {
return _child_x ? _child_x->ignore_data_distribution() : _ignore_data_distribution;
return _child ? _child->ignore_data_distribution() : _ignore_data_distribution;
}
[[nodiscard]] bool ignore_data_hash_distribution() const {
return _child_x ? _child_x->ignore_data_hash_distribution() : _ignore_data_distribution;
return _child ? _child->ignore_data_hash_distribution() : _ignore_data_distribution;
}
[[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; }
void set_ignore_data_distribution() { _ignore_data_distribution = true; }
@@ -708,7 +707,7 @@ class OperatorXBase : public OperatorBase {
return reinterpret_cast<const TARGET&>(*this);
}

[[nodiscard]] OperatorPtr get_child() { return _child_x; }
[[nodiscard]] OperatorPtr get_child() { return _child; }

[[nodiscard]] vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; }
[[nodiscard]] virtual RowDescriptor& row_descriptor() { return _row_descriptor; }
8 changes: 4 additions & 4 deletions be/src/pipeline/exec/partition_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -117,7 +117,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
ADD_COUNTER(_profile, "SortedPartitionInputRows", TUnit::UNIT);
_partition_sort_info = std::make_shared<PartitionSortInfo>(
&_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first,
p._child_x->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit,
p._child->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit,
p._top_n_algorithm, p._topn_phase);
RETURN_IF_ERROR(_init_hash_method());
return Status::OK();
@@ -156,8 +156,8 @@ Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st

Status PartitionSortSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<PartitionSortSinkLocalState>::open(state));
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_partition_expr_ctxs, state, _child_x->row_desc()));
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_partition_expr_ctxs, state, _child->row_desc()));
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
RETURN_IF_ERROR(vectorized::VExpr::open(_partition_expr_ctxs, state));
return Status::OK();
@@ -175,7 +175,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._value_places.push_back(_pool->add(new PartitionBlocks(
local_state._partition_sort_info, local_state._value_places.empty())));
}
local_state._value_places[0]->append_whole_block(input_block, _child_x->row_desc());
local_state._value_places[0]->append_whole_block(input_block, _child->row_desc());
} else {
//just simply use partition num to check
//if is TWO_PHASE_GLOBAL, must be sort all data thought partition num threshold have been exceeded.
Original file line number Diff line number Diff line change
@@ -141,8 +141,8 @@ Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* s
}

_agg_sink_operator->set_dests_id(DataSinkOperatorX<PartitionedAggSinkLocalState>::dests_id());
RETURN_IF_ERROR(_agg_sink_operator->set_child(
DataSinkOperatorX<PartitionedAggSinkLocalState>::_child_x));
RETURN_IF_ERROR(
_agg_sink_operator->set_child(DataSinkOperatorX<PartitionedAggSinkLocalState>::_child));
return _agg_sink_operator->init(tnode, state);
}

14 changes: 7 additions & 7 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
Original file line number Diff line number Diff line change
@@ -525,15 +525,15 @@ Status PartitionedHashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeSt
}

Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) {
// to avoid open _child_x twice
auto child_x = std::move(_child_x);
// to avoid open _child twice
auto child = std::move(_child);
RETURN_IF_ERROR(JoinProbeOperatorX::open(state));
RETURN_IF_ERROR(_inner_probe_operator->set_child(child_x));
RETURN_IF_ERROR(_inner_probe_operator->set_child(child));
DCHECK(_build_side_child != nullptr);
_inner_probe_operator->set_build_side_child(_build_side_child);
RETURN_IF_ERROR(_inner_probe_operator->open(state));
_child_x = std::move(child_x);
RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc()));
_child = std::move(child);
RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
RETURN_IF_ERROR(_partitioner->open(state));
return Status::OK();
}
@@ -820,8 +820,8 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
return _revoke_memory(state);
}

RETURN_IF_ERROR(_child_x->get_block_after_projects(state, local_state._child_block.get(),
&local_state._child_eos));
RETURN_IF_ERROR(_child->get_block_after_projects(state, local_state._child_block.get(),
&local_state._child_eos));

if (need_to_spill && local_state._child_eos) {
RETURN_IF_ERROR(local_state.finish_spilling(0));
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -102,7 +102,7 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) {
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
_shared_state->inner_shared_state->hash_table_variants.reset();
auto row_desc = p._child_x->row_desc();
auto row_desc = p._child->row_desc();
const auto num_slots = row_desc.num_slots();
vectorized::Block build_block;
auto inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state();
@@ -426,8 +426,8 @@ Status PartitionedHashJoinSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta

Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(JoinBuildSinkOperatorX<PartitionedHashJoinSinkLocalState>::open(state));
RETURN_IF_ERROR(_inner_sink_operator->set_child(_child_x));
RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc()));
RETURN_IF_ERROR(_inner_sink_operator->set_child(_child));
RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
RETURN_IF_ERROR(_partitioner->open(state));
return _inner_sink_operator->open(state);
}
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/repeat_operator.cpp
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ Status RepeatOperatorX::open(RuntimeState* state) {
if (_output_tuple_desc == nullptr) {
return Status::InternalError("Failed to get tuple descriptor.");
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_expr_ctxs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_expr_ctxs, state, _child->row_desc()));
for (const auto& slot_desc : _output_tuple_desc->slots()) {
_output_slots.push_back(slot_desc);
}
@@ -211,7 +211,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp
int size = _repeat_id_list.size();
if (_repeat_id_idx >= size) {
_intermediate_block->clear();
_child_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
_child_block.clear_column_data(_child->row_desc().num_materialized_slots());
_repeat_id_idx = 0;
}
} else if (local_state._expr_ctxs.empty()) {
@@ -225,7 +225,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp
RETURN_IF_ERROR(
local_state.add_grouping_id_column(rows, cur_col, columns, repeat_id_idx));
}
_child_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
_child_block.clear_column_data(_child->row_desc().num_materialized_slots());
}
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block,
output_block->columns()));
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/set_probe_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -57,7 +57,7 @@ Status SetProbeSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, Runtime
template <bool is_intersect>
Status SetProbeSinkOperatorX<is_intersect>::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<SetProbeSinkLocalState<is_intersect>>::open(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, _child->row_desc()));
return vectorized::VExpr::open(_child_exprs, state);
}

2 changes: 1 addition & 1 deletion be/src/pipeline/exec/set_probe_sink_operator.h
Original file line number Diff line number Diff line change
@@ -111,7 +111,7 @@ class SetProbeSinkOperatorX final : public DataSinkOperatorX<SetProbeSinkLocalSt
vectorized::VExprContextSPtrs _child_exprs;
const bool _is_colocate;
const std::vector<TExpr> _partition_exprs;
using OperatorBase::_child_x;
using OperatorBase::_child;
};

} // namespace pipeline
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/set_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -209,7 +209,7 @@ Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState
template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state, _child->row_desc()));
return vectorized::VExpr::open(_child_exprs, state);
}

2 changes: 1 addition & 1 deletion be/src/pipeline/exec/set_sink_operator.h
Original file line number Diff line number Diff line change
@@ -111,7 +111,7 @@ class SetSinkOperatorX final : public DataSinkOperatorX<SetSinkLocalState<is_int
vectorized::VExprContextSPtrs _child_exprs;
const bool _is_colocate;
const std::vector<TExpr> _partition_exprs;
using OperatorBase::_child_x;
using OperatorBase::_child;
};

} // namespace pipeline
8 changes: 4 additions & 4 deletions be/src/pipeline/exec/sort_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -46,19 +46,19 @@ Status SortSinkLocalState::open(RuntimeState* state) {
case TSortAlgorithm::HEAP_SORT: {
_shared_state->sorter = vectorized::HeapSorter::create_unique(
_vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first,
p._child_x->row_desc());
p._child->row_desc());
break;
}
case TSortAlgorithm::TOPN_SORT: {
_shared_state->sorter = vectorized::TopNSorter::create_unique(
_vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first,
p._child_x->row_desc(), state, _profile);
p._child->row_desc(), state, _profile);
break;
}
case TSortAlgorithm::FULL_SORT: {
_shared_state->sorter = vectorized::FullSorter::create_unique(
_vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first,
p._child_x->row_desc(), state, _profile);
p._child->row_desc(), state, _profile);
break;
}
default: {
@@ -108,7 +108,7 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {

Status SortSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<SortSinkLocalState>::open(state));
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor));
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor));
return _vsort_exec_exprs.open(state);
}

6 changes: 3 additions & 3 deletions be/src/pipeline/exec/sort_source_operator.cpp
Original file line number Diff line number Diff line change
@@ -42,9 +42,9 @@ Status SortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {

Status SortSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
// spill sort _child_x may be nullptr.
if (_child_x) {
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor));
// spill sort _child may be nullptr.
if (_child) {
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor));
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
}
return Status::OK();
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/spill_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -120,7 +120,7 @@ Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
_name = "SPILL_SORT_SINK_OPERATOR";

_sort_sink_operator->set_dests_id(DataSinkOperatorX<LocalStateType>::dests_id());
RETURN_IF_ERROR(_sort_sink_operator->set_child(DataSinkOperatorX<LocalStateType>::_child_x));
RETURN_IF_ERROR(_sort_sink_operator->set_child(DataSinkOperatorX<LocalStateType>::_child));
return _sort_sink_operator->init(tnode, state);
}

6 changes: 3 additions & 3 deletions be/src/pipeline/exec/streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
@@ -1182,7 +1182,7 @@ Status StreamingAggOperatorX::open(RuntimeState* state) {
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc()));

int j = _probe_expr_ctxs.size();
for (int i = 0; i < j; ++i) {
@@ -1197,7 +1197,7 @@ Status StreamingAggOperatorX::open(RuntimeState* state) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
state, _child_x->row_desc(), intermediate_slot_desc, output_slot_desc));
state, _child->row_desc(), intermediate_slot_desc, output_slot_desc));
_aggregate_evaluators[i]->set_version(state->be_exec_version());
}

@@ -1295,7 +1295,7 @@ Status StreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_bl
if (in_block->rows() > 0) {
RETURN_IF_ERROR(local_state.do_pre_agg(in_block, local_state._pre_aggregated_block.get()));
}
in_block->clear_column_data(_child_x->row_desc().num_materialized_slots());
in_block->clear_column_data(_child->row_desc().num_materialized_slots());
return Status::OK();
}

4 changes: 2 additions & 2 deletions be/src/pipeline/exec/table_function_operator.cpp
Original file line number Diff line number Diff line change
@@ -215,7 +215,7 @@ void TableFunctionLocalState::process_next_child_row() {
}

_child_block->clear_column_data(_parent->cast<TableFunctionOperatorX>()
._child_x->row_desc()
._child->row_desc()
.num_materialized_slots());
_cur_child_offset = -1;
return;
@@ -285,7 +285,7 @@ Status TableFunctionOperatorX::open(doris::RuntimeState* state) {
}

// get all input slots
for (const auto& child_tuple_desc : _child_x->row_desc().tuple_descriptors()) {
for (const auto& child_tuple_desc : _child->row_desc().tuple_descriptors()) {
for (const auto& child_slot_desc : child_tuple_desc->slots()) {
_child_slots.push_back(child_slot_desc);
}
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/union_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -74,7 +74,7 @@ Status UnionSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {

Status UnionSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<UnionSinkLocalState>::open(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state, _child->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr, _row_descriptor));
// open const expr lists.
RETURN_IF_ERROR(vectorized::VExpr::open(_const_expr, state));
Original file line number Diff line number Diff line change
@@ -71,7 +71,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets
Status LocalExchangeSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<LocalExchangeSinkLocalState>::open(state));
if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) {
RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc()));
RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
RETURN_IF_ERROR(_partitioner->open(state));
}

Original file line number Diff line number Diff line change
@@ -72,10 +72,10 @@ class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceL
}
Status open(RuntimeState* state) override { return Status::OK(); }
const RowDescriptor& intermediate_row_desc() const override {
return _child_x->intermediate_row_desc();
return _child->intermediate_row_desc();
}
RowDescriptor& row_descriptor() override { return _child_x->row_descriptor(); }
const RowDescriptor& row_desc() const override { return _child_x->row_desc(); }
RowDescriptor& row_descriptor() override { return _child->row_descriptor(); }
const RowDescriptor& row_desc() const override { return _child->row_desc(); }

Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;

30 changes: 15 additions & 15 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
@@ -706,7 +706,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
const std::map<int, int>& bucket_seq_to_instance_idx,
const std::map<int, int>& shuffle_idx_to_instance_idx,
const bool ignore_data_hash_distribution) {
auto& operator_xs = cur_pipe->operators();
auto& operators = cur_pipe->operators();
const auto downstream_pipeline_id = cur_pipe->id();
auto local_exchange_id = next_operator_id();
// 1. Create a new pipeline with local exchange sink.
@@ -717,8 +717,8 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
* `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
* So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
*/
const bool followed_by_shuffled_join = operator_xs.size() > idx
? operator_xs[idx]->followed_by_shuffled_join()
const bool followed_by_shuffled_join = operators.size() > idx
? operators[idx]->followed_by_shuffled_join()
: cur_pipe->sink()->followed_by_shuffled_join();
const bool should_disable_bucket_shuffle =
bucket_seq_to_instance_idx.empty() &&
@@ -790,7 +790,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
}
break;
case ExchangeType::LOCAL_MERGE_SORT: {
auto child_op = cur_pipe->sink()->child_x();
auto child_op = cur_pipe->sink()->child();
auto sort_source = std::dynamic_pointer_cast<SortSourceOperatorX>(child_op);
if (!sort_source) {
return Status::InternalError(
@@ -825,21 +825,21 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
// pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

// 3.1 Initialize new pipeline's operator list.
std::copy(operator_xs.begin(), operator_xs.begin() + idx,
std::copy(operators.begin(), operators.begin() + idx,
std::inserter(new_pip->operators(), new_pip->operators().end()));

// 3.2 Erase unused operators in previous pipeline.
operator_xs.erase(operator_xs.begin(), operator_xs.begin() + idx);
operators.erase(operators.begin(), operators.begin() + idx);

// 4. Initialize LocalExchangeSource and insert it into this pipeline.
OperatorPtr source_op;
source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
if (!operator_xs.empty()) {
RETURN_IF_ERROR(operator_xs.front()->set_child(source_op));
if (!operators.empty()) {
RETURN_IF_ERROR(operators.front()->set_child(source_op));
}
operator_xs.insert(operator_xs.begin(), source_op);
operators.insert(operators.begin(), source_op);

shared_state->create_dependencies(local_exchange_id);

@@ -896,8 +896,8 @@ Status PipelineFragmentContext::_add_local_exchange(
}
*do_local_exchange = true;

auto& operator_xs = cur_pipe->operators();
auto total_op_num = operator_xs.size();
auto& operators = cur_pipe->operators();
auto total_op_num = operators.size();
auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
RETURN_IF_ERROR(_add_local_exchange_impl(
idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
@@ -1653,8 +1653,8 @@ void PipelineFragmentContext::_close_fragment_instance() {
}

if (_query_ctx->enable_profile()) {
_query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile_x(),
collect_realtime_load_channel_profile_x());
_query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
collect_realtime_load_channel_profile());
}

// all submitted tasks done
@@ -1724,7 +1724,7 @@ std::string PipelineFragmentContext::debug_string() {
}

std::vector<std::shared_ptr<TRuntimeProfileTree>>
PipelineFragmentContext::collect_realtime_profile_x() const {
PipelineFragmentContext::collect_realtime_profile() const {
std::vector<std::shared_ptr<TRuntimeProfileTree>> res;

// we do not have mutex to protect pipeline_id_to_profile
@@ -1749,7 +1749,7 @@ PipelineFragmentContext::collect_realtime_profile_x() const {
}

std::shared_ptr<TRuntimeProfileTree>
PipelineFragmentContext::collect_realtime_load_channel_profile_x() const {
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
// we do not have mutex to protect pipeline_id_to_profile
// so we need to make sure this funciton is invoked after fragment context
// has already been prepared.
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
@@ -69,8 +69,8 @@ class PipelineFragmentContext : public TaskExecutionContext {

~PipelineFragmentContext();

std::vector<std::shared_ptr<TRuntimeProfileTree>> collect_realtime_profile_x() const;
std::shared_ptr<TRuntimeProfileTree> collect_realtime_load_channel_profile_x() const;
std::vector<std::shared_ptr<TRuntimeProfileTree>> collect_realtime_profile() const;
std::shared_ptr<TRuntimeProfileTree> collect_realtime_load_channel_profile() const;

bool is_timeout(timespec now) const;

2 changes: 1 addition & 1 deletion be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
@@ -401,7 +401,7 @@ QueryContext::_collect_realtime_query_profile() const {
continue;
}

auto profile = fragment_ctx->collect_realtime_profile_x();
auto profile = fragment_ctx->collect_realtime_profile();

if (profile.empty()) {
std::string err_msg = fmt::format(

0 comments on commit cd902b6

Please sign in to comment.