Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Support push down min/max to scanner for runtime filter with null #53857

Merged
merged 5 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 143 additions & 14 deletions be/src/exec/olap_scan_prepare.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
#include <variant>

#include "column/type_traits.h"
#include "exprs/binary_predicate.h"
#include "exprs/compound_predicate.h"
#include "exprs/dictmapping_expr.h"
#include "exprs/expr_context.h"
#include "exprs/in_const_predicate.hpp"
#include "exprs/is_null_predicate.h"
#include "gutil/map_util.h"
#include "runtime/descriptors.h"
#include "storage/column_predicate.h"
Expand Down Expand Up @@ -540,14 +543,51 @@ Status ChunkPredicateBuilder<E, Type>::normalize_binary_predicate(const SlotDesc
return Status::OK();
}

// Pushes down the min/max bounds of a runtime filter that also admits NULL.
//
// The bounds are materialised as two constant literals and combined into
//     (col_ref >[=] min AND col_ref <[=] max) OR col_ref IS NULL
// which is handed to a child OR-builder. The child builder is registered only
// when its parse_conjuncts() call succeeds and yields a true value; in every
// other case nothing is registered and no error is reported (this function
// returns void).
//
// SlotType    - logical type used to build the constant min/max columns.
// MappingType - logical type that selects the RuntimeBloomFilter instantiation
//               and the value type handed to the decoder.
// args        - forwarded to the Decoder constructor.
template <BoxedExprType E, CompoundNodeType Type>
template <LogicalType SlotType, LogicalType MappingType, template <class> class Decoder, class... Args>
void ChunkPredicateBuilder<E, Type>::normalized_rf_with_null(const JoinRuntimeFilter* rf, Expr* col_ref,
                                                             Args&&... args) {
    DCHECK(Type == CompoundNodeType::AND);

    using BloomFilterType = RuntimeBloomFilter<MappingType>;
    using DecoderType = Decoder<typename RunTimeTypeTraits<MappingType>::CppType>;
    using ParserType = detail::RuntimeColumnPredicateBuilder::MinMaxParser<BloomFilterType, DecoderType>;

    ObjectPool* pool = _opts.obj_pool;

    const auto* bloom_filter = down_cast<const BloomFilterType*>(rf);
    DecoderType decoder(std::forward<Args>(args)...);
    ParserType minmax_parser(bloom_filter, &decoder);
    const TypeDescriptor& slot_type_desc = col_ref->type();

    // Constant columns carrying the decoded lower and upper bound of the filter.
    ColumnPtr lower_col = minmax_parser.template min_const_column<SlotType>(slot_type_desc);
    ColumnPtr upper_col = minmax_parser.template max_const_column<SlotType>(slot_type_desc);
    VectorizedLiteral* lower_literal = pool->add(new VectorizedLiteral(std::move(lower_col), slot_type_desc));
    VectorizedLiteral* upper_literal = pool->add(new VectorizedLiteral(std::move(upper_col), slot_type_desc));

    Expr* lower_bound_pred = _gen_min_binary_pred(col_ref, lower_literal, bloom_filter->left_close_interval());
    Expr* upper_bound_pred = _gen_max_binary_pred(col_ref, upper_literal, bloom_filter->right_close_interval());
    Expr* null_pred = _gen_is_null_pred(col_ref);
    Expr* in_range_pred = _gen_and_pred(lower_bound_pred, upper_bound_pred);

    // Disjuncts of the OR node: "value within [min, max]" or "value is NULL".
    std::vector<BoxedExpr> disjuncts;
    disjuncts.emplace_back(in_range_pred);
    disjuncts.emplace_back(null_pred);
    ChunkPredicateBuilder<BoxedExpr, CompoundNodeType::OR> or_builder(_opts, disjuncts, false);

    auto parse_result = or_builder.parse_conjuncts();
    if (parse_result.ok() && parse_result.value()) {
        _child_builders.emplace_back(or_builder);
    }
}

