Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support DECIMAL order-by for RANGE window functions #11645

Merged
merged 16 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 48 additions & 19 deletions cpp/src/rolling/detail/range_window_bounds.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace detail {
template <typename RangeType>
constexpr bool is_supported_range_type()
{
return cudf::is_duration<RangeType>() ||
return cudf::is_duration<RangeType>() || cudf::is_fixed_point<RangeType>() ||
(std::is_integral_v<RangeType> && !cudf::is_boolean<RangeType>());
}

Expand All @@ -37,7 +37,7 @@ constexpr bool is_supported_range_type()
template <typename ColumnType>
constexpr bool is_supported_order_by_column_type()
{
return cudf::is_timestamp<ColumnType>() ||
return cudf::is_timestamp<ColumnType>() || cudf::is_fixed_point<ColumnType>() ||
(std::is_integral_v<ColumnType> && !cudf::is_boolean<ColumnType>());
}

Expand All @@ -49,6 +49,11 @@ constexpr bool is_supported_order_by_column_type()
/// a. For `TIMESTAMP_DAYS`, the range-type is `DURATION_DAYS`.
/// Comparisons are done in `int32_t`.
/// b. For all other timestamp types, comparisons are done in `int64_t`.
/// 3. For decimal types, all comparisons are done with the rep type,
/// after scaling the rep value to the same scale as the order by column:
/// a. For decimal32, the range-type is `int32_t`.
/// b. For decimal64, the range-type is `int64_t`.
/// c. For decimal128, the range-type is `__int128_t`.
template <typename ColumnType, typename = void>
struct range_type_impl {
using type = void;
Expand All @@ -69,68 +74,92 @@ struct range_type_impl<TimestampType, std::enable_if_t<cudf::is_timestamp<Timest
using rep_type = typename type::rep;
};

template <typename FixedPointType>
struct range_type_impl<FixedPointType,
std::enable_if_t<cudf::is_fixed_point<FixedPointType>(), void>> {
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
using type = FixedPointType;
using rep_type = typename type::rep;
};

template <typename ColumnType>
using range_type = typename range_type_impl<ColumnType>::type;

template <typename ColumnType>
using range_rep_type = typename range_type_impl<ColumnType>::rep_type;

namespace {

template <typename T>
void assert_non_negative(T const& value)
void assert_non_negative([[maybe_unused]] T const& value)
{
(void)value;
if constexpr (std::numeric_limits<T>::is_signed) {
CUDF_EXPECTS(value >= T{0}, "Range scalar must be >= 0.");
}
}

template <
typename RangeT,
typename RepT,
std::enable_if_t<std::is_integral_v<RangeT> && !cudf::is_boolean<RangeT>(), void>* = nullptr>
RepT range_comparable_value_impl(scalar const& range_scalar, rmm::cuda_stream_view stream)
template <typename RangeT,
typename RepT,
CUDF_ENABLE_IF(std::is_integral_v<RangeT> && !cudf::is_boolean<RangeT>())>
RepT range_comparable_value_impl(scalar const& range_scalar,
bool,
data_type const&,
rmm::cuda_stream_view stream)
{
auto val = static_cast<numeric_scalar<RangeT> const&>(range_scalar).value(stream);
assert_non_negative(val);
return val;
}

template <typename RangeT,
typename RepT,
std::enable_if_t<cudf::is_duration<RangeT>(), void>* = nullptr>
RepT range_comparable_value_impl(scalar const& range_scalar, rmm::cuda_stream_view stream)
template <typename RangeT, typename RepT, CUDF_ENABLE_IF(cudf::is_duration<RangeT>())>
RepT range_comparable_value_impl(scalar const& range_scalar,
bool,
data_type const&,
rmm::cuda_stream_view stream)
{
auto val = static_cast<duration_scalar<RangeT> const&>(range_scalar).value(stream).count();
assert_non_negative(val);
return val;
}

} // namespace
template <typename RangeT, typename RepT, CUDF_ENABLE_IF(cudf::is_fixed_point<RangeT>())>
RepT range_comparable_value_impl(scalar const& range_scalar,
bool is_unbounded,
data_type const& order_by_data_type,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(is_unbounded || range_scalar.type().scale() >= order_by_data_type.scale(),
"Range bounds scalar must match/exceed the scale of the orderby column.");
auto const fixed_point_value =
static_cast<fixed_point_scalar<RangeT> const&>(range_scalar).fixed_point_value(stream);
auto const value =
fixed_point_value.rescaled(numeric::scale_type{order_by_data_type.scale()}).value();
assert_non_negative(value);
return value;
}

/**
* @brief Fetch the value of the range_window_bounds scalar, for comparisons
* with an orderby column's rows.
*
* @tparam OrderByType The type of the orderby column with which the range value will be compared
* @param range_bounds The range_window_bounds whose value is to be read
* @param order_by_data_type The data type for the order-by column
* @param stream The CUDA stream for device memory operations
* @return RepType Value of the range scalar
*/
template <typename OrderByType>
range_rep_type<OrderByType> range_comparable_value(
range_window_bounds const& range_bounds,
rmm::cuda_stream_view stream = cudf::default_stream_value)
data_type const& order_by_data_type = data_type{type_to_id<OrderByType>()},
rmm::cuda_stream_view stream = cudf::default_stream_value)
{
auto const& range_scalar = range_bounds.range_scalar();
using range_type = cudf::detail::range_type<OrderByType>;

CUDF_EXPECTS(range_scalar.type().id() == cudf::type_to_id<range_type>(),
"Unexpected range type for specified orderby column.");
"Range bounds scalar must match the type of the orderby column.");

