Skip to content

Commit

Permalink
[Feature] support Group execution
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain committed Mar 13, 2024
1 parent 82aa66e commit 6c31bd8
Show file tree
Hide file tree
Showing 86 changed files with 1,974 additions and 494 deletions.
3 changes: 2 additions & 1 deletion be/src/connector/connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ Status DataSource::parse_runtime_filters(RuntimeState* state) {
if (_runtime_filters == nullptr || _runtime_filters->size() == 0) return Status::OK();
for (const auto& item : _runtime_filters->descriptors()) {
RuntimeFilterProbeDescriptor* probe = item.second;
const JoinRuntimeFilter* filter = probe->runtime_filter();
DCHECK(runtime_bloom_filter_eval_context.driver_sequence == -1);
const JoinRuntimeFilter* filter = probe->runtime_filter(runtime_bloom_filter_eval_context.driver_sequence);
if (filter == nullptr) continue;
SlotId slot_id;
if (!probe->is_probe_slot_ref(&slot_id)) continue;
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ set(EXEC_FILES
pipeline/pipeline.cpp
pipeline/spill_process_operator.cpp
pipeline/spill_process_channel.cpp
pipeline/group_execution/execution_group.cpp
pipeline/group_execution/execution_group_builder.cpp
pipeline/group_execution/group_operator.cpp
workgroup/work_group.cpp
workgroup/scan_executor.cpp
workgroup/scan_task_queue.cpp
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/aggregate/aggregate_streaming_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,9 @@ pipeline::OpFactories AggregateStreamingNode::decompose_to_pipeline(pipeline::Pi
size_t degree_of_parallelism = context->source_operator(ops_with_sink)->degree_of_parallelism();

auto should_cache = context->should_interpolate_cache_operator(id(), ops_with_sink[0]);
if (!should_cache && _tnode.agg_node.__isset.interpolate_passthrough && _tnode.agg_node.interpolate_passthrough &&
context->could_local_shuffle(ops_with_sink)) {
bool could_local_shuffle = !should_cache && !context->enable_group_execution();
if (could_local_shuffle && _tnode.agg_node.__isset.interpolate_passthrough &&
_tnode.agg_node.interpolate_passthrough && context->could_local_shuffle(ops_with_sink)) {
ops_with_sink = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), ops_with_sink,
degree_of_parallelism, true);
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/connector_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ int ConnectorScanNode::_estimate_max_concurrent_chunks() const {
}

pipeline::OpFactories ConnectorScanNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
auto exec_group = context->find_exec_group_by_plan_node_id(_id);
context->set_current_execution_group(exec_group);

size_t dop = context->dop_of_source_operator(id());
std::shared_ptr<pipeline::ConnectorScanOperatorFactory> scan_op = nullptr;
bool stream_data_source = _data_source_provider->stream_data_source();
Expand Down
6 changes: 4 additions & 2 deletions be/src/exec/cross_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,8 +609,10 @@ std::vector<std::shared_ptr<pipeline::OperatorFactory>> CrossJoinNode::_decompos
std::move(_conjunct_ctxs), std::move(cross_join_context), _join_op);
// Initialize OperatorFactory's fields involving runtime filters.
this->init_runtime_filter_for_operator(left_factory.get(), context, rc_rf_probe_collector);
left_ops = context->maybe_interpolate_local_adpative_passthrough_exchange(runtime_state(), id(), left_ops,
context->degree_of_parallelism());
if (context->is_colocate_group()) {
left_ops = context->maybe_interpolate_local_adpative_passthrough_exchange(runtime_state(), id(), left_ops,
context->degree_of_parallelism());
}
left_ops.emplace_back(std::move(left_factory));

if (limit() != -1) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/exchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ void ExchangeNode::debug_string(int indentation_level, std::stringstream* out) c

pipeline::OpFactories ExchangeNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
using namespace pipeline;

auto exec_group = context->find_exec_group_by_plan_node_id(_id);
context->set_current_execution_group(exec_group);
OpFactories operators;
if (!_is_merging) {
auto* query_ctx = context->runtime_state()->query_ctx();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ Status ExecNode::prepare(RuntimeState* state) {
_mem_tracker.reset(new MemTracker(_runtime_profile.get(), std::make_tuple(true, false, false), "", -1,
_runtime_profile->name(), nullptr));
RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state));
RETURN_IF_ERROR(_runtime_filter_collector.prepare(state, row_desc(), _runtime_profile.get()));
RETURN_IF_ERROR(_runtime_filter_collector.prepare(state, _runtime_profile.get()));

// TODO(zc):
// AddExprCtxsToFree(_conjunct_ctxs);
Expand Down
77 changes: 48 additions & 29 deletions be/src/exec/hash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "exec/hash_joiner.h"
#include "exec/pipeline/chunk_accumulate_operator.h"
#include "exec/pipeline/exchange/exchange_source_operator.h"
#include "exec/pipeline/group_execution/execution_group_builder.h"
#include "exec/pipeline/group_execution/execution_group_fwd.h"
#include "exec/pipeline/hashjoin/hash_join_build_operator.h"
#include "exec/pipeline/hashjoin/hash_join_probe_operator.h"
#include "exec/pipeline/hashjoin/hash_joiner_factory.h"
Expand All @@ -38,6 +40,8 @@
#include "exprs/expr.h"
#include "exprs/in_const_predicate.hpp"
#include "exprs/runtime_filter_bank.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/RuntimeFilter_types.h"
#include "gutil/strings/substitute.h"
#include "runtime/current_thread.h"
#include "runtime/runtime_filter_worker.h"
Expand Down Expand Up @@ -425,30 +429,27 @@ void HashJoinNode::close(RuntimeState* state) {
template <class HashJoinerFactory, class HashJoinBuilderFactory, class HashJoinProbeFactory>
pipeline::OpFactories HashJoinNode::_decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
using namespace pipeline;

auto rhs_operators = child(1)->decompose_to_pipeline(context);
// "col NOT IN (NULL, val1, val2)" always returns false, so hash join should
// return empty result in this case. Hash join cannot be divided into multiple
// partitions in this case. Otherwise, NULL value in right table will only occur
// in some partition hash table, and other partition hash table can output chunk.
// TODO: support null-aware left anti join with shuffle join
DCHECK(_join_type != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || _distribution_mode == TJoinDistributionMode::BROADCAST);
if (_distribution_mode == TJoinDistributionMode::BROADCAST) {
// A broadcast join needs to create only one hash table, because all the HashJoinProbeOperators
// use the same hash table, each with its own probe state.
rhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), rhs_operators);
} else {
// "col NOT IN (NULL, val1, val2)" always returns false, so hash join should
// return empty result in this case. Hash join cannot be divided into multiple
// partitions in this case. Otherwise, NULL value in right table will only occur
// in some partition hash table, and other partition hash table can output chunk.
if (_join_type == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
rhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), rhs_operators);
} else {
// Both HashJoin{Build, Probe}Operator are parallelized
// There are two ways of shuffle
// 1. If previous op is ExchangeSourceOperator and its partition type is HASH_PARTITIONED or BUCKET_SHUFFLE_HASH_PARTITIONED
// then pipeline level shuffle will be performed at sender side (ExchangeSinkOperator), so
// there is no need to perform local shuffle again at receiver side
// 2. Otherwise, add LocalExchangeOperator
// to shuffle multi-stream into #degree_of_parallelism# streams each of that pipes into HashJoin{Build, Probe}Operator.
rhs_operators = context->maybe_interpolate_local_shuffle_exchange(runtime_state(), id(), rhs_operators,
_build_equivalence_partition_expr_ctxs);
}
// Both HashJoin{Build, Probe}Operator are parallelized.
// There are two ways to shuffle:
// 1. If the previous op is an ExchangeSourceOperator and its partition type is HASH_PARTITIONED or BUCKET_SHUFFLE_HASH_PARTITIONED,
// then the pipeline-level shuffle is performed at the sender side (ExchangeSinkOperator), so
// there is no need to perform a local shuffle again at the receiver side.
// 2. Otherwise, add a LocalExchangeOperator
// to shuffle the multiple streams into #degree_of_parallelism# streams, each of which pipes into a HashJoin{Build, Probe}Operator.
rhs_operators = context->maybe_interpolate_local_shuffle_exchange(runtime_state(), id(), rhs_operators,
_build_equivalence_partition_expr_ctxs);
}

