Skip to content

Commit

Permalink
Merge branch 'master' into improve-column-ut
Browse files Browse the repository at this point in the history
  • Loading branch information
amorynan authored Dec 3, 2024
2 parents 09b72a0 + 0d48863 commit 4cb96b1
Show file tree
Hide file tree
Showing 135 changed files with 2,456 additions and 395 deletions.
20 changes: 4 additions & 16 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -975,11 +975,10 @@ class RuntimePredicateWrapper {

Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const RuntimeFilterRole role,
int node_id, std::shared_ptr<IRuntimeFilter>* res,
bool build_bf_exactly) {
int node_id, std::shared_ptr<IRuntimeFilter>* res) {
*res = std::make_shared<IRuntimeFilter>(state, desc);
(*res)->set_role(role);
return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
return (*res)->init_with_desc(desc, query_options, node_id);
}

RuntimeFilterContextSPtr& IRuntimeFilter::get_shared_context_ref() {
Expand Down Expand Up @@ -1348,7 +1347,7 @@ std::string IRuntimeFilter::formatted_state() const {
}

Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id, bool build_bf_exactly) {
int node_id) {
// if node_id == -1 , it shouldn't be a consumer
DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));

Expand All @@ -1370,21 +1369,10 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
params.runtime_bloom_filter_max_size = options->__isset.runtime_bloom_filter_max_size
? options->runtime_bloom_filter_max_size
: 0;
auto sync_filter_size = desc->__isset.sync_filter_size && desc->sync_filter_size;
// We build runtime filter by exact distinct count if all of 3 conditions are met:
// 1. Only 1 join key
// 2. Bloom filter
// 3. Size of all bloom filters will be same (size will be sync or this is a broadcast join).
params.build_bf_exactly =
build_bf_exactly && (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER ||
_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER);

params.build_bf_exactly = desc->__isset.build_bf_exactly && desc->build_bf_exactly;
params.bloom_filter_size_calculated_by_ndv = desc->bloom_filter_size_calculated_by_ndv;

if (!sync_filter_size) {
params.build_bf_exactly &= !_is_broadcast_join;
}

if (desc->__isset.bloom_filter_size_bytes) {
params.bloom_filter_size = desc->bloom_filter_size_bytes;
}
Expand Down
5 changes: 2 additions & 3 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ class IRuntimeFilter {

static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const RuntimeFilterRole role,
int node_id, std::shared_ptr<IRuntimeFilter>* res,
bool build_bf_exactly = false);
int node_id, std::shared_ptr<IRuntimeFilter>* res);

RuntimeFilterContextSPtr& get_shared_context_ref();

Expand Down Expand Up @@ -260,7 +259,7 @@ class IRuntimeFilter {

// init filter with desc
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id = -1, bool build_bf_exactly = false);
int node_id = -1);

// serialize _wrapper to protobuf
Status serialize(PMergeFilterRequest* request, void** data, int* len);
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
RETURN_IF_ERROR(_hash_table_init(state));
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->register_producer_runtime_filter(
p._runtime_filter_descs[i], &_runtime_filters[i], _build_expr_ctxs.size() == 1));
RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i],
&_runtime_filters[i]));
}

_runtime_filter_slots =
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/nested_loop_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i],
&_runtime_filters[i], false));
&_runtime_filters[i]));
}
return Status::OK();
}
Expand Down
Loading

0 comments on commit 4cb96b1

Please sign in to comment.