Skip to content

Commit

Permalink
[fix](window_funnel) fix wrong result of window_funnel (apache#38954)
Browse files Browse the repository at this point in the history
## Proposed changes

Issue Number: close #xxx

The current logic of `window_funnel` is wrong: it cannot express the
semantics of the function.

This PR re-implements the logic.
  • Loading branch information
jacktengg authored Aug 12, 2024
1 parent 333bebd commit 4910a5a
Show file tree
Hide file tree
Showing 10 changed files with 948 additions and 57 deletions.
2 changes: 1 addition & 1 deletion be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class BeExecVersionManager {
* d. change some agg function nullable property: PR #37215
* e. change variant serde to fix PR #38413
*/
constexpr inline int BeExecVersionManager::max_be_exec_version = 6;
constexpr inline int BeExecVersionManager::max_be_exec_version = 7;
constexpr inline int BeExecVersionManager::min_be_exec_version = 0;

/// functional
Expand Down
16 changes: 16 additions & 0 deletions be/src/util/simd/bits.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@ static size_t find_byte(const std::vector<T>& vec, size_t start, T byte) {
return (T*)p - vec.data();
}

// Searches data[start, end) for the first element equal to `byte`.
//
// Returns the index (relative to `data`) of the first match, `end` when no
// match exists, or `start` unchanged when the range is empty (start >= end).
// The scan is delegated to std::memchr with a length of (end - start), so the
// range is walked byte by byte; presumably T is meant to be a one-byte type.
template <class T>
static size_t find_byte(const T* data, size_t start, size_t end, T byte) {
    if (start < end) {
        const void* hit = std::memchr(static_cast<const void*>(data + start), byte, end - start);
        // No match inside the window: report the exclusive upper bound.
        return hit != nullptr ? static_cast<size_t>(static_cast<const T*>(hit) - data) : end;
    }
    // Empty (or inverted) range: hand back the starting position untouched.
    return start;
}

template <typename T>
bool contain_byte(const T* __restrict data, const size_t length, const signed char byte) {
return nullptr != std::memchr(reinterpret_cast<const void*>(data), byte, length);
Expand All @@ -145,6 +157,10 @@ inline size_t find_one(const std::vector<uint8_t>& vec, size_t start) {
return find_byte<uint8_t>(vec, start, 1);
}

// Locates the first byte with value 1 in data[start, end) by forwarding to the
// pointer-based find_byte overload; the result is whatever find_byte returns.
inline size_t find_one(const uint8_t* data, size_t start, size_t end) {
    constexpr uint8_t target = 1;
    return find_byte<uint8_t>(data, start, end, target);
}

// Looks for the first byte with value 0 in `vec`, beginning at index `start`,
// by forwarding to the vector-based find_byte overload.
inline size_t find_zero(const std::vector<uint8_t>& vec, size_t start) {
    constexpr uint8_t target = 0;
    return find_byte<uint8_t>(vec, start, target);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ void register_aggregate_function_group_concat(AggregateFunctionSimpleFactory& fa
void register_aggregate_function_percentile(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_percentile_old(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_retention(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_percentile_approx(AggregateFunctionSimpleFactory& factory);
void register_aggregate_function_orthogonal_bitmap(AggregateFunctionSimpleFactory& factory);
Expand Down Expand Up @@ -98,6 +99,7 @@ AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() {
register_aggregate_function_percentile_old(instance);
register_aggregate_function_percentile_approx(instance);
register_aggregate_function_window_funnel(instance);
register_aggregate_function_window_funnel_old(instance);
register_aggregate_function_retention(instance);
register_aggregate_function_orthogonal_bitmap(instance);
register_aggregate_function_collect_list(instance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "common/logging.h"
#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
#include "vec/aggregate_functions/helpers.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_nullable.h"

Expand All @@ -38,11 +39,33 @@ AggregateFunctionPtr create_aggregate_function_window_funnel(const std::string&
}
if (WhichDataType(remove_nullable(argument_types[2])).is_date_time_v2()) {
return creator_without_type::create<
AggregateFunctionWindowFunnel<DateV2Value<DateTimeV2ValueType>, UInt64>>(
argument_types, result_is_nullable);
AggregateFunctionWindowFunnel<TypeIndex::DateTimeV2, UInt64>>(argument_types,
result_is_nullable);
} else if (WhichDataType(remove_nullable(argument_types[2])).is_date_time()) {
return creator_without_type::create<AggregateFunctionWindowFunnel<VecDateTimeValue, Int64>>(
return creator_without_type::create<
AggregateFunctionWindowFunnel<TypeIndex::DateTime, Int64>>(argument_types,
result_is_nullable);
} else {
LOG(WARNING) << "Only support DateTime type as window argument!";
return nullptr;
}
}

AggregateFunctionPtr create_aggregate_function_window_funnel_old(const std::string& name,
const DataTypes& argument_types,
const bool result_is_nullable) {
if (argument_types.size() < 3) {
LOG(WARNING) << "window_funnel's argument less than 3.";
return nullptr;
}
if (WhichDataType(remove_nullable(argument_types[2])).is_date_time_v2()) {
return creator_without_type::create<
AggregateFunctionWindowFunnelOld<DateV2Value<DateTimeV2ValueType>, UInt64>>(
argument_types, result_is_nullable);
} else if (WhichDataType(remove_nullable(argument_types[2])).is_date_time()) {
return creator_without_type::create<
AggregateFunctionWindowFunnelOld<VecDateTimeValue, Int64>>(argument_types,
result_is_nullable);
} else {
LOG(WARNING) << "Only support DateTime type as window argument!";
return nullptr;
Expand All @@ -52,4 +75,10 @@ AggregateFunctionPtr create_aggregate_function_window_funnel(const std::string&
// Registers create_aggregate_function_window_funnel with the factory under the
// name "window_funnel" via register_function_both.
void register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory& factory) {
    const char* const function_name = "window_funnel";
    factory.register_function_both(function_name, create_aggregate_function_window_funnel);
}
// Registers create_aggregate_function_window_funnel_old as an alternative
// implementation of "window_funnel", once with the trailing flag set to true
// and then once with it set to false.
// NOTE(review): the flag presumably selects the nullable variant -- confirm
// against AggregateFunctionSimpleFactory::register_alternative_function.
void register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory& factory) {
    constexpr bool flags[] = {true, false};
    for (const bool flag : flags) {
        factory.register_alternative_function("window_funnel",
                                              create_aggregate_function_window_funnel_old, flag);
    }
}
} // namespace doris::vectorized
Loading

0 comments on commit 4910a5a

Please sign in to comment.