Skip to content

Commit

Permalink
[Bug](runtime-filter) support ip rf and use exception to replace dche…
Browse files Browse the repository at this point in the history
…ck when PrimitiveType to PColumnType (#39985)

## Proposed changes
use exception to replace dcheck when PrimitiveType to PColumnType
```cpp
*** SIGABRT unknown detail explain (@0x11d3f) received by PID 73023 (TID 74292 OR 0x7fd758225640) from PID 73023; stack trace: ***
 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_master/doris/be/src/common/signal_handler.h:421
 1# 0x00007FDDBE6B9520 in /lib/x86_64-linux-gnu/libc.so.6
 2# pthread_kill at ./nptl/pthread_kill.c:89
 3# raise at ../sysdeps/posix/raise.c:27
 4# abort at ./stdlib/abort.c:81
 5# 0x000056123F81A94D in /root/output/be/lib/doris_be
 6# 0x000056123F80CF8A in /root/output/be/lib/doris_be
 7# google::LogMessage::SendToLog() in /root/output/be/lib/doris_be
 8# google::LogMessage::Flush() in /root/output/be/lib/doris_be
 9# google::LogMessageFatal::~LogMessageFatal() in /root/output/be/lib/doris_be
10# doris::to_proto(doris::PrimitiveType) at /home/zcp/repo_center/doris_master/doris/be/src/exprs/runtime_filter.cpp:114
11# doris::IRuntimeFilter::push_to_remote(doris::TNetworkAddress const*) at /home/zcp/repo_center/doris_master/doris/be/src/exprs/runtime_filter.cpp:1143
12# doris::IRuntimeFilter::publish(bool)::$_0::operator()(doris::IRuntimeFilter*) const at /home/zcp/repo_center/doris_master/doris/be/src/exprs/runtime_filter.cpp:959
13# doris::IRuntimeFilter::publish(bool)::$_2::operator()() const at /home/zcp/repo_center/doris_master/doris/be/src/exprs/runtime_filter.cpp:983
14# doris::IRuntimeFilter::publish(bool) at /home/zcp/repo_center/doris_master/doris/be/src/exprs/runtime_filter.cpp:997
```
  • Loading branch information
BiteTheDDDDt authored and dataroaring committed Oct 5, 2024
1 parent 02e2b20 commit c7df3e0
Show file tree
Hide file tree
Showing 17 changed files with 264 additions and 66 deletions.
108 changes: 85 additions & 23 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,14 @@ PColumnType to_proto(PrimitiveType type) {
return PColumnType::COLUMN_TYPE_VARCHAR;
case TYPE_STRING:
return PColumnType::COLUMN_TYPE_STRING;
case TYPE_IPV4:
return PColumnType::COLUMN_TYPE_IPV4;
case TYPE_IPV6:
return PColumnType::COLUMN_TYPE_IPV6;
default:
DCHECK(false) << "Invalid type.";
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PrimitiveType type {}", int(type));
}
DCHECK(false);
return PColumnType::COLUMN_TYPE_INT;
}

// PColumnType->PrimitiveType
Expand Down Expand Up @@ -160,10 +163,14 @@ PrimitiveType to_primitive_type(PColumnType type) {
return TYPE_CHAR;
case PColumnType::COLUMN_TYPE_STRING:
return TYPE_STRING;
case PColumnType::COLUMN_TYPE_IPV4:
return TYPE_IPV4;
case PColumnType::COLUMN_TYPE_IPV6:
return TYPE_IPV6;
default:
DCHECK(false);
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PColumnType type {}", int(type));
}
return TYPE_INT;
}

// PFilterType -> RuntimeFilterType
Expand Down Expand Up @@ -554,14 +561,13 @@ class RuntimePredicateWrapper {
}

