Skip to content

Commit

Permalink
Add dictionary column support to rolling_window (#7186)
Browse files Browse the repository at this point in the history
Reference #5963 

Add support for dictionary column to `cudf::rolling_window` (non-udf)

Rolling aggregations
- [x] min/max
- [x] lead/lag
- [x] counting, row-number

These only require aggregating the dictionary indices and do not need to access the keys.

Authors:
  - David (@davidwendt)

Approvers:
  - Mark Harris (@harrism)
  - Ram (Ramakrishna Prabhu) (@rgsl888prabhu)

URL: #7186
  • Loading branch information
davidwendt authored Jan 28, 2021
1 parent 9672e3d commit b608832
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 21 deletions.
10 changes: 7 additions & 3 deletions cpp/src/rolling/rolling.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,8 +26,10 @@ std::unique_ptr<column> rolling_window(column_view const& input,
std::unique_ptr<aggregation> const& agg,
rmm::mr::device_memory_resource* mr)
{
auto defaults =
cudf::is_dictionary(input.type()) ? dictionary_column_view(input).indices() : input;
return rolling_window(
input, empty_like(input)->view(), preceding_window, following_window, min_periods, agg, mr);
input, empty_like(defaults)->view(), preceding_window, following_window, min_periods, agg, mr);
}

namespace detail {
Expand Down Expand Up @@ -107,8 +109,10 @@ std::unique_ptr<column> rolling_window(column_view const& input,
stream,
mr);
} else {
auto defaults_col =
cudf::is_dictionary(input.type()) ? dictionary_column_view(input).indices() : input;
return cudf::detail::rolling_window(input,
empty_like(input)->view(),
empty_like(defaults_col)->view(),
preceding_window.begin<size_type>(),
following_window.begin<size_type>(),
min_periods,
Expand Down
64 changes: 47 additions & 17 deletions cpp/src/rolling/rolling_detail.cuh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,8 @@
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/device_operators.cuh>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/dictionary/dictionary_factories.hpp>
#include <cudf/rolling.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/bit.hpp>
Expand Down Expand Up @@ -623,8 +625,6 @@ struct rolling_window_launcher {
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (input.is_empty()) return empty_like(input);

auto output = make_fixed_width_column(
target_type(input.type(), op), input.size(), mask_state::UNINITIALIZED, stream, mr);

Expand Down Expand Up @@ -663,8 +663,6 @@ struct rolling_window_launcher {
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (input.is_empty()) return empty_like(input);

auto output = make_numeric_column(cudf::data_type{cudf::type_to_id<size_type>()},
input.size(),
cudf::mask_state::UNINITIALIZED,
Expand Down Expand Up @@ -755,8 +753,6 @@ struct rolling_window_launcher {
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (input.is_empty()) return empty_like(input);

CUDF_EXPECTS(default_outputs.type().id() == input.type().id(),
"Defaults column type must match input column."); // Because LEAD/LAG.

Expand Down Expand Up @@ -1036,18 +1032,52 @@ std::unique_ptr<column> rolling_window(column_view const& input,
static_assert(warp_size == cudf::detail::size_in_bits<cudf::bitmask_type>(),
"bitmask_type size does not match CUDA warp size");

if (input.is_empty()) return empty_like(input);

if (cudf::is_dictionary(input.type()))
CUDF_EXPECTS(agg->kind == aggregation::COUNT_ALL || agg->kind == aggregation::COUNT_VALID ||
agg->kind == aggregation::ROW_NUMBER || agg->kind == aggregation::MIN ||
agg->kind == aggregation::MAX || agg->kind == aggregation::LEAD ||
agg->kind == aggregation::LAG,
"Invalid aggregation for dictionary column");

min_periods = std::max(min_periods, 0);

return cudf::type_dispatcher(input.type(),
dispatch_rolling{},
input,
default_outputs,
preceding_window_begin,
following_window_begin,
min_periods,
agg,
stream,
mr);
auto input_col = cudf::is_dictionary(input.type())
? dictionary_column_view(input).get_indices_annotated()
: input;
auto output = cudf::type_dispatcher(input_col.type(),
dispatch_rolling{},
input_col,
default_outputs,
preceding_window_begin,
following_window_begin,
min_periods,
agg,
stream,
mr);
if (!cudf::is_dictionary(input.type())) return output;

// dictionary column post processing
if (agg->kind == aggregation::COUNT_ALL || agg->kind == aggregation::COUNT_VALID ||
agg->kind == aggregation::ROW_NUMBER)
return output;

// output is new dictionary indices (including nulls)
auto keys = std::make_unique<column>(dictionary_column_view(input).keys(), stream, mr);
auto const indices_type = output->type(); // capture these
auto const output_size = output->size(); // before calling
auto const null_count = output->null_count(); // release()
auto contents = output->release();
// create indices column from output column data
auto indices = std::make_unique<column>(indices_type,
output_size,
std::move(*(contents.data.release())),
rmm::device_buffer{0, stream, mr},
0);
// create dictionary from keys and indices
return make_dictionary_column(
std::move(keys), std::move(indices), std::move(*(contents.null_mask.release())), null_count);
}

} // namespace detail
Expand Down
70 changes: 69 additions & 1 deletion cpp/tests/rolling/rolling_test.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@

#include <cudf/aggregation.hpp>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/dictionary/encode.hpp>
#include <cudf/rolling.hpp>
#include <cudf/utilities/bit.hpp>
#include <src/rolling/rolling_detail.hpp>
Expand Down Expand Up @@ -1048,4 +1049,71 @@ TYPED_TEST(FixedPointTests, MinMaxCountLagLeadNulls)
cudf::logic_error);
}

class RollingDictionaryTest : public cudf::test::BaseFixture {
};

TEST_F(RollingDictionaryTest, Count)
{
cudf::test::dictionary_column_wrapper<std::string> input(
{"This", "is", "rolling", "test", "being", "operated", "on", "string", "column"},
{1, 0, 0, 1, 0, 1, 1, 1, 0});
fixed_width_column_wrapper<size_type> expected_count_val({1, 2, 1, 2, 3, 3, 3, 2, 1},
{1, 1, 1, 1, 1, 1, 1, 1, 1});
fixed_width_column_wrapper<size_type> expected_count_all({3, 4, 4, 4, 4, 4, 4, 3, 2},
{1, 1, 1, 1, 1, 1, 1, 1, 1});
fixed_width_column_wrapper<size_type> expected_row_number({1, 2, 2, 2, 2, 2, 2, 2, 2},
{1, 1, 1, 1, 1, 1, 1, 1, 1});

auto got_count_valid = cudf::rolling_window(input, 2, 2, 1, cudf::make_count_aggregation());
auto got_count_all =
cudf::rolling_window(input, 2, 2, 1, cudf::make_count_aggregation(cudf::null_policy::INCLUDE));
auto got_row_number = cudf::rolling_window(input, 2, 2, 1, cudf::make_row_number_aggregation());

CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_val, got_count_valid->view());
CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_all, got_count_all->view());
CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_row_number, got_row_number->view());
}