template <BoxedExprType E, CompoundNodeType Type>
template <LogicalType SlotType, typename RangeValueType, bool Negative>
Status ChunkPredicateBuilder<E, Type>::normalize_join_runtime_filter(const SlotDescriptor& slot,
ColumnValueRange<RangeValueType>* range) {
// TODO(lzh): OR predicate with runtime filters is not supported yet.
if constexpr (Negative) {
if (!_is_root_builder) {
return Status::OK();
}
DCHECK(!Negative);

// in runtime filter
for (size_t i = 0; i < _exprs.size(); i++) {
Expand Down Expand Up @@ -588,7 +628,7 @@ Status ChunkPredicateBuilder<E, Type>::normalize_join_runtime_filter(const SlotD

// bloom runtime filter
for (const auto& it : _opts.runtime_filters->descriptors()) {
const RuntimeFilterProbeDescriptor* desc = it.second;
RuntimeFilterProbeDescriptor* desc = it.second;
const JoinRuntimeFilter* rf = desc->runtime_filter(_opts.driver_sequence);
using RangeType = ColumnValueRange<RangeValueType>;
using ValueType = typename RunTimeTypeTraits<SlotType>::CppType;
Expand All @@ -603,8 +643,6 @@ Status ChunkPredicateBuilder<E, Type>::normalize_join_runtime_filter(const SlotD
continue;
}

if (rf->has_null()) continue;

// If this column has no other filter, we use the join runtime filter
// to quickly compute the row range in the storage engine
if (range->is_init_state()) {
Expand All @@ -618,18 +656,35 @@ Status ChunkPredicateBuilder<E, Type>::normalize_join_runtime_filter(const SlotD
auto& global_dicts = _opts.runtime_state->get_query_global_dict_map();
if constexpr (SlotType == TYPE_VARCHAR) {
if (auto iter = global_dicts.find(slot_id); iter != global_dicts.end()) {
detail::RuntimeColumnPredicateBuilder::build_minmax_range<
RangeType, ValueType, LowCardDictType,
detail::RuntimeColumnPredicateBuilder::GlobalDictCodeDecoder>(*range, rf, &iter->second.first);
if (rf->has_null()) {
normalized_rf_with_null<SlotType, LowCardDictType,
detail::RuntimeColumnPredicateBuilder::GlobalDictCodeDecoder>(
rf, desc->probe_expr_ctx()->root(), &iter->second.first);
} else {
detail::RuntimeColumnPredicateBuilder::build_minmax_range<
RangeType, SlotType, LowCardDictType,
detail::RuntimeColumnPredicateBuilder::GlobalDictCodeDecoder>(*range, rf,
&iter->second.first);
}
} else {
detail::RuntimeColumnPredicateBuilder::build_minmax_range<
RangeType, ValueType, SlotType, detail::RuntimeColumnPredicateBuilder::DummyDecoder>(*range, rf,
nullptr);
if (rf->has_null()) {
normalized_rf_with_null<SlotType, SlotType, detail::RuntimeColumnPredicateBuilder::DummyDecoder>(
rf, desc->probe_expr_ctx()->root(), nullptr);
} else {
detail::RuntimeColumnPredicateBuilder::build_minmax_range<
RangeType, SlotType, SlotType, detail::RuntimeColumnPredicateBuilder::DummyDecoder>(
*range, rf, nullptr);
}
}
} else {
detail::RuntimeColumnPredicateBuilder::build_minmax_range<
RangeType, ValueType, SlotType, detail::RuntimeColumnPredicateBuilder::DummyDecoder>(*range, rf,
nullptr);
if (rf->has_null()) {
normalized_rf_with_null<SlotType, SlotType, detail::RuntimeColumnPredicateBuilder::DummyDecoder>(
rf, desc->probe_expr_ctx()->root(), nullptr);
} else {
detail::RuntimeColumnPredicateBuilder::build_minmax_range<
RangeType, SlotType, SlotType, detail::RuntimeColumnPredicateBuilder::DummyDecoder>(*range, rf,
nullptr);
}
}
}

Expand Down Expand Up @@ -976,6 +1031,79 @@ Status ChunkPredicateBuilder<E, Type>::build_column_expr_predicates() {
return Status::OK();
}

// Builds the lower-bound comparison `col_ref >= min_literal` (closed interval)
// or `col_ref > min_literal` (open interval). The predicate is created through
// VectorizedBinaryPredicateFactory and added to the builder's object pool.
template <BoxedExprType E, CompoundNodeType Type>
Expr* ChunkPredicateBuilder<E, Type>::_gen_min_binary_pred(Expr* col_ref, VectorizedLiteral* min_literal,
                                                           bool is_close_interval) {
    TExprNode lower_bound_node;
    lower_bound_node.node_type = TExprNodeType::BINARY_PRED;
    lower_bound_node.type = TypeDescriptor(TYPE_BOOLEAN).to_thrift();
    lower_bound_node.child_type = to_thrift(col_ref->type().type);
    lower_bound_node.__set_opcode(is_close_interval ? TExprOpcode::GE : TExprOpcode::GT);

    Expr* lower_bound_pred = _opts.obj_pool->add(VectorizedBinaryPredicateFactory::from_thrift(lower_bound_node));
    // Children order matters: column on the left, literal on the right.
    lower_bound_pred->add_child(col_ref);
    lower_bound_pred->add_child(min_literal);
    return lower_bound_pred;
}

// Builds the upper-bound comparison `col_ref <= max_literal` (closed interval)
// or `col_ref < max_literal` (open interval). The predicate is created through
// VectorizedBinaryPredicateFactory and added to the builder's object pool.
template <BoxedExprType E, CompoundNodeType Type>
Expr* ChunkPredicateBuilder<E, Type>::_gen_max_binary_pred(Expr* col_ref, VectorizedLiteral* max_literal,
                                                           bool is_close_interval) {
    TExprNode upper_bound_node;
    upper_bound_node.node_type = TExprNodeType::BINARY_PRED;
    upper_bound_node.type = TypeDescriptor(TYPE_BOOLEAN).to_thrift();
    upper_bound_node.child_type = to_thrift(col_ref->type().type);
    upper_bound_node.__set_opcode(is_close_interval ? TExprOpcode::LE : TExprOpcode::LT);

    Expr* upper_bound_pred = _opts.obj_pool->add(VectorizedBinaryPredicateFactory::from_thrift(upper_bound_node));
    // Children order matters: column on the left, literal on the right.
    upper_bound_pred->add_child(col_ref);
    upper_bound_pred->add_child(max_literal);
    return upper_bound_pred;
}

// Builds an `is_null_pred(col_ref)` function-call expression with a scalar
// BOOLEAN result type. The predicate is created through
// VectorizedIsNullPredicateFactory and added to the builder's object pool.
template <BoxedExprType E, CompoundNodeType Type>
Expr* ChunkPredicateBuilder<E, Type>::_gen_is_null_pred(Expr* col_ref) {
    // Function descriptor: only the function name is filled in.
    TFunction is_null_fn;
    is_null_fn.name.function_name = "is_null_pred";

    // Result type: scalar BOOLEAN.
    TScalarType boolean_scalar;
    boolean_scalar.__set_type(TPrimitiveType::BOOLEAN);
    TTypeNode result_type_node;
    result_type_node.type = TTypeNodeType::SCALAR;
    result_type_node.__set_scalar_type(boolean_scalar);

    TExprNode is_null_node;
    is_null_node.node_type = TExprNodeType::FUNCTION_CALL;
    is_null_node.__set_fn(is_null_fn);
    is_null_node.type.types.emplace_back(result_type_node);

    Expr* is_null_pred = _opts.obj_pool->add(VectorizedIsNullPredicateFactory::from_thrift(is_null_node));
    is_null_pred->add_child(col_ref);
    return is_null_pred;
}

// Builds the nullable BOOLEAN compound predicate `left AND right`. The
// predicate is created through VectorizedCompoundPredicateFactory and added to
// the builder's object pool.
template <BoxedExprType E, CompoundNodeType Type>
Expr* ChunkPredicateBuilder<E, Type>::_gen_and_pred(Expr* left, Expr* right) {
    TExprNode conjunction_node;
    conjunction_node.node_type = TExprNodeType::COMPOUND_PRED;
    conjunction_node.__set_opcode(TExprOpcode::COMPOUND_AND);
    conjunction_node.__set_type(TypeDescriptor(TYPE_BOOLEAN).to_thrift());
    conjunction_node.__set_child_type(TPrimitiveType::BOOLEAN);
    conjunction_node.num_children = 2;
    conjunction_node.is_nullable = true;

    Expr* conjunction = _opts.obj_pool->add(VectorizedCompoundPredicateFactory::from_thrift(conjunction_node));
    conjunction->add_child(left);
    conjunction->add_child(right);
    return conjunction;
}

// ------------------------------------------------------------------------------------
// OlapScanConjunctsManager
// ------------------------------------------------------------------------------------
Expand Down Expand Up @@ -1030,4 +1158,5 @@ const UnarrivedRuntimeFilterList& ScanConjunctsManager::unarrived_runtime_filter
return _root_builder.unarrived_runtime_filters();
}

template class ChunkPredicateBuilder<BoxedExprContext, CompoundNodeType::AND>;
} // namespace starrocks
9 changes: 9 additions & 0 deletions be/src/exec/olap_scan_prepare.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class RuntimeState;
class RuntimeFilterProbeCollector;
class PredicateParser;
class ColumnPredicate;
class VectorizedLiteral;
using ColumnPredicatePtr = std::unique_ptr<ColumnPredicate>;
using ColumnPredicatePtrs = std::vector<ColumnPredicatePtr>;

Expand Down Expand Up @@ -89,6 +90,9 @@ class ChunkPredicateBuilder {

const UnarrivedRuntimeFilterList& unarrived_runtime_filters() { return rt_ranger_params; }

template <LogicalType SlotType, LogicalType MappingType, template <class> class Decoder, class... Args>
void normalized_rf_with_null(const JoinRuntimeFilter* rf, Expr* col_ref, Args&&... args);

private:
const ScanConjunctsManagerOptions& _opts;
const std::vector<E> _exprs;
Expand Down Expand Up @@ -155,6 +159,11 @@ class ChunkPredicateBuilder {
// `ColumnExprPredicate` would be used in late materialization, zone map filtering,
// dict encoded column filtering and bitmap value column filtering etc.
Status build_column_expr_predicates();

Expr* _gen_min_binary_pred(Expr* col_ref, VectorizedLiteral* min_literal, bool is_close_interval);
Expr* _gen_max_binary_pred(Expr* col_ref, VectorizedLiteral* max_literal, bool is_close_interval);
Expr* _gen_is_null_pred(Expr* col_ref);
Expr* _gen_and_pred(Expr* left, Expr* right);
};

class ScanConjunctsManager {
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ struct TypeDescriptor {
void to_protobuf(PTypeDesc* proto_type) const;
};

// Shared descriptor for TYPE_INT. Declared `inline` so that every translation
// unit including this header refers to one object, instead of each one getting
// its own internal-linkage copy (and its own dynamic initializer).
inline const TypeDescriptor TYPE_INT_DESC = TypeDescriptor(LogicalType::TYPE_INT);

inline std::ostream& operator<<(std::ostream& os, const TypeDescriptor& type) {
os << type.debug_string();
return os;
Expand Down
37 changes: 31 additions & 6 deletions be/src/storage/olap_runtime_range_pruner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ struct RuntimeColumnPredicateBuilder {
if constexpr (ltype == TYPE_VARCHAR) {
auto cid = parser->column_id(*slot);
if (auto iter = global_dictmaps->find(cid); iter != global_dictmaps->end()) {
build_minmax_range<RangeType, value_type, LowCardDictType, GlobalDictCodeDecoder>(range, rf,
build_minmax_range<RangeType, limit_type, LowCardDictType, GlobalDictCodeDecoder>(range, rf,
iter->second);
} else {
build_minmax_range<RangeType, value_type, mapping_type, DummyDecoder>(range, rf, nullptr);
build_minmax_range<RangeType, limit_type, mapping_type, DummyDecoder>(range, rf, nullptr);
}
} else {
build_minmax_range<RangeType, value_type, mapping_type, DummyDecoder>(range, rf, nullptr);
build_minmax_range<RangeType, limit_type, mapping_type, DummyDecoder>(range, rf, nullptr);
}

std::vector<TCondition> filters;
Expand Down Expand Up @@ -135,13 +135,38 @@ struct RuntimeColumnPredicateBuilder {
return decoder->decode(code);
}

// Wraps the value returned by min_value() in a constant column of logical type
// `Type`. Decimal types additionally take precision and scale from `col_type`.
// The trailing 1 is presumably the row count of the constant column - confirm
// against ColumnHelper.
template <LogicalType Type>
ColumnPtr min_const_column(const TypeDescriptor& col_type) {
    auto lower_bound = min_value();
    if constexpr (!lt_is_decimal<Type>) {
        return ColumnHelper::create_const_column<Type>(lower_bound, 1);
    } else {
        return ColumnHelper::create_const_decimal_column<Type>(lower_bound, col_type.precision, col_type.scale, 1);
    }
}

// Wraps the value returned by max_value() in a constant column of logical type
// `Type`. Decimal types additionally take precision and scale from `col_type`.
// The trailing 1 is presumably the row count of the constant column - confirm
// against ColumnHelper.
template <LogicalType Type>
ColumnPtr max_const_column(const TypeDescriptor& col_type) {
    auto upper_bound = max_value();
    if constexpr (!lt_is_decimal<Type>) {
        return ColumnHelper::create_const_column<Type>(upper_bound, 1);
    } else {
        return ColumnHelper::create_const_decimal_column<Type>(upper_bound, col_type.precision, col_type.scale, 1);
    }
}

private:
const RuntimeFilter* runtime_filter;
const Decoder* decoder;
};

template <class Range, class value_type, LogicalType mapping_type, template <class> class Decoder, class... Args>
template <class Range, LogicalType SlotType, LogicalType mapping_type, template <class> class Decoder,
class... Args>
static void build_minmax_range(Range& range, const JoinRuntimeFilter* rf, Args&&... args) {
using ValueType = typename RunTimeTypeTraits<SlotType>::CppType;

const RuntimeBloomFilter<mapping_type>* filter = down_cast<const RuntimeBloomFilter<mapping_type>*>(rf);
using DecoderType = Decoder<typename RunTimeTypeTraits<mapping_type>::CppType>;
DecoderType decoder(std::forward<Args>(args)...);
Expand All @@ -153,7 +178,7 @@ struct RuntimeColumnPredicateBuilder {
min_op = to_olap_filter_type(TExprOpcode::GT, false);
}
auto min_value = parser.min_value();
(void)range.add_range(min_op, static_cast<value_type>(min_value));
(void)range.add_range(min_op, static_cast<ValueType>(min_value));

SQLFilterOp max_op;
if (filter->right_close_interval()) {
Expand All @@ -163,7 +188,7 @@ struct RuntimeColumnPredicateBuilder {
}

auto max_value = parser.max_value();
(void)range.add_range(max_op, static_cast<value_type>(max_value));
(void)range.add_range(max_op, static_cast<ValueType>(max_value));
}
};
} // namespace detail
Expand Down
30 changes: 30 additions & 0 deletions be/src/testutil/schema_test_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,42 @@ TabletSchemaPB SchemaTestHelper::gen_schema_pb_of_dup(TabletSchema::SchemaId sch
return schema_pb;
}

// Builds a DUP_KEYS tablet schema protobuf with `num_cols` nullable VARCHAR
// columns named "c0", "c1", ... whose unique ids equal their position. Each
// column has length 100 and index length 4; the first `num_key_cols` columns
// are marked as key columns.
TabletSchemaPB SchemaTestHelper::gen_varchar_schema_pb_of_dup(TabletSchema::SchemaId schema_id, size_t num_cols,
                                                              size_t num_key_cols) {
    TabletSchemaPB varchar_schema;
    varchar_schema.set_id(schema_id);
    varchar_schema.set_keys_type(DUP_KEYS);
    varchar_schema.set_num_short_key_columns(num_key_cols);

    for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
        auto column = varchar_schema.add_column();
        column->set_unique_id(col_idx);
        column->set_name("c" + std::to_string(col_idx));
        column->set_type("VARCHAR");
        column->set_length(100);
        column->set_index_length(4);
        column->set_is_nullable(true);
        // Only leading columns are flagged; the field is left unset for the rest.
        if (col_idx < num_key_cols) {
            column->set_is_key(true);
        }
    }

    return varchar_schema;
}

// Builds a shared TabletSchema from gen_schema_pb_of_dup().
//
// The arguments are forwarded to the protobuf generator. Previously they were
// ignored and a fixed (schema_id=1, num_cols=3, num_key_cols=1) schema was
// always produced, regardless of what the caller asked for.
TabletSchemaSPtr SchemaTestHelper::gen_schema_of_dup(TabletSchema::SchemaId schema_id, size_t num_cols,
                                                     size_t num_key_cols) {
    TabletSchemaPB schema_pb = SchemaTestHelper::gen_schema_pb_of_dup(schema_id, num_cols, num_key_cols);
    return std::make_shared<TabletSchema>(schema_pb);
}

// Builds a shared TabletSchema from gen_varchar_schema_pb_of_dup().
//
// The arguments are forwarded to the protobuf generator. Previously they were
// ignored and a fixed (schema_id=1, num_cols=3, num_key_cols=1) schema was
// always produced, regardless of what the caller asked for.
TabletSchemaSPtr SchemaTestHelper::gen_varchar_schema_of_dup(TabletSchema::SchemaId schema_id, size_t num_cols,
                                                             size_t num_key_cols) {
    TabletSchemaPB schema_pb = SchemaTestHelper::gen_varchar_schema_pb_of_dup(schema_id, num_cols, num_key_cols);
    return std::make_shared<TabletSchema>(schema_pb);
}

TColumn SchemaTestHelper::gen_key_column(const std::string& col_name, TPrimitiveType::type type) {
TColumnType col_type;
col_type.type = type;
Expand Down
4 changes: 4 additions & 0 deletions be/src/testutil/schema_test_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ namespace starrocks {
class SchemaTestHelper {
public:
static TabletSchemaPB gen_schema_pb_of_dup(TabletSchema::SchemaId schema_id, size_t num_cols, size_t num_key_cols);
static TabletSchemaPB gen_varchar_schema_pb_of_dup(TabletSchema::SchemaId schema_id, size_t num_cols,
size_t num_key_cols);
static TabletSchemaSPtr gen_schema_of_dup(TabletSchema::SchemaId schema_id, size_t num_cols, size_t num_key_cols);
static TabletSchemaSPtr gen_varchar_schema_of_dup(TabletSchema::SchemaId schema_id, size_t num_cols,
size_t num_key_cols);
static TColumn gen_key_column(const std::string& col_name, TPrimitiveType::type type);
static TColumn gen_value_column_for_dup_table(const std::string& col_name, TPrimitiveType::type type);
static TColumn gen_value_column_for_agg_table(const std::string& col_name, TPrimitiveType::type type);
Expand Down
1 change: 1 addition & 0 deletions be/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ set(EXEC_FILES
./exec/repeat_node_test.cpp
./exec/sorting_test.cpp
./exec/table_function_node_test.cpp
./exec/olap_scan_prepare_test.cpp
./exprs/agg/json_each_test.cpp
./exprs/agg/aggregate_test.cpp
./exprs/arithmetic_expr_test.cpp
Expand Down
Loading
Loading