Status assign(const PInFilter* in_filter, bool contain_null) {
PrimitiveType type = to_primitive_type(in_filter->column_type());
_context->hybrid_set.reset(create_set(type));
_context->hybrid_set.reset(create_set(_column_return_type));
if (contain_null) {
_context->hybrid_set->set_null_aware(true);
_context->hybrid_set->insert((const void*)nullptr);
}

switch (type) {
switch (_column_return_type) {
case TYPE_BOOLEAN: {
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
bool bool_val = column.boolval();
Expand Down Expand Up @@ -701,9 +707,27 @@ class RuntimePredicateWrapper {
});
break;
}
case TYPE_IPV4: {
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
int32_t tmp = column.intval();
set->insert(&tmp);
});
break;
}
case TYPE_IPV6: {
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set, PColumnValue& column) {
auto string_val = column.stringval();
StringParser::ParseResult result;
auto int128_val = StringParser::string_to_int<uint128_t>(
string_val.c_str(), string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
set->insert(&int128_val);
});
break;
}
default: {
return Status::InternalError("not support assign to in filter, type: " +
type_to_string(type));
type_to_string(_column_return_type));
}
}
return Status::OK();
Expand All @@ -726,15 +750,14 @@ class RuntimePredicateWrapper {
// used by shuffle runtime filter
// assign this filter by protobuf
Status assign(const PMinMaxFilter* minmax_filter, bool contain_null) {
PrimitiveType type = to_primitive_type(minmax_filter->column_type());
_context->minmax_func.reset(create_minmax_filter(type));
_context->minmax_func.reset(create_minmax_filter(_column_return_type));

if (contain_null) {
_context->minmax_func->set_null_aware(true);
_context->minmax_func->set_contain_null();
}

switch (type) {
switch (_column_return_type) {
case TYPE_BOOLEAN: {
bool min_val = minmax_filter->min_val().boolval();
bool max_val = minmax_filter->max_val().boolval();
Expand Down Expand Up @@ -850,6 +873,23 @@ class RuntimePredicateWrapper {
auto max_val_ref = minmax_filter->max_val().stringval();
return _context->minmax_func->assign(&min_val_ref, &max_val_ref);
}
case TYPE_IPV4: {
int tmp_min = minmax_filter->min_val().intval();
int tmp_max = minmax_filter->max_val().intval();
return _context->minmax_func->assign(&tmp_min, &tmp_max);
}
case TYPE_IPV6: {
auto min_string_val = minmax_filter->min_val().stringval();
auto max_string_val = minmax_filter->max_val().stringval();
StringParser::ParseResult result;
auto min_val = StringParser::string_to_int<uint128_t>(min_string_val.c_str(),
min_string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
auto max_val = StringParser::string_to_int<uint128_t>(max_string_val.c_str(),
max_string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
return _context->minmax_func->assign(&min_val, &max_val);
}
default:
break;
}
Expand Down Expand Up @@ -1140,7 +1180,7 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
merge_filter_request->set_filter_id(_filter_id);
merge_filter_request->set_is_pipeline(true);
auto column_type = _wrapper->column_type();
merge_filter_request->set_column_type(to_proto(column_type));
RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type)));
merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms());

if (get_ignored()) {
Expand Down Expand Up @@ -1411,13 +1451,10 @@ template <class T>
Status IRuntimeFilter::_create_wrapper(const T* param,
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
int filter_type = param->request->filter_type();
PrimitiveType column_type = PrimitiveType::INVALID_TYPE;
if (param->request->has_in_filter()) {
column_type = to_primitive_type(param->request->in_filter().column_type());
}
if (param->request->has_column_type()) {
column_type = to_primitive_type(param->request->column_type());
if (!param->request->has_column_type()) {
return Status::InternalError("unknown filter column type");
}
PrimitiveType column_type = to_primitive_type(param->request->column_type());
*wrapper = std::make_unique<RuntimePredicateWrapper>(column_type, get_type(filter_type),
param->request->filter_id());

Expand Down Expand Up @@ -1637,9 +1674,21 @@ void IRuntimeFilter::to_protobuf(PInFilter* filter) {
});
return;
}
case TYPE_IPV4: {
batch_copy<IPv4>(filter, it, [](PColumnValue* column, const IPv4* value) {
column->set_intval(*reinterpret_cast<const int32_t*>(value));
});
return;
}
case TYPE_IPV6: {
batch_copy<IPv6>(filter, it, [](PColumnValue* column, const IPv6* value) {
column->set_stringval(LargeIntValue::to_string(*value));
});
return;
}
default: {
DCHECK(false) << "unknown type";
break;
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PrimitiveType type {}", int(column_type));
}
}
}
Expand Down Expand Up @@ -1753,9 +1802,22 @@ void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) {
filter->mutable_max_val()->set_stringval(*max_string_value);
break;
}
case TYPE_IPV4: {
filter->mutable_min_val()->set_intval(*reinterpret_cast<const int32_t*>(min_data));
filter->mutable_max_val()->set_intval(*reinterpret_cast<const int32_t*>(max_data));
return;
}
case TYPE_IPV6: {
filter->mutable_min_val()->set_stringval(
LargeIntValue::to_string(*reinterpret_cast<const uint128_t*>(min_data)));
filter->mutable_max_val()->set_stringval(
LargeIntValue::to_string(*reinterpret_cast<const uint128_t*>(max_data)));
return;
}
default: {
DCHECK(false) << "unknown type";
break;
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter meet invalid PrimitiveType type {}",
int(_wrapper->column_type()));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/predicate_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ std::unique_ptr<PredicateCreator<ConditionType>> get_creator(const FieldType& ty
case FieldType::OLAP_FIELD_TYPE_IPV4: {
return std::make_unique<CustomPredicateCreator<TYPE_IPV4, PT, ConditionType>>(
[](const std::string& condition) {
vectorized::IPv4 value;
IPv4 value;
bool res = IPv4Value::from_string(value, condition);
DCHECK(res);
return value;
Expand All @@ -251,7 +251,7 @@ std::unique_ptr<PredicateCreator<ConditionType>> get_creator(const FieldType& ty
case FieldType::OLAP_FIELD_TYPE_IPV6: {
return std::make_unique<CustomPredicateCreator<TYPE_IPV6, PT, ConditionType>>(
[](const std::string& condition) {
vectorized::IPv6 value;
IPv6 value;
bool res = IPv6Value::from_string(value, condition);
DCHECK(res);
return value;
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/large_int_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <iostream>
#include <string>

#include "olap/olap_common.h"

namespace doris {

inline const __int128 MAX_INT128 = ~((__int128)0x01 << 127);
Expand All @@ -36,6 +38,9 @@ class LargeIntValue {
}

static std::string to_string(__int128 value) { return fmt::format(FMT_COMPILE("{}"), value); }
static std::string to_string(__uint128_t value) {
return fmt::format(FMT_COMPILE("{}"), value);
}
};

std::ostream& operator<<(std::ostream& os, __int128 const& value);
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/primitive_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,13 @@ struct PrimitiveTypeTraits<TYPE_LARGEINT> {
};
template <>
struct PrimitiveTypeTraits<TYPE_IPV4> {
using CppType = vectorized::IPv4;
using CppType = IPv4;
using StorageFieldType = CppType;
using ColumnType = vectorized::ColumnIPv4;
};
template <>
struct PrimitiveTypeTraits<TYPE_IPV6> {
using CppType = vectorized::IPv6;
using CppType = IPv6;
using StorageFieldType = CppType;
using ColumnType = vectorized::ColumnIPv6;
};
Expand Down
6 changes: 3 additions & 3 deletions be/src/vec/core/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ struct decimal12_t;
struct uint24_t;
struct StringRef;

using IPv4 = uint32_t;
using IPv6 = uint128_t;

namespace vectorized {

/// Data types for representing elementary values from a database in RAM.
Expand Down Expand Up @@ -296,9 +299,6 @@ struct TypeId<String> {
/// Not a data type in database, defined just for convenience.
using Strings = std::vector<String>;

using IPv4 = uint32_t;
using IPv6 = uint128_t;

template <>
inline constexpr bool IsNumber<IPv6> = true;
template <>
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/data_types/data_type_ipv6.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ std::string DataTypeIPv6::to_string(const IColumn& column, size_t row_num) const
return value.to_string();
}

std::string DataTypeIPv6::to_string(const IPv6& ipv6_val) const {
std::string DataTypeIPv6::to_string(const IPv6& ipv6_val) {
auto value = IPv6Value(ipv6_val);
return value.to_string();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/data_types/data_type_ipv6.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class DataTypeIPv6 final : public DataTypeNumberBase<IPv6> {
void push_number(ColumnString::Chars& chars, const IPv6& num) const;
std::string to_string(const IColumn& column, size_t row_num) const override;
void to_string(const IColumn& column, size_t row_num, BufferWritable& ostr) const override;
std::string to_string(const IPv6& value) const;
static std::string to_string(const IPv6& value);
Status from_string(ReadBuffer& rb, IColumn* column) const override;

Field get_field(const TExprNode& node) const override {
Expand Down
13 changes: 11 additions & 2 deletions be/src/vec/exprs/vexpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "common/config.h"
#include "common/exception.h"
#include "common/status.h"
#include "runtime/define_primitive_type.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/data_types/data_type_array.h"
Expand Down Expand Up @@ -147,9 +148,17 @@ TExprNode create_texpr_node_from(const void* data, const PrimitiveType& type, in
THROW_IF_ERROR(create_texpr_literal_node<TYPE_STRING>(data, &node));
break;
}
case TYPE_IPV4: {
THROW_IF_ERROR(create_texpr_literal_node<TYPE_IPV4>(data, &node));
break;
}
case TYPE_IPV6: {
THROW_IF_ERROR(create_texpr_literal_node<TYPE_IPV6>(data, &node));
break;
}
default:
DCHECK(false);
throw std::invalid_argument("Invalid type!");
throw Exception(ErrorCode::INTERNAL_ERROR, "runtime filter meet invalid type {}",
int(type));
}
return node;
}
Expand Down
16 changes: 16 additions & 0 deletions be/src/vec/exprs/vexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/core/wide_integer.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_ipv6.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/functions/function.h"

Expand Down Expand Up @@ -464,6 +466,20 @@ Status create_texpr_literal_node(const void* data, TExprNode* node, int precisio
string_literal.__set_value(*origin_value);
(*node).__set_string_literal(string_literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_STRING));
} else if constexpr (T == TYPE_IPV4) {
const auto* origin_value = reinterpret_cast<const IPv4*>(data);
(*node).__set_node_type(TExprNodeType::IPV4_LITERAL);
TIPv4Literal literal;
literal.__set_value(*origin_value);
(*node).__set_ipv4_literal(literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_IPV4));
} else if constexpr (T == TYPE_IPV6) {
const auto* origin_value = reinterpret_cast<const IPv6*>(data);
(*node).__set_node_type(TExprNodeType::IPV6_LITERAL);
TIPv6Literal literal;
literal.__set_value(vectorized::DataTypeIPv6::to_string(*origin_value));
(*node).__set_ipv6_literal(literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_IPV6));
} else {
return Status::InvalidArgument("Invalid argument type!");
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/olap/olap_data_convertor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ OlapBlockDataConvertor::create_olap_column_data_convertor(const TabletColumn& co
return std::make_unique<OlapColumnDataConvertorSimple<vectorized::Int128>>();
}
case FieldType::OLAP_FIELD_TYPE_IPV4: {
return std::make_unique<OlapColumnDataConvertorSimple<vectorized::IPv4>>();
return std::make_unique<OlapColumnDataConvertorSimple<IPv4>>();
}
case FieldType::OLAP_FIELD_TYPE_IPV6: {
return std::make_unique<OlapColumnDataConvertorSimple<vectorized::IPv6>>();
return std::make_unique<OlapColumnDataConvertorSimple<IPv6>>();
}
case FieldType::OLAP_FIELD_TYPE_FLOAT: {
return std::make_unique<OlapColumnDataConvertorSimple<vectorized::Float32>>();
Expand Down
Loading

0 comments on commit c7df3e0

Please sign in to comment.