TEST_F(RollingDictionaryTest, MinMax)
{
cudf::test::dictionary_column_wrapper<std::string> input(
{"This", "is", "rolling", "test", "being", "operated", "on", "string", "column"},
{1, 0, 0, 1, 0, 1, 1, 1, 0});
cudf::test::strings_column_wrapper expected_min(
{"This", "This", "test", "operated", "on", "on", "on", "on", "string"},
{1, 1, 1, 1, 1, 1, 1, 1, 1});
cudf::test::strings_column_wrapper expected_max(
{"This", "test", "test", "test", "test", "string", "string", "string", "string"},
{1, 1, 1, 1, 1, 1, 1, 1, 1});

auto got_min_dict = cudf::rolling_window(input, 2, 2, 1, cudf::make_min_aggregation());
auto got_min = cudf::dictionary::decode(cudf::dictionary_column_view(got_min_dict->view()));

auto got_max_dict = cudf::rolling_window(input, 2, 2, 1, cudf::make_max_aggregation());
auto got_max = cudf::dictionary::decode(cudf::dictionary_column_view(got_max_dict->view()));

CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_min, got_min->view());
CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_max, got_max->view());
}

TEST_F(RollingDictionaryTest, LeadLag)
{
cudf::test::dictionary_column_wrapper<std::string> input(
{"This", "is", "rolling", "test", "being", "operated", "on", "string", "column"},
{1, 0, 0, 1, 0, 1, 1, 1, 0});
cudf::test::strings_column_wrapper expected_lead(
{"", "", "test", "", "operated", "on", "string", "", ""}, {0, 0, 1, 0, 1, 1, 1, 0, 0});
cudf::test::strings_column_wrapper expected_lag(
{"", "This", "", "", "test", "", "operated", "on", "string"}, {0, 1, 0, 0, 1, 0, 1, 1, 1});

auto got_lead_dict = cudf::rolling_window(input, 2, 1, 1, cudf::make_lead_aggregation(1));
auto got_lead = cudf::dictionary::decode(cudf::dictionary_column_view(got_lead_dict->view()));

auto got_lag_dict = cudf::rolling_window(input, 2, 2, 1, cudf::make_lag_aggregation(1));
auto got_lag = cudf::dictionary::decode(cudf::dictionary_column_view(got_lag_dict->view()));

CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_lead, got_lead->view());
CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_lag, got_lag->view());
}

CUDF_TEST_PROGRAM_MAIN()

0 comments on commit b608832

Please sign in to comment.