Skip to content

Commit

Permalink
Support DECIMAL order-by for RANGE window functions (#11645)
Browse files Browse the repository at this point in the history
CUDF grouped RANGE window functions currently support only
integral types and timestamps as the ORDER BY (OBY) column.

This commit adds support for DECIMAL types (i.e. decimal32,
decimal64, and decimal128) to be used as the ORDER BY
column in RANGE window functions.

This feature allows `spark-rapids` to address NVIDIA/spark-rapids#6400.

Authors:
  - MithunR (https://github.com/mythrocks)

Approvers:
  - Mike Wilson (https://github.com/hyperbolic2346)
  - David Wendt (https://github.com/davidwendt)
  - Jason Lowe (https://github.com/jlowe)

URL: #11645
  • Loading branch information
mythrocks authored Sep 13, 2022
1 parent 7e86a1b commit 69cb31d
Show file tree
Hide file tree
Showing 9 changed files with 552 additions and 43 deletions.
67 changes: 48 additions & 19 deletions cpp/src/rolling/detail/range_window_bounds.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace detail {
/// Compile-time predicate: is `RangeType` acceptable as the type of a range window bound?
/// Both the pre-change and post-change `return` lines of the diff are shown below.
/// The newer line additionally accepts fixed-point (decimal) types, alongside
/// durations and non-boolean integral types.
template <typename RangeType>
constexpr bool is_supported_range_type()
{
return cudf::is_duration<RangeType>() ||
return cudf::is_duration<RangeType>() || cudf::is_fixed_point<RangeType>() ||
(std::is_integral_v<RangeType> && !cudf::is_boolean<RangeType>());
}

Expand All @@ -37,7 +37,7 @@ constexpr bool is_supported_range_type()
/// Compile-time predicate: is `ColumnType` acceptable as the ORDER BY column type
/// of a range window? Both the pre-change and post-change `return` lines of the diff
/// are shown below. The newer line additionally accepts fixed-point (decimal) types,
/// alongside timestamps and non-boolean integral types.
template <typename ColumnType>
constexpr bool is_supported_order_by_column_type()
{
return cudf::is_timestamp<ColumnType>() ||
return cudf::is_timestamp<ColumnType>() || cudf::is_fixed_point<ColumnType>() ||
(std::is_integral_v<ColumnType> && !cudf::is_boolean<ColumnType>());
}

Expand All @@ -49,6 +49,11 @@ constexpr bool is_supported_order_by_column_type()
/// a. For `TIMESTAMP_DAYS`, the range-type is `DURATION_DAYS`.
/// Comparisons are done in `int32_t`.
/// b. For all other timestamp types, comparisons are done in `int64_t`.
/// 3. For decimal types, the range-type is the same decimal type as the order by column.
/// All comparisons are done with the rep type, after scaling the range's rep value
/// to the same scale as the order by column:
/// a. For decimal32, comparisons are done in `int32_t`.
/// b. For decimal64, comparisons are done in `int64_t`.
/// c. For decimal128, comparisons are done in `__int128_t`.
template <typename ColumnType, typename = void>
struct range_type_impl {
using type = void;
Expand All @@ -69,68 +74,92 @@ struct range_type_impl<TimestampType, std::enable_if_t<cudf::is_timestamp<Timest
using rep_type = typename type::rep;
};

/// Specialization for fixed-point (decimal) order-by columns.
/// The range scalar is expected to have the same fixed-point type as the column
/// (`type`), and comparisons are carried out in that type's representation type
/// (`rep_type`).
template <typename FixedPointType>
struct range_type_impl<FixedPointType, std::enable_if_t<cudf::is_fixed_point<FixedPointType>()>> {
  using type     = FixedPointType;
  using rep_type = typename FixedPointType::rep;
};

/// The range (window-bound) type that corresponds to an order-by column of type
/// `ColumnType`, as selected by `range_type_impl`.
template <typename ColumnType>
using range_type = typename range_type_impl<ColumnType>::type;

/// The representation type in which range values are compared for an order-by
/// column of type `ColumnType`, as selected by `range_type_impl`.
template <typename ColumnType>
using range_rep_type = typename range_type_impl<ColumnType>::rep_type;

namespace {

/// Checks that a range value is not negative.
/// For types that `std::numeric_limits` reports as signed, a `CUDF_EXPECTS` check
/// fails when `value < 0`. For all other types the check is compiled out, which is
/// why `value` may be otherwise unused.
/// Both the pre-change and post-change signature lines of the diff are shown below.
/// NOTE(review): this relies on `std::numeric_limits<T>::is_signed` being true for
/// `__int128_t` (the decimal128 rep type) — confirm for the toolchain in use.
template <typename T>
void assert_non_negative(T const& value)
void assert_non_negative([[maybe_unused]] T const& value)
{
(void)value;
if constexpr (std::numeric_limits<T>::is_signed) {
CUDF_EXPECTS(value >= T{0}, "Range scalar must be >= 0.");
}
}

/// Integral overload: reads the range value out of a `numeric_scalar<RangeT>`.
/// Enabled only for non-boolean integral `RangeT`. Both the pre-change and
/// post-change template/signature lines of the diff are shown below. In the newer
/// signature the unnamed `bool` and `data_type const&` parameters are not used by
/// this overload.
/// The value is fetched on `stream`, checked to be non-negative, and returned as `RepT`.
template <
typename RangeT,
typename RepT,
std::enable_if_t<std::is_integral_v<RangeT> && !cudf::is_boolean<RangeT>(), void>* = nullptr>
RepT range_comparable_value_impl(scalar const& range_scalar, rmm::cuda_stream_view stream)
template <typename RangeT,
typename RepT,
CUDF_ENABLE_IF(std::is_integral_v<RangeT> && !cudf::is_boolean<RangeT>())>
RepT range_comparable_value_impl(scalar const& range_scalar,
bool,
data_type const&,
rmm::cuda_stream_view stream)
{
// The caller is responsible for `range_scalar` actually being a numeric_scalar<RangeT>;
// the cast itself is unchecked.
auto val = static_cast<numeric_scalar<RangeT> const&>(range_scalar).value(stream);
assert_non_negative(val);
return val;
}

/// Duration overload: reads the range value out of a `duration_scalar<RangeT>`.
/// Enabled only when `RangeT` is a duration type. Both the pre-change and
/// post-change template/signature lines of the diff are shown below. In the newer
/// signature the unnamed `bool` and `data_type const&` parameters are not used by
/// this overload.
/// The duration's tick count is fetched on `stream`, checked to be non-negative,
/// and returned as `RepT`.
template <typename RangeT,
typename RepT,
std::enable_if_t<cudf::is_duration<RangeT>(), void>* = nullptr>
RepT range_comparable_value_impl(scalar const& range_scalar, rmm::cuda_stream_view stream)
template <typename RangeT, typename RepT, CUDF_ENABLE_IF(cudf::is_duration<RangeT>())>
RepT range_comparable_value_impl(scalar const& range_scalar,
bool,
data_type const&,
rmm::cuda_stream_view stream)
{
// `.count()` strips the duration wrapper, leaving the raw tick count for comparison.
auto val = static_cast<duration_scalar<RangeT> const&>(range_scalar).value(stream).count();
assert_non_negative(val);
return val;
}

} // namespace
/// Fixed-point overload: reads the range value out of a `fixed_point_scalar<RangeT>`
/// and expresses it at the scale of the order-by column.
///
/// @param range_scalar        The range bound scalar; treated as `fixed_point_scalar<RangeT>`
/// @param is_unbounded        Whether the bound is unbounded; if so, the scale check is skipped
/// @param order_by_data_type  Data type of the order-by column; supplies the target scale
/// @param stream              CUDA stream used to read the scalar's value
/// @return The rescaled representation value of the range scalar
template <typename RangeT, typename RepT, CUDF_ENABLE_IF(cudf::is_fixed_point<RangeT>())>
RepT range_comparable_value_impl(scalar const& range_scalar,
                                 bool is_unbounded,
                                 data_type const& order_by_data_type,
                                 rmm::cuda_stream_view stream)
{
  auto const target_scale = order_by_data_type.scale();

  // A bounded range must not have a smaller scale than the column it is compared against.
  CUDF_EXPECTS(is_unbounded || range_scalar.type().scale() >= target_scale,
               "Range bounds scalar must match/exceed the scale of the orderby column.");

  auto const& typed_scalar = static_cast<fixed_point_scalar<RangeT> const&>(range_scalar);
  auto const rescaled_rep =
    typed_scalar.fixed_point_value(stream).rescaled(numeric::scale_type{target_scale}).value();

  assert_non_negative(rescaled_rep);
  return rescaled_rep;
}

/**
* @brief Fetch the value of the range_window_bounds scalar, for comparisons
* with an orderby column's rows.
*
* Both the pre-change and post-change lines of the diff are shown in the signature
* and body below. The newer form additionally takes the order-by column's data type
* and forwards it, together with the bound's unbounded flag, to
* `range_comparable_value_impl`.
*
* A `CUDF_EXPECTS` check fails if the type id of the range scalar differs from the
* range type expected for `OrderByType`.
*
* @tparam OrderByType The type of the orderby column with which the range value will be compared
* @param range_bounds The range_window_bounds whose value is to be read
* @param order_by_data_type The data type for the order-by column
* @param stream The CUDA stream for device memory operations
* @return RepType Value of the range scalar
*/
template <typename OrderByType>
range_rep_type<OrderByType> range_comparable_value(
range_window_bounds const& range_bounds,
rmm::cuda_stream_view stream = cudf::default_stream_value)
data_type const& order_by_data_type = data_type{type_to_id<OrderByType>()},
rmm::cuda_stream_view stream = cudf::default_stream_value)
{
auto const& range_scalar = range_bounds.range_scalar();
// Local alias for the range type that pairs with this order-by type.
using range_type = cudf::detail::range_type<OrderByType>;

CUDF_EXPECTS(range_scalar.type().id() == cudf::type_to_id<range_type>(),
"Unexpected range type for specified orderby column.");
"Range bounds scalar must match the type of the orderby column.");

// The representation type in which the comparison value is returned.
using rep_type = cudf::detail::range_rep_type<OrderByType>;
return range_comparable_value_impl<range_type, rep_type>(range_scalar, stream);
return range_comparable_value_impl<range_type, rep_type>(
range_scalar, range_bounds.is_unbounded(), order_by_data_type, stream);
}

} // namespace detail
Expand Down
40 changes: 22 additions & 18 deletions cpp/src/rolling/grouped_rolling.cu
Original file line number Diff line number Diff line change
Expand Up @@ -224,47 +224,49 @@ namespace {
/**
* @brief Add `delta` to value, and cap at numeric_limits::max(), for signed types.
*
* `delta` is expected to be non-negative. Both the pre-change (`std::`) and
* post-change (`cuda::std::`) lines of the diff are shown below.
*
* @param value The starting value
* @param delta The non-negative amount to add
* @return `value + delta`, or numeric_limits::max() if the sum would exceed it
*/
template <typename T, std::enable_if_t<std::numeric_limits<T>::is_signed>* = nullptr>
template <typename T, CUDF_ENABLE_IF(cuda::std::numeric_limits<T>::is_signed)>
__device__ T add_safe(T const& value, T const& delta)
{
// delta >= 0.
// When value < 0, `max() - value` could itself overflow, so that case is
// short-circuited: a negative value plus a non-negative delta cannot exceed max().
return (value < 0 || (std::numeric_limits<T>::max() - value) >= delta)
return (value < 0 || (cuda::std::numeric_limits<T>::max() - value) >= delta)
? (value + delta)
: std::numeric_limits<T>::max();
: cuda::std::numeric_limits<T>::max();
}

/**
* @brief Add `delta` to value, and cap at numeric_limits::max(), for unsigned types.
*
* Both the pre-change (`std::`) and post-change (`cuda::std::`) lines of the diff
* are shown below.
*
* @param value The starting value
* @param delta The amount to add
* @return `value + delta`, or numeric_limits::max() if the sum would exceed it
*/
template <typename T, std::enable_if_t<!std::numeric_limits<T>::is_signed>* = nullptr>
template <typename T, CUDF_ENABLE_IF(not cuda::std::numeric_limits<T>::is_signed)>
__device__ T add_safe(T const& value, T const& delta)
{
// delta >= 0.
// `max() - value` is the remaining headroom; the sum is only formed when delta fits in it.
return ((std::numeric_limits<T>::max() - value) >= delta) ? (value + delta)
: std::numeric_limits<T>::max();
return ((cuda::std::numeric_limits<T>::max() - value) >= delta)
? (value + delta)
: cuda::std::numeric_limits<T>::max();
}

/**
* @brief Subtract `delta` from value, and cap at numeric_limits::min(), for signed types.
*
* `delta` is expected to be non-negative. Both the pre-change (`std::`) and
* post-change (`cuda::std::`) lines of the diff are shown below.
*
* @param value The starting value
* @param delta The non-negative amount to subtract
* @return `value - delta`, or numeric_limits::min() if the difference would fall below it
*/
template <typename T, std::enable_if_t<std::numeric_limits<T>::is_signed>* = nullptr>
template <typename T, CUDF_ENABLE_IF(cuda::std::numeric_limits<T>::is_signed)>
__device__ T subtract_safe(T const& value, T const& delta)
{
// delta >= 0;
// When value >= 0, `value - min()` could itself overflow, so that case is
// short-circuited: a non-negative value minus a non-negative delta cannot drop below min().
return (value >= 0 || (value - std::numeric_limits<T>::min()) >= delta)
return (value >= 0 || (value - cuda::std::numeric_limits<T>::min()) >= delta)
? (value - delta)
: std::numeric_limits<T>::min();
: cuda::std::numeric_limits<T>::min();
}

/**
* @brief Subtract `delta` from value, and cap at numeric_limits::min(), for unsigned types.
*
* Both the pre-change (`std::`) and post-change (`cuda::std::`) lines of the diff
* are shown below.
*
* @param value The starting value
* @param delta The amount to subtract
* @return `value - delta`, or numeric_limits::min() if the difference would fall below it
*/
template <typename T, std::enable_if_t<!std::numeric_limits<T>::is_signed>* = nullptr>
template <typename T, CUDF_ENABLE_IF(not cuda::std::numeric_limits<T>::is_signed)>
__device__ T subtract_safe(T const& value, T const& delta)
{
// delta >= 0;
// `value - min()` is the distance to the lower limit; the difference is only
// formed when delta fits within that distance.
return ((value - std::numeric_limits<T>::min()) >= delta) ? (value - delta)
: std::numeric_limits<T>::min();
return ((value - cuda::std::numeric_limits<T>::min()) >= delta)
? (value - delta)
: cuda::std::numeric_limits<T>::min();
}

/// Given a single, ungrouped order-by column, return the indices corresponding
Expand Down Expand Up @@ -780,7 +782,7 @@ template <typename OrderByT>
std::unique_ptr<column> grouped_range_rolling_window_impl(
column_view const& input,
column_view const& orderby_column,
cudf::order const& timestamp_ordering,
cudf::order const& order_of_orderby_column,
rmm::device_uvector<cudf::size_type> const& group_offsets,
rmm::device_uvector<cudf::size_type> const& group_labels,
range_window_bounds const& preceding_window,
Expand All @@ -790,10 +792,12 @@ std::unique_ptr<column> grouped_range_rolling_window_impl(
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto preceding_value = detail::range_comparable_value<OrderByT>(preceding_window);
auto following_value = detail::range_comparable_value<OrderByT>(following_window);
auto preceding_value =
detail::range_comparable_value<OrderByT>(preceding_window, orderby_column.type(), stream);
auto following_value =
detail::range_comparable_value<OrderByT>(following_window, orderby_column.type(), stream);

if (timestamp_ordering == cudf::order::ASCENDING) {
if (order_of_orderby_column == cudf::order::ASCENDING) {
return group_offsets.is_empty() ? range_window_ASC(input,
orderby_column,
preceding_value,
Expand Down Expand Up @@ -856,7 +860,7 @@ struct dispatch_grouped_range_rolling_window {
std::unique_ptr<column>>
operator()(column_view const& input,
column_view const& orderby_column,
cudf::order const& timestamp_ordering,
cudf::order const& order_of_orderby_column,
rmm::device_uvector<cudf::size_type> const& group_offsets,
rmm::device_uvector<cudf::size_type> const& group_labels,
range_window_bounds const& preceding_window,
Expand All @@ -868,7 +872,7 @@ struct dispatch_grouped_range_rolling_window {
{
return grouped_range_rolling_window_impl<OrderByColumnType>(input,
orderby_column,
timestamp_ordering,
order_of_orderby_column,
group_offsets,
group_labels,
preceding_window,
Expand Down
16 changes: 11 additions & 5 deletions cpp/src/rolling/range_window_bounds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,34 @@ namespace {
* This makes it possible to copy construct and copy assign `range_window_bounds` objects.
*/
struct range_scalar_constructor {
// Functor with one call operator per category of `T`. Each supported overload
// copy-constructs a new scalar of the matching concrete scalar class from
// `range_scalar_`. Both the pre-change and post-change `template` lines and error
// messages of the diff are shown below.

// Fallback for unsupported range types: always fails with CUDF_FAIL.
template <typename T, std::enable_if_t<!detail::is_supported_range_type<T>(), void>* = nullptr>
template <typename T, CUDF_ENABLE_IF(not detail::is_supported_range_type<T>())>
std::unique_ptr<scalar> operator()(scalar const& range_scalar_) const
{
CUDF_FAIL(
"Unsupported range type. "
"Only Durations and non-boolean integral range types are allowed.");
"Only Durations, fixed-point, and non-boolean integral range types are allowed.");
}

// Duration range types: copy as a duration_scalar<T>.
template <typename T, std::enable_if_t<cudf::is_duration<T>(), void>* = nullptr>
template <typename T, CUDF_ENABLE_IF(cudf::is_duration<T>())>
std::unique_ptr<scalar> operator()(scalar const& range_scalar_) const
{
return std::make_unique<duration_scalar<T>>(
static_cast<duration_scalar<T> const&>(range_scalar_));
}

// Non-boolean integral range types: copy as a numeric_scalar<T>.
template <typename T,
std::enable_if_t<std::is_integral_v<T> && !cudf::is_boolean<T>(), void>* = nullptr>
template <typename T, CUDF_ENABLE_IF(std::is_integral_v<T> && not cudf::is_boolean<T>())>
std::unique_ptr<scalar> operator()(scalar const& range_scalar_) const
{
return std::make_unique<numeric_scalar<T>>(
static_cast<numeric_scalar<T> const&>(range_scalar_));
}

// Fixed-point (decimal) range types: copy as a fixed_point_scalar<T>.
template <typename T, CUDF_ENABLE_IF(cudf::is_fixed_point<T>())>
std::unique_ptr<scalar> operator()(scalar const& range_scalar_) const
{
return std::make_unique<fixed_point_scalar<T>>(
static_cast<fixed_point_scalar<T> const&>(range_scalar_));
}
};

} // namespace
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ ConfigureTest(
ROLLING_TEST
rolling/collect_ops_test.cpp
rolling/empty_input_test.cpp
rolling/grouped_rolling_range_test.cpp
rolling/grouped_rolling_test.cpp
rolling/lead_lag_test.cpp
rolling/nth_element_test.cpp
Expand Down
Loading

0 comments on commit 69cb31d

Please sign in to comment.