using rep_type = cudf::detail::range_rep_type<OrderByType>;
return range_comparable_value_impl<range_type, rep_type>(range_scalar, stream);
return range_comparable_value_impl<range_type, rep_type>(
range_scalar, range_bounds.is_unbounded(), order_by_data_type, stream);
}

} // namespace detail
Expand Down
40 changes: 22 additions & 18 deletions cpp/src/rolling/grouped_rolling.cu
Original file line number Diff line number Diff line change
Expand Up @@ -224,47 +224,49 @@ namespace {
/**
* @brief Add `delta` to value, and cap at numeric_limits::max(), for signed types.
*/
template <typename T, std::enable_if_t<std::numeric_limits<T>::is_signed>* = nullptr>
template <typename T, CUDF_ENABLE_IF(cuda::std::numeric_limits<T>::is_signed)>
__device__ T add_safe(T const& value, T const& delta)
{
// delta >= 0.
return (value < 0 || (std::numeric_limits<T>::max() - value) >= delta)
return (value < 0 || (cuda::std::numeric_limits<T>::max() - value) >= delta)
? (value + delta)
: std::numeric_limits<T>::max();
: cuda::std::numeric_limits<T>::max();
}

/**
* @brief Add `delta` to value, and cap at numeric_limits::max(), for unsigned types.
*/
template <typename T, std::enable_if_t<!std::numeric_limits<T>::is_signed>* = nullptr>
template <typename T, CUDF_ENABLE_IF(not cuda::std::numeric_limits<T>::is_signed)>
__device__ T add_safe(T const& value, T const& delta)
{
// delta >= 0.
return ((std::numeric_limits<T>::max() - value) >= delta) ? (value + delta)
: std::numeric_limits<T>::max();
return ((cuda::std::numeric_limits<T>::max() - value) >= delta)
? (value + delta)
: cuda::std::numeric_limits<T>::max();
}

/**
* @brief Subtract `delta` from value, and cap at numeric_limits::min(), for signed types.
*/
template <typename T, std::enable_if_t<std::numeric_limits<T>::is_signed>* = nullptr>
template <typename T, CUDF_ENABLE_IF(cuda::std::numeric_limits<T>::is_signed)>
__device__ T subtract_safe(T const& value, T const& delta)
{
// delta >= 0;
return (value >= 0 || (value - std::numeric_limits<T>::min()) >= delta)
return (value >= 0 || (value - cuda::std::numeric_limits<T>::min()) >= delta)
? (value - delta)
: std::numeric_limits<T>::min();
: cuda::std::numeric_limits<T>::min();
}

/**
* @brief Subtract `delta` from value, and cap at numeric_limits::min(), for unsigned types.
*/
template <typename T, std::enable_if_t<!std::numeric_limits<T>::is_signed>* = nullptr>
template <typename T, CUDF_ENABLE_IF(not cuda::std::numeric_limits<T>::is_signed)>
__device__ T subtract_safe(T const& value, T const& delta)
{
// delta >= 0;
return ((value - std::numeric_limits<T>::min()) >= delta) ? (value - delta)
: std::numeric_limits<T>::min();
return ((value - cuda::std::numeric_limits<T>::min()) >= delta)
? (value - delta)
: cuda::std::numeric_limits<T>::min();
}

/// Given a single, ungrouped order-by column, return the indices corresponding
Expand Down Expand Up @@ -780,7 +782,7 @@ template <typename OrderByT>
std::unique_ptr<column> grouped_range_rolling_window_impl(
column_view const& input,
column_view const& orderby_column,
cudf::order const& timestamp_ordering,
cudf::order const& order_of_orderby_column,
rmm::device_uvector<cudf::size_type> const& group_offsets,
rmm::device_uvector<cudf::size_type> const& group_labels,
range_window_bounds const& preceding_window,
Expand All @@ -790,10 +792,12 @@ std::unique_ptr<column> grouped_range_rolling_window_impl(
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto preceding_value = detail::range_comparable_value<OrderByT>(preceding_window);
auto following_value = detail::range_comparable_value<OrderByT>(following_window);
auto preceding_value =
detail::range_comparable_value<OrderByT>(preceding_window, orderby_column.type(), stream);
auto following_value =
detail::range_comparable_value<OrderByT>(following_window, orderby_column.type(), stream);

if (timestamp_ordering == cudf::order::ASCENDING) {
if (order_of_orderby_column == cudf::order::ASCENDING) {
return group_offsets.is_empty() ? range_window_ASC(input,
orderby_column,
preceding_value,
Expand Down Expand Up @@ -856,7 +860,7 @@ struct dispatch_grouped_range_rolling_window {
std::unique_ptr<column>>
operator()(column_view const& input,
column_view const& orderby_column,
cudf::order const& timestamp_ordering,
cudf::order const& order_of_orderby_column,
rmm::device_uvector<cudf::size_type> const& group_offsets,
rmm::device_uvector<cudf::size_type> const& group_labels,
range_window_bounds const& preceding_window,
Expand All @@ -868,7 +872,7 @@ struct dispatch_grouped_range_rolling_window {
{
return grouped_range_rolling_window_impl<OrderByColumnType>(input,
orderby_column,
timestamp_ordering,
order_of_orderby_column,
group_offsets,
group_labels,
preceding_window,
Expand Down
16 changes: 11 additions & 5 deletions cpp/src/rolling/range_window_bounds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,34 @@ namespace {
* This makes it possible to copy construct and copy assign `range_window_bounds` objects.
*/
struct range_scalar_constructor {
template <typename T, std::enable_if_t<!detail::is_supported_range_type<T>(), void>* = nullptr>
template <typename T, CUDF_ENABLE_IF(not detail::is_supported_range_type<T>())>
std::unique_ptr<scalar> operator()(scalar const& range_scalar_) const
{
CUDF_FAIL(
"Unsupported range type. "
"Only Durations and non-boolean integral range types are allowed.");
"Only Durations, fixed-point, and non-boolean integral range types are allowed.");
}

template <typename T, std::enable_if_t<cudf::is_duration<T>(), void>* = nullptr>
template <typename T, CUDF_ENABLE_IF(cudf::is_duration<T>())>
std::unique_ptr<scalar> operator()(scalar const& range_scalar_) const
{
return std::make_unique<duration_scalar<T>>(
static_cast<duration_scalar<T> const&>(range_scalar_));
}

template <typename T,
std::enable_if_t<std::is_integral_v<T> && !cudf::is_boolean<T>(), void>* = nullptr>
template <typename T, CUDF_ENABLE_IF(std::is_integral_v<T> && not cudf::is_boolean<T>())>
std::unique_ptr<scalar> operator()(scalar const& range_scalar_) const
{
return std::make_unique<numeric_scalar<T>>(
static_cast<numeric_scalar<T> const&>(range_scalar_));
}

template <typename T, CUDF_ENABLE_IF(cudf::is_fixed_point<T>())>
std::unique_ptr<scalar> operator()(scalar const& range_scalar_) const
{
return std::make_unique<fixed_point_scalar<T>>(
static_cast<fixed_point_scalar<T> const&>(range_scalar_));
}
};

} // namespace
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ ConfigureTest(
ROLLING_TEST
rolling/collect_ops_test.cpp
rolling/empty_input_test.cpp
rolling/grouped_rolling_range_test.cpp
rolling/grouped_rolling_test.cpp
rolling/lead_lag_test.cpp
rolling/nth_element_test.cpp
Expand Down
Loading