Skip to content

Commit

Permalink
[refactor](pipeline) Refactor pipeline execution (apache#39982)
Browse files Browse the repository at this point in the history
Unify `prepare` and `open` in `Operator`.
  • Loading branch information
Gabriel39 committed Oct 11, 2024
1 parent a1f8383 commit e3fa6c0
Show file tree
Hide file tree
Showing 89 changed files with 205 additions and 441 deletions.
7 changes: 2 additions & 5 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,8 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
return Status::OK();
}

Status AggSinkOperatorX::prepare(RuntimeState* state) {
Status AggSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::open(state));
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
Expand Down Expand Up @@ -824,10 +825,6 @@ Status AggSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::AggFnEvaluator::check_agg_fn_output(
_probe_expr_ctxs.size(), _aggregate_evaluators, _agg_fn_output_row_descriptor));
}
return Status::OK();
}

Status AggSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));

for (auto& _aggregate_evaluator : _aggregate_evaluators) {
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {

Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
Expand Down
7 changes: 2 additions & 5 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
return Status::OK();
}

Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
Status AnalyticSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AnalyticSinkLocalState>::open(state));
for (const auto& ctx : _agg_expr_ctxs) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child_x->row_desc()));
}
Expand All @@ -249,10 +250,6 @@ Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
vectorized::VExpr::prepare(_order_by_eq_expr_ctxs, state, cmp_row_desc));
}
}
return Status::OK();
}

Status AnalyticSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::open(_partition_by_eq_expr_ctxs, state));
RETURN_IF_ERROR(vectorized::VExpr::open(_order_by_eq_expr_ctxs, state));
for (size_t i = 0; i < _agg_functions_size; ++i) {
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt

Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
Expand Down
9 changes: 2 additions & 7 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,8 @@ Status AnalyticLocalState::close(RuntimeState* state) {
return PipelineXLocalState<AnalyticSharedState>::close(state);
}

Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::prepare(state));
Status AnalyticSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
DCHECK(_child_x->row_desc().is_prefix_of(_row_descriptor));
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
Expand Down Expand Up @@ -597,11 +597,6 @@ Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
alignment_of_next_state * alignment_of_next_state;
}
}
return Status::OK();
}

Status AnalyticSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
for (auto* agg_function : _agg_functions) {
RETURN_IF_ERROR(agg_function->open(state));
}
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ class AnalyticSourceOperatorX final : public OperatorX<AnalyticLocalState> {
bool is_source() const override { return true; }

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

private:
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/datagen_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ Status DataGenSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
return Status::OK();
}

Status DataGenSourceOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<DataGenLocalState>::prepare(state));
Status DataGenSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<DataGenLocalState>::open(state));
// get tuple desc
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/datagen_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class DataGenSourceOperatorX final : public OperatorX<DataGenLocalState> {
const DescriptorTbl& descs);

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;

[[nodiscard]] bool is_source() const override { return true; }
Expand Down
10 changes: 2 additions & 8 deletions be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ Status DistinctStreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState*
return Status::OK();
}

Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::prepare(state));
Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::open(state));
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
Expand Down Expand Up @@ -412,12 +412,6 @@ Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
alignment_of_next_state * alignment_of_next_state;
}
}

return Status::OK();
}

Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::open(state));
RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));

for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ class DistinctStreamingAggOperatorX final
DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) const override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override;
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/es_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ Status EsScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
return Status::OK();
}

Status EsScanOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::prepare(state));
Status EsScanOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::open(state));

_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (_tuple_desc == nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/es_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class EsScanOperatorX final : public ScanOperatorX<EsScanLocalState> {
const DescriptorTbl& descs, int parallel_tasks);

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

private:
friend class EsScanLocalState;
Expand Down
8 changes: 2 additions & 6 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,14 +335,10 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
return Status::OK();
}

Status ExchangeSinkOperatorX::prepare(RuntimeState* state) {
Status ExchangeSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::open(state));
_state = state;
_mem_tracker = std::make_unique<MemTracker>("ExchangeSinkOperatorX:");
return Status::OK();
}

Status ExchangeSinkOperatorX::open(RuntimeState* state) {
DCHECK(state != nullptr);
_compression_type = state->fragement_transmission_compression_type();
if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
if (_output_tuple_id == -1) {
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt

RuntimeState* state() { return _state; }

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
Expand Down
10 changes: 2 additions & 8 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,13 @@ Status ExchangeSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state
return Status::OK();
}

Status ExchangeSourceOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::prepare(state));
Status ExchangeSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::open(state));
DCHECK_GT(_num_senders, 0);

if (_is_merging) {
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _row_descriptor, _row_descriptor));
}

return Status::OK();
}

Status ExchangeSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::open(state));
if (_is_merging) {
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
}
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs, int num_senders);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ Status FileScanLocalState::_process_conjuncts(RuntimeState* state) {
return Status::OK();
}