size_t num_right_partitions = context->source_operator(rhs_operators)->degree_of_parallelism();
Expand All @@ -468,12 +469,8 @@ pipeline::OpFactories HashJoinNode::_decompose_to_pipeline(pipeline::PipelineBui
_build_runtime_filters, _output_slots, _output_slots, _distribution_mode, false);
auto hash_joiner_factory = std::make_shared<starrocks::pipeline::HashJoinerFactory>(param);

// add placeholder into RuntimeFilterHub, HashJoinBuildOperator will generate runtime filters and fill it,
// Operators consuming the runtime filters will inspect this placeholder.
context->fragment_context()->runtime_filter_hub()->add_holder(_id);

// Create a shared RefCountedRuntimeFilterCollector
auto&& rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));
auto rc_rf_probe_collector = std::make_shared<RcRfProbeCollector>(2, std::move(this->runtime_filter_collector()));
// In the default query engine, we build only one hash table for the join's right child.
// But in the pipeline query engine, we build `num_right_partitions` hash tables, so we need to enlarge the limit.

Expand Down Expand Up @@ -508,12 +505,20 @@ pipeline::OpFactories HashJoinNode::_decompose_to_pipeline(pipeline::PipelineBui
DeferOp pop_dependent_pipeline([context]() { context->pop_dependent_pipeline(); });

auto lhs_operators = child(0)->decompose_to_pipeline(context);
if (_distribution_mode == TJoinDistributionMode::BROADCAST) {
lhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), lhs_operators,
context->degree_of_parallelism());
auto join_colocate_group = context->find_exec_group_by_plan_node_id(_id);
if (join_colocate_group->type() == ExecutionGroupType::COLOCATE) {
DCHECK(context->current_execution_group()->is_colocate_exec_group());
DCHECK_EQ(context->current_execution_group(), join_colocate_group);
context->set_current_execution_group(join_colocate_group);
} else {
if (_join_type == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
lhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), lhs_operators);
// left child is colocate group, but current join is not colocate group
if (context->current_execution_group()->is_colocate_exec_group()) {
lhs_operators = context->interpolate_grouped_exchange(_id, lhs_operators);
}

