Skip to content

Commit

Permalink
[pick](branch-3.0) pick #39982 #40576 #41555 #41592 #41676 #41740 (#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Oct 15, 2024
1 parent d80031f commit 54b4fa3
Show file tree
Hide file tree
Showing 93 changed files with 442 additions and 642 deletions.
4 changes: 3 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1717,7 +1717,9 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t

if (config::is_cloud_mode()) {
auto st = config::set_config("enable_file_cache", "true", true, true);
LOG(INFO) << "set config enable_file_cache " << "true" << " " << st;
LOG(INFO) << "set config enable_file_cache "
<< "true"
<< " " << st;
}

return true;
Expand Down
11 changes: 4 additions & 7 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -769,12 +769,13 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
return Status::OK();
}

Status AggSinkOperatorX::prepare(RuntimeState* state) {
Status AggSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::open(state));
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
RETURN_IF_ERROR(vectorized::VExpr::prepare(
_probe_expr_ctxs, state, DataSinkOperatorX<AggSinkLocalState>::_child_x->row_desc()));
_probe_expr_ctxs, state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc()));

int j = _probe_expr_ctxs.size();
for (int i = 0; i < j; ++i) {
Expand All @@ -789,7 +790,7 @@ Status AggSinkOperatorX::prepare(RuntimeState* state) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
state, DataSinkOperatorX<AggSinkLocalState>::_child_x->row_desc(),
state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc(),
intermediate_slot_desc, output_slot_desc));
_aggregate_evaluators[i]->set_version(state->be_exec_version());
}
Expand Down Expand Up @@ -824,10 +825,6 @@ Status AggSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::AggFnEvaluator::check_agg_fn_output(
_probe_expr_ctxs.size(), _aggregate_evaluators, _agg_fn_output_row_descriptor));
}
return Status::OK();
}

Status AggSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));

for (auto& _aggregate_evaluator : _aggregate_evaluators) {
Expand Down
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,18 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {

Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;

DataDistribution required_data_distribution() const override {
if (_probe_expr_ctxs.empty()) {
return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child_x
return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child
->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
}
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
11 changes: 4 additions & 7 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,14 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
return Status::OK();
}

Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
Status AnalyticSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AnalyticSinkLocalState>::open(state));
for (const auto& ctx : _agg_expr_ctxs) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child->row_desc()));
}
if (!_partition_by_eq_expr_ctxs.empty() || !_order_by_eq_expr_ctxs.empty()) {
vector<TTupleId> tuple_ids;
tuple_ids.push_back(_child_x->row_desc().tuple_descriptors()[0]->id());
tuple_ids.push_back(_child->row_desc().tuple_descriptors()[0]->id());
tuple_ids.push_back(_buffered_tuple_id);
RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, false));
if (!_partition_by_eq_expr_ctxs.empty()) {
Expand All @@ -249,10 +250,6 @@ Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
vectorized::VExpr::prepare(_order_by_eq_expr_ctxs, state, cmp_row_desc));
}
}
return Status::OK();
}

Status AnalyticSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::open(_partition_by_eq_expr_ctxs, state));
RETURN_IF_ERROR(vectorized::VExpr::open(_order_by_eq_expr_ctxs, state));
for (size_t i = 0; i < _agg_functions_size; ++i) {
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,14 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt

Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
DataDistribution required_data_distribution() const override {
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
13 changes: 4 additions & 9 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,15 +562,15 @@ Status AnalyticLocalState::close(RuntimeState* state) {
return PipelineXLocalState<AnalyticSharedState>::close(state);
}

Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::prepare(state));
DCHECK(_child_x->row_desc().is_prefix_of(_row_descriptor));
Status AnalyticSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
DCHECK(_child->row_desc().is_prefix_of(_row_descriptor));
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
for (size_t i = 0; i < _agg_functions.size(); ++i) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[i];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child_x->row_desc(),
RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(),
intermediate_slot_desc, output_slot_desc));
_agg_functions[i]->set_version(state->be_exec_version());
_change_to_nullable_flags.push_back(output_slot_desc->is_nullable() &&
Expand All @@ -597,11 +597,6 @@ Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
alignment_of_next_state * alignment_of_next_state;
}
}
return Status::OK();
}

Status AnalyticSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
for (auto* agg_function : _agg_functions) {
RETURN_IF_ERROR(agg_function->open(state));
}
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ class AnalyticSourceOperatorX final : public OperatorX<AnalyticLocalState> {
bool is_source() const override { return true; }

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

private:
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/datagen_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ Status DataGenSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
return Status::OK();
}

Status DataGenSourceOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<DataGenLocalState>::prepare(state));
Status DataGenSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<DataGenLocalState>::open(state));
// get tuple desc
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/datagen_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class DataGenSourceOperatorX final : public OperatorX<DataGenLocalState> {
const DescriptorTbl& descs);

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;

[[nodiscard]] bool is_source() const override { return true; }
Expand Down
14 changes: 4 additions & 10 deletions be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,12 @@ Status DistinctStreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState*
return Status::OK();
}

Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::prepare(state));
Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::open(state));
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc()));

int j = _probe_expr_ctxs.size();
for (int i = 0; i < j; ++i) {
Expand All @@ -389,7 +389,7 @@ Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
state, _child_x->row_desc(), intermediate_slot_desc, output_slot_desc));
state, _child->row_desc(), intermediate_slot_desc, output_slot_desc));
_aggregate_evaluators[i]->set_version(state->be_exec_version());
}

Expand All @@ -412,12 +412,6 @@ Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
alignment_of_next_state * alignment_of_next_state;
}
}

return Status::OK();
}

Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::open(state));
RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));

for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,14 @@ class DistinctStreamingAggOperatorX final
DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) const override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override;
bool need_more_input_data(RuntimeState* state) const override;

DataDistribution required_data_distribution() const override {
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/es_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ Status EsScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
return Status::OK();
}

Status EsScanOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::prepare(state));
Status EsScanOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::open(state));

_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (_tuple_desc == nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/es_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class EsScanOperatorX final : public ScanOperatorX<EsScanLocalState> {
const DescriptorTbl& descs, int parallel_tasks);

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

private:
friend class EsScanLocalState;
Expand Down
Loading

0 comments on commit 54b4fa3

Please sign in to comment.