Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug](runtime-filter) Support IP (IPv4/IPv6) runtime filters and throw an exception instead of using DCHECK when converting PrimitiveType to PColumnType #39985

Merged
merged 7 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 85 additions & 23 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,14 @@ PColumnType to_proto(PrimitiveType type) {
return PColumnType::COLUMN_TYPE_VARCHAR;
case TYPE_STRING:
return PColumnType::COLUMN_TYPE_STRING;
case TYPE_IPV4:
return PColumnType::COLUMN_TYPE_IPV4;
case TYPE_IPV6:
return PColumnType::COLUMN_TYPE_IPV6;
default:
DCHECK(false) << "Invalid type.";
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PrimitiveType type {}", int(type));
}
DCHECK(false);
return PColumnType::COLUMN_TYPE_INT;
}

// PColumnType->PrimitiveType
Expand Down Expand Up @@ -160,10 +163,14 @@ PrimitiveType to_primitive_type(PColumnType type) {
return TYPE_CHAR;
case PColumnType::COLUMN_TYPE_STRING:
return TYPE_STRING;
case PColumnType::COLUMN_TYPE_IPV4:
return TYPE_IPV4;
case PColumnType::COLUMN_TYPE_IPV6:
return TYPE_IPV6;
default:
DCHECK(false);
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PColumnType type {}", int(type));
}
return TYPE_INT;
}

// PFilterType -> RuntimeFilterType
Expand Down Expand Up @@ -554,14 +561,13 @@ class RuntimePredicateWrapper {
}

