Skip to content

Commit

Permalink
Optimize compaction operations (#10030)
Browse files Browse the repository at this point in the history
Related to #9413.

This PR adds `unordered_drop_duplicates`/`unordered_distinct_count` APIs by using hash-based algorithms. It doesn't close the original issue since adding `std::unique`-like `drop_duplicates` is not addressed in this PR. It involves several changes:

- [x] Change the behavior of the existing `distinct_count`: counting the number of consecutive groups of equivalent rows instead of total unique.
- [x] Add hash-based `unordered_distinct_count`: this new API counts unique rows across the whole table by using a hash map. It requires a newer version of `cuco` with bug fixing: NVIDIA/cuCollections#132 and NVIDIA/cuCollections#138.
- [x] Add hash-based `unordered_drop_duplicates`: similar to `drop_duplicates`, but this API doesn't support `keep` option and the output is in an unspecified order.
- [x] Replace all the cpp-side `drop_duplicates`/`distinct_count` use cases with `unordered_` versions. 
- [x] Update and replace the existing compaction benchmark with `nvbench`.

Authors:
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - https://github.com/brandon-b-miller
  - Bradley Dice (https://github.com/bdice)
  - Nghia Truong (https://github.com/ttnghia)
  - Robert Maynard (https://github.com/robertmaynard)

URL: #10030
  • Loading branch information
PointKernel authored Feb 2, 2022
1 parent a080a4c commit b6bb463
Show file tree
Hide file tree
Showing 19 changed files with 1,170 additions and 488 deletions.
4 changes: 2 additions & 2 deletions cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2018-2021, NVIDIA CORPORATION.
# Copyright (c) 2018-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -123,7 +123,7 @@ ConfigureBench(APPLY_BOOLEAN_MASK_BENCH stream_compaction/apply_boolean_mask.cpp

# ##################################################################################################
# * stream_compaction benchmark -------------------------------------------------------------------
ConfigureBench(STREAM_COMPACTION_BENCH stream_compaction/drop_duplicates.cpp)
ConfigureNVBench(STREAM_COMPACTION_BENCH stream_compaction/drop_duplicates.cpp)

# ##################################################################################################
# * join benchmark --------------------------------------------------------------------------------
Expand Down
118 changes: 78 additions & 40 deletions cpp/benchmarks/stream_compaction/drop_duplicates.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,64 +15,102 @@
*/

#include <cudf/column/column_view.hpp>
#include <cudf/stream_compaction.hpp>
#include <cudf/detail/stream_compaction.hpp>
#include <cudf/types.hpp>
#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <fixture/benchmark_fixture.hpp>
#include <synchronization/synchronization.hpp>

#include <fixture/rmm_pool_raii.hpp>

#include <nvbench/nvbench.cuh>

#include <memory>
#include <random>

class Compaction : public cudf::benchmark {
};
// necessary for custom enum types
// see: https://github.com/NVIDIA/nvbench/blob/main/examples/enums.cu
NVBENCH_DECLARE_ENUM_TYPE_STRINGS(
// Enum type:
cudf::duplicate_keep_option,
// Callable to generate input strings:
[](cudf::duplicate_keep_option option) {
switch (option) {
case cudf::duplicate_keep_option::KEEP_FIRST: return "KEEP_FIRST";
case cudf::duplicate_keep_option::KEEP_LAST: return "KEEP_LAST";
case cudf::duplicate_keep_option::KEEP_NONE: return "KEEP_NONE";
default: return "ERROR";
}
},
// Callable to generate descriptions:
[](auto) { return std::string{}; })

NVBENCH_DECLARE_TYPE_STRINGS(cudf::timestamp_ms, "cudf::timestamp_ms", "cudf::timestamp_ms");

template <typename Type, cudf::duplicate_keep_option Keep>
void nvbench_drop_duplicates(nvbench::state& state,
nvbench::type_list<Type, nvbench::enum_type<Keep>>)
{
if constexpr (not std::is_same_v<Type, int32_t> and
Keep != cudf::duplicate_keep_option::KEEP_FIRST) {
state.skip("Skip unwanted benchmarks.");
}

cudf::rmm_pool_raii pool_raii;

auto const num_rows = state.get_int64("NumRows");

cudf::test::UniformRandomGenerator<long> rand_gen(0, 100);
auto elements = cudf::detail::make_counting_transform_iterator(
0, [&rand_gen](auto row) { return rand_gen.generate(); });
auto valids = cudf::detail::make_counting_transform_iterator(
0, [](auto i) { return i % 100 == 0 ? false : true; });
cudf::test::fixed_width_column_wrapper<Type, long> values(elements, elements + num_rows, valids);

auto input_column = cudf::column_view(values);
auto input_table = cudf::table_view({input_column, input_column, input_column, input_column});

state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
rmm::cuda_stream_view stream_view{launch.get_stream()};
auto result = cudf::detail::drop_duplicates(
input_table, {0}, Keep, cudf::null_equality::EQUAL, cudf::null_order::BEFORE, stream_view);
});
}

template <typename Type>
void BM_compaction(benchmark::State& state, cudf::duplicate_keep_option keep)
void nvbench_unordered_drop_duplicates(nvbench::state& state, nvbench::type_list<Type>)
{
auto const n_rows = static_cast<cudf::size_type>(state.range(0));
cudf::rmm_pool_raii pool_raii;

auto const num_rows = state.get_int64("NumRows");

cudf::test::UniformRandomGenerator<long> rand_gen(0, 100);
auto elements = cudf::detail::make_counting_transform_iterator(
0, [&rand_gen](auto row) { return rand_gen.generate(); });
auto valids = cudf::detail::make_counting_transform_iterator(
0, [](auto i) { return i % 100 == 0 ? false : true; });
cudf::test::fixed_width_column_wrapper<Type, long> values(elements, elements + n_rows, valids);
cudf::test::fixed_width_column_wrapper<Type, long> values(elements, elements + num_rows, valids);

auto input_column = cudf::column_view(values);
auto input_table = cudf::table_view({input_column, input_column, input_column, input_column});

for (auto _ : state) {
cuda_event_timer timer(state, true);
auto result = cudf::drop_duplicates(input_table, {0}, keep);
}
state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
rmm::cuda_stream_view stream_view{launch.get_stream()};
auto result = cudf::detail::unordered_drop_duplicates(
input_table, {0}, cudf::null_equality::EQUAL, stream_view);
});
}