if (_distribution_mode == TJoinDistributionMode::BROADCAST) {
lhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), lhs_operators,
context->degree_of_parallelism());
} else {
auto* rhs_source_op = context->source_operator(rhs_operators);
auto* lhs_source_op = context->source_operator(lhs_operators);
Expand All @@ -522,13 +527,27 @@ pipeline::OpFactories HashJoinNode::_decompose_to_pipeline(pipeline::PipelineBui
_probe_equivalence_partition_expr_ctxs);
}
}

lhs_operators.emplace_back(std::move(probe_op));
// Add a placeholder to the RuntimeFilterHub. HashJoinBuildOperator will generate the runtime filters and fill it;
// operators consuming the runtime filters will inspect this placeholder.
if (context->is_colocate_group() && _distribution_mode == TJoinDistributionMode::COLOCATE) {
for (auto runtime_filter_build_desc : _build_runtime_filters) {
// local colocate won't generate global runtime filter
DCHECK(!runtime_filter_build_desc->has_remote_targets());
runtime_filter_build_desc->set_num_colocate_partition(num_right_partitions);
}
context->fragment_context()->runtime_filter_hub()->add_holder(_id, num_right_partitions);
} else {
context->fragment_context()->runtime_filter_hub()->add_holder(_id);
}

if (limit() != -1) {
lhs_operators.emplace_back(std::make_shared<LimitOperatorFactory>(context->next_operator_id(), id(), limit()));
}