Status assign(const PInFilter* in_filter, bool contain_null) {
PrimitiveType type = to_primitive_type(in_filter->column_type());
_context->hybrid_set.reset(create_set(type));
_context->hybrid_set.reset(create_set(_column_return_type));
if (contain_null) {
_context->hybrid_set->set_null_aware(true);
_context->hybrid_set->insert((const void*)nullptr);
}

switch (type) {
switch (_column_return_type) {
case TYPE_BOOLEAN: {
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
bool bool_val = column.boolval();
Expand Down Expand Up @@ -701,9 +707,27 @@ class RuntimePredicateWrapper {
});
break;
}
case TYPE_IPV4: {
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
int32_t tmp = column.intval();
set->insert(&tmp);
});
break;
}
case TYPE_IPV6: {
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
auto string_val = column.stringval();
StringParser::ParseResult result;
auto int128_val = StringParser::string_to_int<uint128_t>(
string_val.c_str(), string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
set->insert(&int128_val);
});
break;
}
default: {
return Status::InternalError("not support assign to in filter, type: " +
type_to_string(type));
type_to_string(_column_return_type));
}
}
return Status::OK();
Expand All @@ -726,15 +750,14 @@ class RuntimePredicateWrapper {
// used by shuffle runtime filter
// assign this filter by protobuf
Status assign(const PMinMaxFilter* minmax_filter, bool contain_null) {
PrimitiveType type = to_primitive_type(minmax_filter->column_type());
_context->minmax_func.reset(create_minmax_filter(type));
_context->minmax_func.reset(create_minmax_filter(_column_return_type));

if (contain_null) {
_context->minmax_func->set_null_aware(true);
_context->minmax_func->set_contain_null();
}

switch (type) {
switch (_column_return_type) {
case TYPE_BOOLEAN: {
bool min_val = minmax_filter->min_val().boolval();
bool max_val = minmax_filter->max_val().boolval();
Expand Down Expand Up @@ -850,6 +873,23 @@ class RuntimePredicateWrapper {
auto max_val_ref = minmax_filter->max_val().stringval();
return _context->minmax_func->assign(&min_val_ref, &max_val_ref);
}
case TYPE_IPV4: {
int tmp_min = minmax_filter->min_val().intval();
int tmp_max = minmax_filter->max_val().intval();
return _context->minmax_func->assign(&tmp_min, &tmp_max);
}
case TYPE_IPV6: {
auto min_string_val = minmax_filter->min_val().stringval();
auto max_string_val = minmax_filter->max_val().stringval();
StringParser::ParseResult result;
auto min_val = StringParser::string_to_int<uint128_t>(min_string_val.c_str(),
min_string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
auto max_val = StringParser::string_to_int<uint128_t>(max_string_val.c_str(),
max_string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
return _context->minmax_func->assign(&min_val, &max_val);
}
default:
break;
}
Expand Down Expand Up @@ -1140,7 +1180,7 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
merge_filter_request->set_filter_id(_filter_id);
merge_filter_request->set_is_pipeline(true);
auto column_type = _wrapper->column_type();
merge_filter_request->set_column_type(to_proto(column_type));
RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type)));
merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms());

if (get_ignored()) {
Expand Down Expand Up @@ -1413,13 +1453,10 @@ template <class T>
Status IRuntimeFilter::_create_wrapper(const T* param,
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
int filter_type = param->request->filter_type();
PrimitiveType column_type = PrimitiveType::INVALID_TYPE;
if (param->request->has_in_filter()) {
column_type = to_primitive_type(param->request->in_filter().column_type());
}
if (param->request->has_column_type()) {
column_type = to_primitive_type(param->request->column_type());
if (!param->request->has_column_type()) {
return Status::InternalError("unknown filter column type");
}
PrimitiveType column_type = to_primitive_type(param->request->column_type());
*wrapper = std::make_unique<RuntimePredicateWrapper>(column_type, get_type(filter_type),
param->request->filter_id());

Expand Down Expand Up @@ -1639,9 +1676,21 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
});
return;
}
case TYPE_IPV4: {
batch_copy<IPv4>(filter, it, [](PColumnValue* column, const IPv4* value) {
column->set_intval(*reinterpret_cast<const int32_t*>(value));
});
return;
}
case TYPE_IPV6: {
batch_copy<IPv6>(filter, it, [](PColumnValue* column, const IPv6* value) {
column->set_stringval(LargeIntValue::to_string(*value));
});
return;
}
default: {
DCHECK(false) << "unknown type";
break;
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PrimitiveType type {}", int(column_type));
}
}
}
Expand Down Expand Up @@ -1755,9 +1804,22 @@ void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) {
filter->mutable_max_val()->set_stringval(*max_string_value);
break;
}
case TYPE_IPV4: {
filter->mutable_min_val()->set_intval(*reinterpret_cast<const int32_t*>(min_data));
filter->mutable_max_val()->set_intval(*reinterpret_cast<const int32_t*>(max_data));
return;
}
case TYPE_IPV6: {
filter->mutable_min_val()->set_stringval(
LargeIntValue::to_string(*reinterpret_cast<const uint128_t*>(min_data)));
filter->mutable_max_val()->set_stringval(
LargeIntValue::to_string(*reinterpret_cast<const uint128_t*>(max_data)));
return;
}
default: {
DCHECK(false) << "unknown type";
break;
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PrimitiveType type {}",
int(_wrapper->column_type()));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/predicate_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ std::unique_ptr<PredicateCreator<ConditionType>> get_creator(const FieldType& ty
case FieldType::OLAP_FIELD_TYPE_IPV4: {
return std::make_unique<CustomPredicateCreator<TYPE_IPV4, PT, ConditionType>>(
[](const std::string& condition) {
vectorized::IPv4 value;
IPv4 value;
bool res = IPv4Value::from_string(value, condition);
DCHECK(res);
return value;
Expand All @@ -251,7 +251,7 @@ std::unique_ptr<PredicateCreator<ConditionType>> get_creator(const FieldType& ty
case FieldType::OLAP_FIELD_TYPE_IPV6: {
return std::make_unique<CustomPredicateCreator<TYPE_IPV6, PT, ConditionType>>(
[](const std::string& condition) {
vectorized::IPv6 value;
IPv6 value;
bool res = IPv6Value::from_string(value, condition);
DCHECK(res);
return value;
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/large_int_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <iostream>
#include <string>

#include "olap/olap_common.h"

namespace doris {

inline const __int128 MAX_INT128 = ~((__int128)0x01 << 127);
Expand All @@ -36,6 +38,9 @@ class LargeIntValue {
}

/// Returns `value` (a signed 128-bit integer) rendered as a std::string by
/// fmt::format with the FMT_COMPILE("{}") format string.
static std::string to_string(__int128 value) { return fmt::format(FMT_COMPILE("{}"), value); }
/// Unsigned counterpart of the `__int128` overload: returns `value` rendered as
/// a std::string by fmt::format with the FMT_COMPILE("{}") format string.
/// Elsewhere in this change it is called with IPv6 values (IPv6 is an alias of
/// uint128_t) when serializing runtime filters to protobuf.
static std::string to_string(__uint128_t value) {
return fmt::format(FMT_COMPILE("{}"), value);
}
};

std::ostream& operator<<(std::ostream& os, __int128 const& value);
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/primitive_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,13 @@ struct PrimitiveTypeTraits<TYPE_LARGEINT> {
};
template <>
struct PrimitiveTypeTraits<TYPE_IPV4> {
using CppType = vectorized::IPv4;
using CppType = IPv4;
using StorageFieldType = CppType;
using ColumnType = vectorized::ColumnIPv4;
};
template <>
struct PrimitiveTypeTraits<TYPE_IPV6> {
using CppType = vectorized::IPv6;
using CppType = IPv6;
using StorageFieldType = CppType;
using ColumnType = vectorized::ColumnIPv6;
};
Expand Down
6 changes: 3 additions & 3 deletions be/src/vec/core/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ struct decimal12_t;
struct uint24_t;
struct StringRef;

using IPv4 = uint32_t;
using IPv6 = uint128_t;

namespace vectorized {

/// Data types for representing elementary values from a database in RAM.
Expand Down Expand Up @@ -296,9 +299,6 @@ struct TypeId<String> {
/// Not a data type in database, defined just for convenience.
using Strings = std::vector<String>;

using IPv4 = uint32_t;
using IPv6 = uint128_t;

template <>
inline constexpr bool IsNumber<IPv6> = true;
template <>
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/data_types/data_type_ipv6.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ std::string DataTypeIPv6::to_string(const IColumn& column, size_t row_num) const
return value.to_string();
}

std::string DataTypeIPv6::to_string(const IPv6& ipv6_val) const {
std::string DataTypeIPv6::to_string(const IPv6& ipv6_val) {
auto value = IPv6Value(ipv6_val);
return value.to_string();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/data_types/data_type_ipv6.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class DataTypeIPv6 final : public DataTypeNumberBase<IPv6> {
void push_number(ColumnString::Chars& chars, const IPv6& num) const;
std::string to_string(const IColumn& column, size_t row_num) const override;
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
std::string to_string(const IPv6& value) const;
static std::string to_string(const IPv6& value);
Status from_string(ReadBuffer& rb, IColumn* column) const override;

Field get_field(const TExprNode& node) const override {
Expand Down
13 changes: 11 additions & 2 deletions be/src/vec/exprs/vexpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "common/config.h"
#include "common/exception.h"
#include "common/status.h"
#include "runtime/define_primitive_type.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/data_types/data_type_factory.hpp"
Expand Down Expand Up @@ -147,9 +148,17 @@ TExprNode create_texpr_node_from(const void* data, const PrimitiveType& type, in
THROW_IF_ERROR(create_texpr_literal_node<TYPE_STRING>(data, &node));
break;
}
case TYPE_IPV4: {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only IPv4? Where is the IPv6 case?

THROW_IF_ERROR(create_texpr_literal_node<TYPE_IPV4>(data, &node));
break;
}
case TYPE_IPV6: {
THROW_IF_ERROR(create_texpr_literal_node<TYPE_IPV6>(data, &node));
break;
}
default:
DCHECK(false);
throw std::invalid_argument("Invalid type!");
throw Exception(ErrorCode::INTERNAL_ERROR, "runtime filter meet invalid type {}",
int(type));
}
return node;
}
Expand Down
16 changes: 16 additions & 0 deletions be/src/vec/exprs/vexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/core/wide_integer.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_ipv6.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/functions/function.h"

Expand Down Expand Up @@ -474,6 +476,20 @@ Status create_texpr_literal_node(const void* data, TExprNode* node, int precisio
string_literal.__set_value(*origin_value);
(*node).__set_string_literal(string_literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_STRING));
} else if constexpr (T == TYPE_IPV4) {
const auto* origin_value = reinterpret_cast<const IPv4*>(data);
(*node).__set_node_type(TExprNodeType::IPV4_LITERAL);
TIPv4Literal literal;
literal.__set_value(*origin_value);
(*node).__set_ipv4_literal(literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_IPV4));
} else if constexpr (T == TYPE_IPV6) {
const auto* origin_value = reinterpret_cast<const IPv6*>(data);
(*node).__set_node_type(TExprNodeType::IPV6_LITERAL);
TIPv6Literal literal;
literal.__set_value(vectorized::DataTypeIPv6::to_string(*origin_value));
(*node).__set_ipv6_literal(literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_IPV6));
} else {
return Status::InvalidArgument("Invalid argument type!");
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/olap/olap_data_convertor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ OlapBlockDataConvertor::create_olap_column_data_convertor(const TabletColumn& co
return std::make_unique<OlapColumnDataConvertorSimple<vectorized::Int128>>();
}
case FieldType::OLAP_FIELD_TYPE_IPV4: {
return std::make_unique<OlapColumnDataConvertorSimple<vectorized::IPv4>>();
return std::make_unique<OlapColumnDataConvertorSimple<IPv4>>();
}
case FieldType::OLAP_FIELD_TYPE_IPV6: {
return std::make_unique<OlapColumnDataConvertorSimple<vectorized::IPv6>>();
return std::make_unique<OlapColumnDataConvertorSimple<IPv6>>();
}
case FieldType::OLAP_FIELD_TYPE_FLOAT: {
return std::make_unique<OlapColumnDataConvertorSimple<vectorized::Float32>>();
Expand Down
Loading
Loading