#define concat(a, b, c) a##b##c
#define get_keep(op) cudf::duplicate_keep_option::KEEP_##op

// TYPE, OP
#define RBM_BENCHMARK_DEFINE(name, type, keep) \
BENCHMARK_DEFINE_F(Compaction, name)(::benchmark::State & state) \
{ \
BM_compaction<type>(state, get_keep(keep)); \
} \
BENCHMARK_REGISTER_F(Compaction, name) \
->UseManualTime() \
->Arg(10000) /* 10k */ \
->Arg(100000) /* 100k */ \
->Arg(1000000) /* 1M */ \
->Arg(10000000) /* 10M */

#define COMPACTION_BENCHMARK_DEFINE(type, keep) \
RBM_BENCHMARK_DEFINE(concat(type, _, keep), type, keep)

COMPACTION_BENCHMARK_DEFINE(bool, NONE);
COMPACTION_BENCHMARK_DEFINE(int8_t, NONE);
COMPACTION_BENCHMARK_DEFINE(int32_t, NONE);
COMPACTION_BENCHMARK_DEFINE(int32_t, FIRST);
COMPACTION_BENCHMARK_DEFINE(int32_t, LAST);
using cudf::timestamp_ms;
COMPACTION_BENCHMARK_DEFINE(timestamp_ms, NONE);
COMPACTION_BENCHMARK_DEFINE(float, NONE);
using data_type = nvbench::type_list<bool, int8_t, int32_t, int64_t, float, cudf::timestamp_ms>;
using keep_option = nvbench::enum_type_list<cudf::duplicate_keep_option::KEEP_FIRST,
cudf::duplicate_keep_option::KEEP_LAST,
cudf::duplicate_keep_option::KEEP_NONE>;

NVBENCH_BENCH_TYPES(nvbench_drop_duplicates, NVBENCH_TYPE_AXES(data_type, keep_option))
.set_name("drop_duplicates")
.set_type_axes_names({"Type", "KeepOption"})
.add_int64_axis("NumRows", {10'000, 100'000, 1'000'000, 10'000'000});