if (_hash_join_node.__isset.interpolate_passthrough && _hash_join_node.interpolate_passthrough) {
if (_hash_join_node.__isset.interpolate_passthrough && _hash_join_node.interpolate_passthrough &&
!context->is_colocate_group()) {
lhs_operators = context->maybe_interpolate_local_passthrough_exchange(runtime_state(), id(), lhs_operators,
context->degree_of_parallelism(), true);
}
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/hash_joiner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ Status HashJoiner::_create_runtime_bloom_filters(RuntimeState* state, int64_t li
bool eq_null = _is_null_safes[expr_order];
MutableJoinRuntimeFilterPtr filter = nullptr;
auto multi_partitioned = rf_desc->layout().pipeline_level_multi_partitioned();
multi_partitioned |= rf_desc->num_colocate_partition() > 0;
if (multi_partitioned) {
LogicalType build_type = rf_desc->build_expr_type();
filter = std::shared_ptr<JoinRuntimeFilter>(
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,9 @@ pipeline::OpFactories OlapScanNode::decompose_to_pipeline(pipeline::PipelineBuil
scan_prepare_op->set_degree_of_parallelism(shared_morsel_queue ? 1 : dop);
this->init_runtime_filter_for_operator(scan_prepare_op.get(), context, rc_rf_probe_collector);

auto exec_group = context->find_exec_group_by_plan_node_id(_id);
context->set_current_execution_group(exec_group);

auto scan_prepare_pipeline = pipeline::OpFactories{
std::move(scan_prepare_op),
std::make_shared<pipeline::NoopSinkOperatorFactory>(context->next_operator_id(), id()),
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/olap_scan_prepare.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ void OlapScanConjunctsManager::normalize_join_runtime_filter(const SlotDescripto
// bloom runtime filter
for (const auto& it : runtime_filters->descriptors()) {
const RuntimeFilterProbeDescriptor* desc = it.second;
const JoinRuntimeFilter* rf = desc->runtime_filter();
const JoinRuntimeFilter* rf = desc->runtime_filter(driver_sequence);
using RangeType = ColumnValueRange<RangeValueType>;
using ValueType = typename RunTimeTypeTraits<SlotType>::CppType;
SlotId slot_id;
Expand All @@ -397,7 +397,7 @@ void OlapScanConjunctsManager::normalize_join_runtime_filter(const SlotDescripto

// runtime filter existed and does not have null.
if (rf == nullptr) {
rt_ranger_params.add_unarrived_rf(desc, &slot);
rt_ranger_params.add_unarrived_rf(desc, &slot, driver_sequence);
continue;
}

Expand Down
1 change: 1 addition & 0 deletions be/src/exec/olap_scan_prepare.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class OlapScanConjunctsManager {
const std::vector<std::string>* key_column_names;
const RuntimeFilterProbeCollector* runtime_filters;
RuntimeState* runtime_state;
int32_t driver_sequence = -1;

private:
// fields generated by parsing conjunct ctxs.
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/exchange/sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ SinkBuffer::SinkBuffer(FragmentContext* fragment_ctx, const std::vector<TPlanFra
_is_dest_merge(is_dest_merge),
_rpc_http_min_size(fragment_ctx->runtime_state()->get_rpc_http_min_size()),
_sent_audit_stats_frequency_upper_limit(
std::max((int64_t)64, BitUtil::RoundUpToPowerOfTwo(fragment_ctx->num_drivers() * 4))) {
std::max((int64_t)64, BitUtil::RoundUpToPowerOfTwo(fragment_ctx->total_dop() * 4))) {
for (const auto& dest : destinations) {
const auto& instance_id = dest.fragment_instance_id;
// instance_id.lo == -1 indicates that the destination is pseudo for bucket shuffle join.
Expand Down
Loading

0 comments on commit 6c31bd8

Please sign in to comment.