Status FileScanOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::prepare(state));
Status FileScanOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::open(state));
if (state->get_query_ctx() != nullptr &&
state->get_query_ctx()->file_scan_range_params_map.contains(node_id())) {
TFileScanRangeParams& params =
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/file_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
_output_tuple_id = tnode.file_scan_node.tuple_id;
}

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

bool is_file_scan_operator() const override { return true; }

Expand Down
10 changes: 3 additions & 7 deletions be/src/pipeline/exec/group_commit_block_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,19 +259,15 @@ Status GroupCommitBlockSinkOperatorX::init(const TDataSink& t_sink) {
return Status::OK();
}

Status GroupCommitBlockSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Base::prepare(state));
Status GroupCommitBlockSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
// get table's tuple descriptor
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
if (_output_tuple_desc == nullptr) {
LOG(WARNING) << "unknown destination tuple descriptor, id=" << _tuple_desc_id;
return Status::InternalError("unknown destination tuple descriptor");
}
return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc);
}

Status GroupCommitBlockSinkOperatorX::open(RuntimeState* state) {
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/group_commit_block_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ class GroupCommitBlockSinkOperatorX final

Status init(const TDataSink& sink) override;

Status prepare(RuntimeState* state) override;

Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* block, bool eos) override;
Expand Down
21 changes: 9 additions & 12 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,18 +429,6 @@ HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int ope
: std::vector<TExpr> {}),
_need_local_merge(need_local_merge) {}

Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
if (_is_broadcast_join) {
if (state->enable_share_hash_table_for_broadcast_join()) {
_shared_hashtable_controller =
state->get_query_ctx()->get_shared_hash_table_controller();
_shared_hash_table_context = _shared_hashtable_controller->get_context(node_id());
}
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child_x->row_desc()));
return Status::OK();
}

Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(JoinBuildSinkOperatorX::init(tnode, state));
DCHECK(tnode.__isset.hash_join_node);
Expand Down Expand Up @@ -498,6 +486,15 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st
}

Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::open(state));
if (_is_broadcast_join) {
if (state->enable_share_hash_table_for_broadcast_join()) {
_shared_hashtable_controller =
state->get_query_ctx()->get_shared_hash_table_controller();
_shared_hash_table_context = _shared_hashtable_controller->get_context(node_id());
}
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child_x->row_desc()));
return vectorized::VExpr::open(_build_expr_ctxs, state);
}

Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ class HashJoinBuildSinkOperatorX final

Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
Expand Down
10 changes: 2 additions & 8 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,8 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
return Status::OK();
}

Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::prepare(state));
Status HashJoinProbeOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::open(state));
// init left/right output slots flags, only column of slot_id in _hash_output_slot_ids need
// insert to output block of hash join.
// _left_output_slots_flags : column of left table need to output set flag = true
Expand All @@ -600,7 +600,6 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
init_output_slots_flags(_child_x->row_desc().tuple_descriptors(), _left_output_slot_flags);
init_output_slots_flags(_build_side_child->row_desc().tuple_descriptors(),
_right_output_slot_flags);
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc));
// _other_join_conjuncts are evaluated in the context of the rows produced by this node
for (auto& conjunct : _other_join_conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
Expand Down Expand Up @@ -667,11 +666,6 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
}

_build_side_child.reset();
return Status::OK();
}

Status HashJoinProbeOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::open(state));
RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));
for (auto& conjunct : _other_join_conjuncts) {
RETURN_IF_ERROR(conjunct->open(state));
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override;
Expand Down
6 changes: 1 addition & 5 deletions be/src/pipeline/exec/hive_table_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,9 @@ class HiveTableSinkOperatorX final : public DataSinkOperatorX<HiveTableSinkLocal
return Status::OK();
}

Status prepare(RuntimeState* state) override {
RETURN_IF_ERROR(Base::prepare(state));
return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc);
}

Status open(RuntimeState* state) override {
RETURN_IF_ERROR(Base::open(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}

Expand Down
6 changes: 1 addition & 5 deletions be/src/pipeline/exec/iceberg_table_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,9 @@ class IcebergTableSinkOperatorX final : public DataSinkOperatorX<IcebergTableSin
return Status::OK();
}

Status prepare(RuntimeState* state) override {
RETURN_IF_ERROR(Base::prepare(state));
return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc);
}

Status open(RuntimeState* state) override {
RETURN_IF_ERROR(Base::open(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}

Expand Down
9 changes: 1 addition & 8 deletions be/src/pipeline/exec/jdbc_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,9 @@ Status JdbcTableSinkOperatorX::init(const TDataSink& thrift_sink) {
return Status::OK();
}

Status JdbcTableSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::prepare(state));
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
return Status::OK();
}

Status JdbcTableSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::open(state));
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
return Status::OK();
}
Expand Down
Loading

0 comments on commit e3fa6c0

Please sign in to comment.