NVBENCH_BENCH_TYPES(nvbench_unordered_drop_duplicates, NVBENCH_TYPE_AXES(data_type))
.set_name("unordered_drop_duplicates")
.set_type_axes_names({"Type"})
.add_int64_axis("NumRows", {10'000, 100'000, 1'000'000, 10'000'000});
4 changes: 2 additions & 2 deletions cpp/cmake/thirdparty/get_cucollections.cmake
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# =============================================================================
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
Expand All @@ -21,7 +21,7 @@ function(find_and_configure_cucollections)
cuco 0.0
GLOBAL_TARGETS cuco::cuco
CPM_ARGS GITHUB_REPOSITORY NVIDIA/cuCollections
GIT_TAG 0ca860b824f5dc22cf8a41f09912e62e11f07d82
GIT_TAG 6ec8b6dcdeceea07ab4456d32461a05c18864411
OPTIONS "BUILD_TESTS OFF" "BUILD_BENCHMARKS OFF" "BUILD_EXAMPLES OFF"
)

Expand Down
33 changes: 32 additions & 1 deletion cpp/include/cudf/detail/stream_compaction.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -75,6 +75,18 @@ std::unique_ptr<table> drop_duplicates(
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @copydoc cudf::unordered_drop_duplicates
*
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
*/
std::unique_ptr<table> unordered_drop_duplicates(
table_view const& input,
std::vector<size_type> const& keys,
null_equality nulls_equal = null_equality::EQUAL,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @copydoc cudf::distinct_count(column_view const&, null_policy, nan_policy)
*
Expand All @@ -94,5 +106,24 @@ cudf::size_type distinct_count(table_view const& input,
null_equality nulls_equal = null_equality::EQUAL,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

/**
* @copydoc cudf::unordered_distinct_count(column_view const&, null_policy, nan_policy)
*
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
*/
cudf::size_type unordered_distinct_count(column_view const& input,
null_policy null_handling,
nan_policy nan_handling,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

/**
* @copydoc cudf::unordered_distinct_count(table_view const&, null_equality)
*
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
*/
cudf::size_type unordered_distinct_count(table_view const& input,
null_equality nulls_equal = null_equality::EQUAL,
rmm::cuda_stream_view stream = rmm::cuda_stream_default);

} // namespace detail
} // namespace cudf
101 changes: 81 additions & 20 deletions cpp/include/cudf/stream_compaction.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -189,7 +189,7 @@ std::unique_ptr<table> drop_nans(
* @note if @p input.num_rows() is zero, there is no error, and an empty table
* is returned.
*
* @throws cudf::logic_error if The `input` size and `boolean_mask` size mismatches.
* @throws cudf::logic_error if `input.num_rows() != boolean_mask.size()`.
* @throws cudf::logic_error if `boolean_mask` is not `type_id::BOOL8` type.
*
* @param[in] input The input table_view to filter
Expand All @@ -214,26 +214,29 @@ enum class duplicate_keep_option {
};

/**
* @brief Create a new table without duplicate rows
* @brief Create a new table without duplicate rows.
*
* The output table is sorted according to the lexicographic ordering of the data in the columns
* indexed by `keys`.
*
* Given an `input` table_view, each row is copied to output table if the corresponding
* row of `keys` columns is unique, where the definition of unique depends on the value of @p keep:
* - KEEP_FIRST: only the first of a sequence of duplicate rows is copied
* - KEEP_LAST: only the last of a sequence of duplicate rows is copied
* - KEEP_NONE: no duplicate rows are copied
*
* @throws cudf::logic_error if The `input` row size mismatches with `keys`.
* @throws cudf::logic_error if the `keys` column indices are out of bounds in the `input` table.
*
* @param[in] input input table_view to copy only unique rows
* @param[in] keys vector of indices representing key columns from `input`
* @param[in] keep keep first entry, last entry, or no entries if duplicates found
* @param[in] keep keep first row, last row, or no rows of the found duplicates
* @param[in] nulls_equal flag to denote nulls are equal if null_equality::EQUAL, nulls are not
* equal if null_equality::UNEQUAL
* @param[in] null_precedence flag to denote nulls should appear before or after non-null items
* @param[in] mr Device memory resource used to allocate the returned table's device
* memory
* memory
*
* @return Table with unique rows as per specified `keep`.
* @return Table with sorted unique rows as specified by `keep`.
*/
std::unique_ptr<table> drop_duplicates(
table_view const& input,
Expand All @@ -244,37 +247,95 @@ std::unique_ptr<table> drop_duplicates(
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Count the unique elements in the column_view
* @brief Create a new table without duplicate rows with hash-based algorithms.
*
* Given an `input` table_view, each row is copied to output table if the corresponding
* row of `keys` columns is unique. If duplicate rows are present, it is unspecified which
* row is copied.
*
* Given an input column_view, number of unique elements in this column_view is returned
* The order of elements in the output table is not specified.
*
* @param[in] input input table_view to copy only unique rows
* @param[in] keys vector of indices representing key columns from `input`
* @param[in] nulls_equal flag to denote nulls are equal if null_equality::EQUAL, nulls are not
* equal if null_equality::UNEQUAL
* @param[in] mr Device memory resource used to allocate the returned table's device
* memory
*
* @return Table with unique rows in an unspecified order.
*/
std::unique_ptr<table> unordered_drop_duplicates(
table_view const& input,
std::vector<size_type> const& keys,
null_equality nulls_equal = null_equality::EQUAL,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Count the number of consecutive groups of equivalent elements in a column.
*
* If `null_handling` is null_policy::EXCLUDE and `nan_handling` is nan_policy::NAN_IS_NULL, both
* `NaN` and `null` values are ignored. If `null_handling` is null_policy::EXCLUDE and
* `nan_handling` is nan_policy::NAN_IS_VALID, only `null` is ignored, `NaN` is considered in unique
* count.
* `nan_handling` is nan_policy::NAN_IS_VALID, only `null` is ignored, `NaN` is considered in count.
*
* `null`s are handled as equal.
*
* @param[in] input The column_view whose unique elements will be counted.
* @param[in] input The column_view whose number of distinct consecutive groups will be counted
* @param[in] null_handling flag to include or ignore `null` while counting
* @param[in] nan_handling flag to consider `NaN==null` or not.
* @param[in] nan_handling flag to consider `NaN==null` or not
*
* @return number of unique elements
* @return number of distinct consecutive groups in the column
*/
cudf::size_type distinct_count(column_view const& input,
null_policy null_handling,
nan_policy nan_handling);

/**
* @brief Count the unique rows in a table.
*
* @brief Count the number of consecutive groups of equivalent elements in a table.
*
* @param[in] input Table whose unique rows will be counted.
* @param[in] nulls_equal flag to denote if null elements should be considered equal
* nulls are not equal if null_equality::UNEQUAL
* @param[in] input Table whose number of distinct consecutive groups will be counted
* @param[in] nulls_equal flag to denote if null elements should be considered equal.
* nulls are not equal if null_equality::UNEQUAL.
*
* @return number of unique rows in the table
* @return number of distinct consecutive groups in the table
*/
cudf::size_type distinct_count(table_view const& input,
null_equality nulls_equal = null_equality::EQUAL);

/**
* @brief Count the unique elements in the column_view.
*
* If `nulls_equal == nulls_equal::UNEQUAL`, all `null`s are unique.
*
* Given an input column_view, number of unique elements in this column_view is returned.
*
* If `null_handling` is null_policy::EXCLUDE and `nan_handling` is nan_policy::NAN_IS_NULL, both
* `NaN` and `null` values are ignored. If `null_handling` is null_policy::EXCLUDE and
* `nan_handling` is nan_policy::NAN_IS_VALID, only `null` is ignored, `NaN` is considered in unique
* count.
*
* `null`s are handled as equal.
*
* @param[in] input The column_view whose unique elements will be counted
* @param[in] null_handling flag to include or ignore `null` while counting
* @param[in] nan_handling flag to consider `NaN==null` or not
*
* @return number of unique elements
*/
cudf::size_type unordered_distinct_count(column_view const& input,
null_policy null_handling,
nan_policy nan_handling);

/**
* @brief Count the unique rows in a table.
*
* @param[in] input Table whose unique rows will be counted
* @param[in] nulls_equal flag to denote if null elements should be considered equal.
* nulls are not equal if null_equality::UNEQUAL.
*
* @return number of unique rows in the table
*/
cudf::size_type unordered_distinct_count(table_view const& input,
null_equality nulls_equal = null_equality::EQUAL);

/** @} */
} // namespace cudf
Loading

0 comments on commit b6bb463

Please sign in to comment.