Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor](pipeline) Refactor logics #40576

Merged
merged 2 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ Status AggSinkOperatorX::open(RuntimeState* state) {
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
RETURN_IF_ERROR(vectorized::VExpr::prepare(
_probe_expr_ctxs, state, DataSinkOperatorX<AggSinkLocalState>::_child_x->row_desc()));
_probe_expr_ctxs, state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc()));

int j = _probe_expr_ctxs.size();
for (int i = 0; i < j; ++i) {
Expand All @@ -790,7 +790,7 @@ Status AggSinkOperatorX::open(RuntimeState* state) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
state, DataSinkOperatorX<AggSinkLocalState>::_child_x->row_desc(),
state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc(),
intermediate_slot_desc, output_slot_desc));
_aggregate_evaluators[i]->set_version(state->be_exec_version());
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {

DataDistribution required_data_distribution() const override {
if (_probe_expr_ctxs.empty()) {
return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child_x
return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child
->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,11 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
Status AnalyticSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AnalyticSinkLocalState>::open(state));
for (const auto& ctx : _agg_expr_ctxs) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child->row_desc()));
}
if (!_partition_by_eq_expr_ctxs.empty() || !_order_by_eq_expr_ctxs.empty()) {
vector<TTupleId> tuple_ids;
tuple_ids.push_back(_child_x->row_desc().tuple_descriptors()[0]->id());
tuple_ids.push_back(_child->row_desc().tuple_descriptors()[0]->id());
tuple_ids.push_back(_buffered_tuple_id);
RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, false));
if (!_partition_by_eq_expr_ctxs.empty()) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,13 +562,13 @@ Status AnalyticLocalState::close(RuntimeState* state) {

Status AnalyticSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
DCHECK(_child_x->row_desc().is_prefix_of(_row_descriptor));
DCHECK(_child->row_desc().is_prefix_of(_row_descriptor));
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
for (size_t i = 0; i < _agg_functions.size(); ++i) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[i];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child_x->row_desc(),
RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(),
intermediate_slot_desc, output_slot_desc));
_agg_functions[i]->set_version(state->be_exec_version());
_change_to_nullable_flags.push_back(output_slot_desc->is_nullable() &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc()));

int j = _probe_expr_ctxs.size();
for (int i = 0; i < j; ++i) {
Expand All @@ -389,7 +389,7 @@ Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
state, _child_x->row_desc(), intermediate_slot_desc, output_slot_desc));
state, _child->row_desc(), intermediate_slot_desc, output_slot_desc));
_aggregate_evaluators[i]->set_version(state->be_exec_version());
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -648,10 +648,10 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
}

DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
if (_child_x && _enable_local_merge_sort) {
if (_child && _enable_local_merge_sort) {
// SORT_OPERATOR -> DATA_STREAM_SINK_OPERATOR
// SORT_OPERATOR -> LOCAL_MERGE_SORT -> DATA_STREAM_SINK_OPERATOR
if (auto sort_source = std::dynamic_pointer_cast<SortSourceOperatorX>(_child_x);
if (auto sort_source = std::dynamic_pointer_cast<SortSourceOperatorX>(_child);
sort_source && sort_source->use_local_merge()) {
// Sort the data local
return ExchangeType::LOCAL_MERGE_SORT;
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) {
_shared_hash_table_context = _shared_hashtable_controller->get_context(node_id());
}
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child->row_desc()));
return vectorized::VExpr::open(_build_expr_ctxs, state);
}

Expand All @@ -505,7 +505,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*

if (local_state._build_side_mutable_block.empty()) {
auto tmp_build_block = vectorized::VectorizedUtils::create_empty_columnswithtypename(
_child_x->row_desc());
_child->row_desc());
tmp_build_block = *(tmp_build_block.create_same_struct_block(1, false));
local_state._build_col_ids.resize(_build_expr_ctxs.size());
RETURN_IF_ERROR(local_state._do_evaluate(tmp_build_block, local_state._build_expr_ctxs,
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class HashJoinBuildSinkOperatorX final
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
} else if (_is_broadcast_join) {
return _child_x->ignore_data_distribution()
return _child->ignore_data_distribution()
? DataDistribution(ExchangeType::PASS_TO_ONE)
: DataDistribution(ExchangeType::NOOP);
}
Expand Down
8 changes: 4 additions & 4 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
/// increase the output rows count(just same as `_probe_block`'s rows count).
RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, eos,
&local_state._probe_block, false));
local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
local_state._probe_block.clear_column_data(_child->row_desc().num_materialized_slots());
return Status::OK();
}

Expand Down Expand Up @@ -597,7 +597,7 @@ Status HashJoinProbeOperatorX::open(RuntimeState* state) {
}
}
};
init_output_slots_flags(_child_x->row_desc().tuple_descriptors(), _left_output_slot_flags);
init_output_slots_flags(_child->row_desc().tuple_descriptors(), _left_output_slot_flags);
init_output_slots_flags(_build_side_child->row_desc().tuple_descriptors(),
_right_output_slot_flags);
// _other_join_conjuncts are evaluated in the context of the rows produced by this node
Expand All @@ -609,12 +609,12 @@ Status HashJoinProbeOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
}

RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc()));
DCHECK(_build_side_child != nullptr);
// right table data types
_right_table_data_types =
vectorized::VectorizedUtils::get_data_types(_build_side_child->row_desc());
_left_table_data_types = vectorized::VectorizedUtils::get_data_types(_child_x->row_desc());
_left_table_data_types = vectorized::VectorizedUtils::get_data_types(_child->row_desc());
_right_table_column_names =
vectorized::VectorizedUtils::get_column_names(_build_side_child->row_desc());

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ class JoinProbeOperatorX : public StatefulOperatorX<LocalStateType> {
}

Status set_child(OperatorPtr child) override {
if (OperatorX<LocalStateType>::_child_x && _build_side_child == nullptr) {
if (OperatorX<LocalStateType>::_child && _build_side_child == nullptr) {
// when there already (probe) child, others is build child.
set_build_side_child(child);
} else {
// first child which is probe side is in this pipeline
OperatorX<LocalStateType>::_child_x = std::move(child);
OperatorX<LocalStateType>::_child = std::move(child);
}
return Status::OK();
}
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/nested_loop_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ Status NestedLoopJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta

Status NestedLoopJoinBuildSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>::open(state));
int num_build_tuples = _child_x->row_desc().tuple_descriptors().size();
int num_build_tuples = _child->row_desc().tuple_descriptors().size();

for (int i = 0; i < num_build_tuples; ++i) {
TupleDescriptor* build_tuple_desc = _child_x->row_desc().tuple_descriptors()[i];
TupleDescriptor* build_tuple_desc = _child->row_desc().tuple_descriptors()[i];
auto tuple_idx = _row_descriptor.get_tuple_idx(build_tuple_desc->id());
RETURN_IF_INVALID_TUPLE_IDX(build_tuple_desc->id(), tuple_idx);
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_filter_src_expr_ctxs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_filter_src_expr_ctxs, state, _child->row_desc()));
return vectorized::VExpr::open(_filter_src_expr_ctxs, state);
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/nested_loop_join_build_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ class NestedLoopJoinBuildSinkOperatorX final
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
}
return _child_x->ignore_data_distribution() ? DataDistribution(ExchangeType::BROADCAST)
: DataDistribution(ExchangeType::NOOP);
return _child->ignore_data_distribution() ? DataDistribution(ExchangeType::BROADCAST)
: DataDistribution(ExchangeType::NOOP);
}

private:
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ Status NestedLoopJoinProbeOperatorX::open(RuntimeState* state) {
for (auto& conjunct : _join_conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
}
_num_probe_side_columns = _child_x->row_desc().num_materialized_slots();
_num_probe_side_columns = _child->row_desc().num_materialized_slots();
_num_build_side_columns = _build_side_child->row_desc().num_materialized_slots();
return vectorized::VExpr::open(_join_conjuncts, state);
}
Expand Down
139 changes: 69 additions & 70 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ std::string PipelineXSinkLocalState<SharedStateArg>::name_suffix() {
}

DataDistribution DataSinkOperatorXBase::required_data_distribution() const {
return _child_x && _child_x->ignore_data_distribution()
return _child && _child->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataDistribution(ExchangeType::NOOP);
}
const RowDescriptor& OperatorBase::row_desc() const {
return _child_x->row_desc();
return _child->row_desc();
}

template <typename SharedStateArg>
Expand Down Expand Up @@ -220,15 +220,15 @@ Status OperatorXBase::open(RuntimeState* state) {
for (auto& projections : _intermediate_projections) {
RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
}
if (_child_x && !is_source()) {
RETURN_IF_ERROR(_child_x->open(state));
if (_child && !is_source()) {
RETURN_IF_ERROR(_child->open(state));
}
return Status::OK();
}

Status OperatorXBase::close(RuntimeState* state) {
if (_child_x && !is_source()) {
RETURN_IF_ERROR(_child_x->close(state));
if (_child && !is_source()) {
RETURN_IF_ERROR(_child->close(state));
}
auto result = state->get_local_state_result(operator_id());
if (!result) {
Expand Down Expand Up @@ -572,8 +572,7 @@ Status PipelineXSinkLocalState<SharedState>::close(RuntimeState* state, Status e
template <typename LocalStateType>
Status StreamingOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block,
bool* eos) {
RETURN_IF_ERROR(
OperatorX<LocalStateType>::_child_x->get_block_after_projects(state, block, eos));
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child->get_block_after_projects(state, block, eos));
return pull(state, block, eos);
}

Expand All @@ -583,8 +582,8 @@ Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori
auto& local_state = get_local_state(state);
if (need_more_input_data(state)) {
local_state._child_block->clear_column_data(
OperatorX<LocalStateType>::_child_x->row_desc().num_materialized_slots());
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_block_after_projects(
OperatorX<LocalStateType>::_child->row_desc().num_materialized_slots());
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child->get_block_after_projects(
state, local_state._child_block.get(), &local_state._child_eos));
*eos = local_state._child_eos;
if (local_state._child_block->rows() == 0 && !local_state._child_eos) {
Expand Down Expand Up @@ -668,66 +667,66 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s
return Base::close(state, exec_status);
}

#define DECLARE_OPERATOR_X(LOCAL_STATE) template class DataSinkOperatorX<LOCAL_STATE>;
DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
DECLARE_OPERATOR_X(ResultSinkLocalState)
DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
DECLARE_OPERATOR_X(MemoryScratchSinkLocalState)
DECLARE_OPERATOR_X(ResultFileSinkLocalState)
DECLARE_OPERATOR_X(OlapTableSinkLocalState)
DECLARE_OPERATOR_X(OlapTableSinkV2LocalState)
DECLARE_OPERATOR_X(HiveTableSinkLocalState)
DECLARE_OPERATOR_X(IcebergTableSinkLocalState)
DECLARE_OPERATOR_X(AnalyticSinkLocalState)
DECLARE_OPERATOR_X(SortSinkLocalState)
DECLARE_OPERATOR_X(SpillSortSinkLocalState)
DECLARE_OPERATOR_X(LocalExchangeSinkLocalState)
DECLARE_OPERATOR_X(AggSinkLocalState)
DECLARE_OPERATOR_X(PartitionedAggSinkLocalState)
DECLARE_OPERATOR_X(ExchangeSinkLocalState)
DECLARE_OPERATOR_X(NestedLoopJoinBuildSinkLocalState)
DECLARE_OPERATOR_X(UnionSinkLocalState)
DECLARE_OPERATOR_X(MultiCastDataStreamSinkLocalState)
DECLARE_OPERATOR_X(PartitionSortSinkLocalState)
DECLARE_OPERATOR_X(SetProbeSinkLocalState<true>)
DECLARE_OPERATOR_X(SetProbeSinkLocalState<false>)
DECLARE_OPERATOR_X(SetSinkLocalState<true>)
DECLARE_OPERATOR_X(SetSinkLocalState<false>)
DECLARE_OPERATOR_X(PartitionedHashJoinSinkLocalState)
DECLARE_OPERATOR_X(GroupCommitBlockSinkLocalState)

#undef DECLARE_OPERATOR_X

#define DECLARE_OPERATOR_X(LOCAL_STATE) template class OperatorX<LOCAL_STATE>;
DECLARE_OPERATOR_X(HashJoinProbeLocalState)
DECLARE_OPERATOR_X(OlapScanLocalState)
DECLARE_OPERATOR_X(GroupCommitLocalState)
DECLARE_OPERATOR_X(JDBCScanLocalState)
DECLARE_OPERATOR_X(FileScanLocalState)
DECLARE_OPERATOR_X(EsScanLocalState)
DECLARE_OPERATOR_X(AnalyticLocalState)
DECLARE_OPERATOR_X(SortLocalState)
DECLARE_OPERATOR_X(SpillSortLocalState)
DECLARE_OPERATOR_X(AggLocalState)
DECLARE_OPERATOR_X(PartitionedAggLocalState)
DECLARE_OPERATOR_X(TableFunctionLocalState)
DECLARE_OPERATOR_X(ExchangeLocalState)
DECLARE_OPERATOR_X(RepeatLocalState)
DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState)
DECLARE_OPERATOR_X(AssertNumRowsLocalState)
DECLARE_OPERATOR_X(EmptySetLocalState)
DECLARE_OPERATOR_X(UnionSourceLocalState)
DECLARE_OPERATOR_X(MultiCastDataStreamSourceLocalState)
DECLARE_OPERATOR_X(PartitionSortSourceLocalState)
DECLARE_OPERATOR_X(SetSourceLocalState<true>)
DECLARE_OPERATOR_X(SetSourceLocalState<false>)
DECLARE_OPERATOR_X(DataGenLocalState)
DECLARE_OPERATOR_X(SchemaScanLocalState)
DECLARE_OPERATOR_X(MetaScanLocalState)
DECLARE_OPERATOR_X(LocalExchangeSourceLocalState)
DECLARE_OPERATOR_X(PartitionedHashJoinProbeLocalState)

#undef DECLARE_OPERATOR_X
#define DECLARE_OPERATOR(LOCAL_STATE) template class DataSinkOperatorX<LOCAL_STATE>;
DECLARE_OPERATOR(HashJoinBuildSinkLocalState)
DECLARE_OPERATOR(ResultSinkLocalState)
DECLARE_OPERATOR(JdbcTableSinkLocalState)
DECLARE_OPERATOR(MemoryScratchSinkLocalState)
DECLARE_OPERATOR(ResultFileSinkLocalState)
DECLARE_OPERATOR(OlapTableSinkLocalState)
DECLARE_OPERATOR(OlapTableSinkV2LocalState)
DECLARE_OPERATOR(HiveTableSinkLocalState)
DECLARE_OPERATOR(IcebergTableSinkLocalState)
DECLARE_OPERATOR(AnalyticSinkLocalState)
DECLARE_OPERATOR(SortSinkLocalState)
DECLARE_OPERATOR(SpillSortSinkLocalState)
DECLARE_OPERATOR(LocalExchangeSinkLocalState)
DECLARE_OPERATOR(AggSinkLocalState)
DECLARE_OPERATOR(PartitionedAggSinkLocalState)
DECLARE_OPERATOR(ExchangeSinkLocalState)
DECLARE_OPERATOR(NestedLoopJoinBuildSinkLocalState)
DECLARE_OPERATOR(UnionSinkLocalState)
DECLARE_OPERATOR(MultiCastDataStreamSinkLocalState)
DECLARE_OPERATOR(PartitionSortSinkLocalState)
DECLARE_OPERATOR(SetProbeSinkLocalState<true>)
DECLARE_OPERATOR(SetProbeSinkLocalState<false>)
DECLARE_OPERATOR(SetSinkLocalState<true>)
DECLARE_OPERATOR(SetSinkLocalState<false>)
DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)

#undef DECLARE_OPERATOR

#define DECLARE_OPERATOR(LOCAL_STATE) template class OperatorX<LOCAL_STATE>;
DECLARE_OPERATOR(HashJoinProbeLocalState)
DECLARE_OPERATOR(OlapScanLocalState)
DECLARE_OPERATOR(GroupCommitLocalState)
DECLARE_OPERATOR(JDBCScanLocalState)
DECLARE_OPERATOR(FileScanLocalState)
DECLARE_OPERATOR(EsScanLocalState)
DECLARE_OPERATOR(AnalyticLocalState)
DECLARE_OPERATOR(SortLocalState)
DECLARE_OPERATOR(SpillSortLocalState)
DECLARE_OPERATOR(AggLocalState)
DECLARE_OPERATOR(PartitionedAggLocalState)
DECLARE_OPERATOR(TableFunctionLocalState)
DECLARE_OPERATOR(ExchangeLocalState)
DECLARE_OPERATOR(RepeatLocalState)
DECLARE_OPERATOR(NestedLoopJoinProbeLocalState)
DECLARE_OPERATOR(AssertNumRowsLocalState)
DECLARE_OPERATOR(EmptySetLocalState)
DECLARE_OPERATOR(UnionSourceLocalState)
DECLARE_OPERATOR(MultiCastDataStreamSourceLocalState)
DECLARE_OPERATOR(PartitionSortSourceLocalState)
DECLARE_OPERATOR(SetSourceLocalState<true>)
DECLARE_OPERATOR(SetSourceLocalState<false>)
DECLARE_OPERATOR(DataGenLocalState)
DECLARE_OPERATOR(SchemaScanLocalState)
DECLARE_OPERATOR(MetaScanLocalState)
DECLARE_OPERATOR(LocalExchangeSourceLocalState)
DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)

#undef DECLARE_OPERATOR

template class StreamingOperatorX<AssertNumRowsLocalState>;
template class StreamingOperatorX<SelectLocalState>;
Expand Down
Loading
Loading