From a6853f4b3832b5338a4d0cd9d0b93c7bcd1ce884 Mon Sep 17 00:00:00 2001
From: Srinivas Yadav <43375352+srinivasyadav18@users.noreply.github.com>
Date: Tue, 8 Oct 2024 23:03:03 -0500
Subject: [PATCH] Refactor `histogram` reduction using
 `cuco::static_set::insert_and_find` (#16485)

Refactors `histogram` reduce and groupby aggregations using `cuco::static_set::insert_and_find`. Speed improvement results [here](https://github.com/rapidsai/cudf/pull/16485#issuecomment-2394855796) and [here](https://github.com/rapidsai/cudf/pull/16485#issuecomment-2394865692).

Authors:
  - Srinivas Yadav (https://github.com/srinivasyadav18)
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Nghia Truong (https://github.com/ttnghia)

URL: https://github.com/rapidsai/cudf/pull/16485
---
 cpp/benchmarks/CMakeLists.txt                 |  10 +-
 cpp/benchmarks/groupby/group_histogram.cpp    |  90 ++++++++++
 cpp/benchmarks/reduction/histogram.cpp        |  68 +++++++
 .../cudf/detail/hash_reduce_by_row.cuh        | 169 ------------------
 cpp/src/reductions/histogram.cu               | 164 +++++++----------
 5 files changed, 231 insertions(+), 270 deletions(-)
 create mode 100644 cpp/benchmarks/groupby/group_histogram.cpp
 create mode 100644 cpp/benchmarks/reduction/histogram.cpp
 delete mode 100644 cpp/include/cudf/detail/hash_reduce_by_row.cuh

diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt
index b8a53cd8bd9..b0f75b25975 100644
--- a/cpp/benchmarks/CMakeLists.txt
+++ b/cpp/benchmarks/CMakeLists.txt
@@ -245,6 +245,7 @@ ConfigureNVBench(
   REDUCTION_NVBENCH
   reduction/anyall.cpp
   reduction/dictionary.cpp
+  reduction/histogram.cpp
   reduction/minmax.cpp
   reduction/rank.cpp
   reduction/reduce.cpp
@@ -270,8 +271,13 @@ ConfigureBench(
 )
 
 ConfigureNVBench(
-  GROUPBY_NVBENCH groupby/group_max.cpp groupby/group_max_multithreaded.cpp
-  groupby/group_nunique.cpp groupby/group_rank.cpp groupby/group_struct_keys.cpp
+  GROUPBY_NVBENCH
+  groupby/group_histogram.cpp
+  groupby/group_max.cpp
+  groupby/group_max_multithreaded.cpp
+  groupby/group_nunique.cpp
+  groupby/group_rank.cpp
+  groupby/group_struct_keys.cpp
 )
 
 # ##################################################################################################
diff --git a/cpp/benchmarks/groupby/group_histogram.cpp b/cpp/benchmarks/groupby/group_histogram.cpp
new file mode 100644
index 00000000000..cd7f9f298af
--- /dev/null
+++ b/cpp/benchmarks/groupby/group_histogram.cpp
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <benchmarks/common/generate_input.hpp>
+#include <benchmarks/fixture/benchmark_fixture.hpp>
+
+#include <cudf/groupby.hpp>
+
+#include <nvbench/nvbench.cuh>
+
+template <typename Type>
+void groupby_histogram_helper(nvbench::state& state,
+                              cudf::size_type num_rows,
+                              cudf::size_type cardinality,
+                              double null_probability)
+{
+  auto const keys = [&] {
+    data_profile const profile =
+      data_profile_builder()
+        .cardinality(cardinality)
+        .no_validity()
+        .distribution(cudf::type_to_id<int32_t>(), distribution_id::UNIFORM, 0, num_rows);
+    return create_random_column(cudf::type_to_id<int32_t>(), row_count{num_rows}, profile);
+  }();
+
+  auto const values = [&] {
+    auto builder = data_profile_builder().cardinality(0).distribution(
+      cudf::type_to_id<Type>(), distribution_id::UNIFORM, 0, num_rows);
+    if (null_probability > 0) {
+      builder.null_probability(null_probability);
+    } else {
+      builder.no_validity();
+    }
+    return create_random_column(
+      cudf::type_to_id<Type>(), row_count{num_rows}, data_profile{builder});
+  }();
+
+  // Vector of 1 request
+  std::vector<cudf::groupby::aggregation_request> requests(1);
+  requests.back().values = values->view();
+  requests.back().aggregations.push_back(
+    cudf::make_histogram_aggregation<cudf::groupby_aggregation>());
+
+  auto const mem_stats_logger = cudf::memory_stats_logger();
+  state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
+  state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
+    auto gb_obj       = cudf::groupby::groupby(cudf::table_view({keys->view()}));
+    auto const result = gb_obj.aggregate(requests);
+  });
+
+  auto const elapsed_time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
+  state.add_element_count(static_cast<double>(num_rows) / elapsed_time, "rows/s");
+  state.add_buffer_size(
+    mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
+}
+
+template <typename Type>
+void bench_groupby_histogram(nvbench::state& state, nvbench::type_list<Type>)
+{
+  auto const cardinality      = static_cast<cudf::size_type>(state.get_int64("cardinality"));
+  auto const num_rows         = static_cast<cudf::size_type>(state.get_int64("num_rows"));
+  auto const null_probability = state.get_float64("null_probability");
+
+  if (cardinality > num_rows) {
+    state.skip("cardinality > num_rows");
+    return;
+  }
+
+  groupby_histogram_helper<Type>(state, num_rows, cardinality, null_probability);
+}
+
+NVBENCH_BENCH_TYPES(bench_groupby_histogram,
+                    NVBENCH_TYPE_AXES(nvbench::type_list<int32_t, int64_t, float, double>))
+  .set_name("groupby_histogram")
+  .add_float64_axis("null_probability", {0, 0.1, 0.9})
+  .add_int64_axis("cardinality", {100, 1'000, 10'000, 100'000, 1'000'000, 10'000'000})
+  .add_int64_axis("num_rows", {100, 1'000, 10'000, 100'000, 1'000'000, 10'000'000});
diff --git a/cpp/benchmarks/reduction/histogram.cpp b/cpp/benchmarks/reduction/histogram.cpp
new file mode 100644
index 00000000000..d0925de5c87
--- /dev/null
+++ b/cpp/benchmarks/reduction/histogram.cpp
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "cudf/aggregation.hpp"
+#include "cudf/detail/aggregation/aggregation.hpp"
+
+#include <benchmarks/common/generate_input.hpp>
+#include <benchmarks/common/nvbench_utilities.hpp>
+#include <benchmarks/common/table_utilities.hpp>
+
+#include <cudf/column/column_view.hpp>
+#include <cudf/detail/aggregation/aggregation.hpp>
+#include <cudf/reduction.hpp>
+#include <cudf/reduction/detail/histogram.hpp>
+#include <cudf/types.hpp>
+
+#include <nvbench/nvbench.cuh>
+
+template <typename type>
+static void nvbench_reduction_histogram(nvbench::state& state, nvbench::type_list<type>)
+{
+  auto const dtype = cudf::type_to_id<type>();
+
+  auto const cardinality      = static_cast<cudf::size_type>(state.get_int64("cardinality"));
+  auto const num_rows         = static_cast<cudf::size_type>(state.get_int64("num_rows"));
+  auto const null_probability = state.get_float64("null_probability");
+
+  if (cardinality > num_rows) {
+    state.skip("cardinality > num_rows");
+    return;
+  }
+
+  data_profile const profile = data_profile_builder()
+                                 .null_probability(null_probability)
+                                 .cardinality(cardinality)
+                                 .distribution(dtype, distribution_id::UNIFORM, 0, num_rows);
+
+  auto const input = create_random_column(dtype, row_count{num_rows}, profile);
+  auto agg         = cudf::make_histogram_aggregation<cudf::reduce_aggregation>();
+  state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
+    rmm::cuda_stream_view stream_view{launch.get_stream()};
+    auto result = cudf::reduce(*input, *agg, input->type(), stream_view);
+  });
+
+  state.add_element_count(input->size());
+}
+
+using data_type = nvbench::type_list<int32_t, int64_t>;
+
+NVBENCH_BENCH_TYPES(nvbench_reduction_histogram, NVBENCH_TYPE_AXES(data_type))
+  .set_name("histogram")
+  .add_float64_axis("null_probability", {0.1})
+  .add_int64_axis("cardinality",
+                  {0, 100, 1'000, 10'000, 100'000, 1'000'000, 10'000'000, 50'000'000})
+  .add_int64_axis("num_rows", {10'000, 100'000, 1'000'000, 10'000'000, 100'000'000});
diff --git a/cpp/include/cudf/detail/hash_reduce_by_row.cuh b/cpp/include/cudf/detail/hash_reduce_by_row.cuh
deleted file mode 100644
index 7de79b31bc7..00000000000
--- a/cpp/include/cudf/detail/hash_reduce_by_row.cuh
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Copyright (c) 2022-2024, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <cudf/detail/cuco_helpers.hpp>
-#include <cudf/hashing/detail/helper_functions.cuh>
-#include <cudf/table/experimental/row_operators.cuh>
-#include <cudf/types.hpp>
-#include <cudf/utilities/memory_resource.hpp>
-
-#include <rmm/cuda_stream_view.hpp>
-#include <rmm/device_uvector.hpp>
-#include <rmm/exec_policy.hpp>
-
-#include <cuco/static_map.cuh>
-#include <thrust/for_each.h>
-#include <thrust/iterator/counting_iterator.h>
-#include <thrust/uninitialized_fill.h>
-
-namespace cudf::detail {
-
-using hash_map_type = cuco::legacy::
-  static_map<size_type, size_type, cuda::thread_scope_device, cudf::detail::cuco_allocator<char>>;
-
-/**
- * @brief The base struct for customized reduction functor to perform reduce-by-key with keys are
- * rows that compared equal.
- *
- * TODO: We need to switch to use `static_reduction_map` when it is ready
- * (https://github.com/NVIDIA/cuCollections/pull/98).
- */
-template <typename MapView, typename KeyHasher, typename KeyEqual, typename OutputType>
-struct reduce_by_row_fn_base {
- protected:
-  MapView const d_map;
-  KeyHasher const d_hasher;
-  KeyEqual const d_equal;
-  OutputType* const d_output;
-
-  reduce_by_row_fn_base(MapView const& d_map,
-                        KeyHasher const& d_hasher,
-                        KeyEqual const& d_equal,
-                        OutputType* const d_output)
-    : d_map{d_map}, d_hasher{d_hasher}, d_equal{d_equal}, d_output{d_output}
-  {
-  }
-
-  /**
-   * @brief Return a pointer to the output array at the given index.
-   *
-   * @param idx The access index
-   * @return A pointer to the given index in the output array
-   */
-  __device__ OutputType* get_output_ptr(size_type const idx) const
-  {
-    auto const iter = d_map.find(idx, d_hasher, d_equal);
-
-    if (iter != d_map.end()) {
-      // Only one (undetermined) index value of the duplicate rows could be inserted into the map.
-      // As such, looking up for all indices of duplicate rows always returns the same value.
-      auto const inserted_idx = iter->second.load(cuda::std::memory_order_relaxed);
-
-      // All duplicate rows will have concurrent access to this same output slot.
-      return &d_output[inserted_idx];
-    } else {
-      // All input `idx` values have been inserted into the map before.
-      // Thus, searching for an `idx` key resulting in the `end()` iterator only happens if
-      // `d_equal(idx, idx) == false`.
-      // Such situations are due to comparing nulls or NaNs which are considered as always unequal.
-      // In those cases, all rows containing nulls or NaNs are distinct. Just return their direct
-      // output slot.
-      return &d_output[idx];
-    }
-  }
-};
-
-/**
- * @brief Perform a reduction on groups of rows that are compared equal.
- *
- * This is essentially a reduce-by-key operation with keys are non-contiguous rows and are compared
- * equal. A hash table is used to find groups of equal rows.
- *
- * At the beginning of the operation, the entire output array is filled with a value given by
- * the `init` parameter. Then, the reduction result for each row group is written into the output
- * array at the index of an unspecified row in the group.
- *
- * @tparam ReduceFuncBuilder The builder class that must have a `build()` method returning a
- *         reduction functor derived from `reduce_by_row_fn_base`
- * @tparam OutputType Type of the reduction results
- * @param map The auxiliary map to perform reduction
- * @param preprocessed_input The preprocessed of the input rows for computing row hashing and row
- *        comparisons
- * @param num_rows The number of all input rows
- * @param has_nulls Indicate whether the input rows has any nulls at any nested levels
- * @param has_nested_columns Indicates whether the input table has any nested columns
- * @param nulls_equal Flag to specify whether null elements should be considered as equal
- * @param nans_equal Flag to specify whether NaN values in floating point column should be
- *        considered equal.
- * @param init The initial value for reduction of each row group
- * @param stream CUDA stream used for device memory operations and kernel launches
- * @param mr Device memory resource used to allocate the returned vector
- * @return A device_uvector containing the reduction results
- */
-template <typename ReduceFuncBuilder, typename OutputType>
-rmm::device_uvector<OutputType> hash_reduce_by_row(
-  hash_map_type const& map,
-  std::shared_ptr<cudf::experimental::row::equality::preprocessed_table> const preprocessed_input,
-  size_type num_rows,
-  cudf::nullate::DYNAMIC has_nulls,
-  bool has_nested_columns,
-  null_equality nulls_equal,
-  nan_equality nans_equal,
-  ReduceFuncBuilder func_builder,
-  OutputType init,
-  rmm::cuda_stream_view stream,
-  rmm::device_async_resource_ref mr)
-{
-  auto const map_dview  = map.get_device_view();
-  auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input);
-  auto const key_hasher = row_hasher.device_hasher(has_nulls);
-  auto const row_comp   = cudf::experimental::row::equality::self_comparator(preprocessed_input);
-
-  auto reduction_results = rmm::device_uvector<OutputType>(num_rows, stream, mr);
-  thrust::uninitialized_fill(
-    rmm::exec_policy(stream), reduction_results.begin(), reduction_results.end(), init);
-
-  auto const reduce_by_row = [&](auto const value_comp) {
-    if (has_nested_columns) {
-      auto const key_equal = row_comp.equal_to<true>(has_nulls, nulls_equal, value_comp);
-      thrust::for_each(
-        rmm::exec_policy(stream),
-        thrust::make_counting_iterator(0),
-        thrust::make_counting_iterator(num_rows),
-        func_builder.build(map_dview, key_hasher, key_equal, reduction_results.begin()));
-    } else {
-      auto const key_equal = row_comp.equal_to<false>(has_nulls, nulls_equal, value_comp);
-      thrust::for_each(
-        rmm::exec_policy(stream),
-        thrust::make_counting_iterator(0),
-        thrust::make_counting_iterator(num_rows),
-        func_builder.build(map_dview, key_hasher, key_equal, reduction_results.begin()));
-    }
-  };
-
-  if (nans_equal == nan_equality::ALL_EQUAL) {
-    using nan_equal_comparator =
-      cudf::experimental::row::equality::nan_equal_physical_equality_comparator;
-    reduce_by_row(nan_equal_comparator{});
-  } else {
-    using nan_unequal_comparator = cudf::experimental::row::equality::physical_equality_comparator;
-    reduce_by_row(nan_unequal_comparator{});
-  }
-
-  return reduction_results;
-}
-
-}  // namespace cudf::detail
diff --git a/cpp/src/reductions/histogram.cu b/cpp/src/reductions/histogram.cu
index 362b5f74c46..b40b2b6dd2e 100644
--- a/cpp/src/reductions/histogram.cu
+++ b/cpp/src/reductions/histogram.cu
@@ -15,18 +15,24 @@
  */
 
 #include <cudf/column/column_factories.hpp>
+#include <cudf/detail/cuco_helpers.hpp>
 #include <cudf/detail/gather.hpp>
-#include <cudf/detail/hash_reduce_by_row.cuh>
 #include <cudf/detail/iterator.cuh>
 #include <cudf/scalar/scalar.hpp>
 #include <cudf/structs/structs_column_view.hpp>
+#include <cudf/table/experimental/row_operators.cuh>
 #include <cudf/utilities/memory_resource.hpp>
 
+#include <rmm/exec_policy.hpp>
+
+#include <cuco/operator.hpp>
+#include <cuco/static_set.cuh>
 #include <cuda/atomic>
 #include <cuda/functional>
 #include <thrust/copy.h>
 #include <thrust/iterator/zip_iterator.h>
 #include <thrust/tuple.h>
+#include <thrust/uninitialized_fill.h>
 
 #include <optional>
 
@@ -34,61 +40,12 @@ namespace cudf::reduction::detail {
 
 namespace {
 
+// A CUDA Cooperative Group of 1 thread for the hash set for histogram
+auto constexpr DEFAULT_HISTOGRAM_CG_SIZE = 1;
+
 // Always use 64-bit signed integer for storing count.
 using histogram_count_type = int64_t;
 
-/**
- * @brief The functor to accumulate the frequency of each distinct rows in the input table.
- */
-template <typename MapView, typename KeyHasher, typename KeyEqual, typename CountType>
-struct reduce_fn : cudf::detail::reduce_by_row_fn_base<MapView, KeyHasher, KeyEqual, CountType> {
-  CountType const* d_partial_output;
-
-  reduce_fn(MapView const& d_map,
-            KeyHasher const& d_hasher,
-            KeyEqual const& d_equal,
-            CountType* const d_output,
-            CountType const* const d_partial_output)
-    : cudf::detail::reduce_by_row_fn_base<MapView, KeyHasher, KeyEqual, CountType>{d_map,
-                                                                                   d_hasher,
-                                                                                   d_equal,
-                                                                                   d_output},
-      d_partial_output{d_partial_output}
-  {
-  }
-
-  // Count the number of rows in each group of rows that are compared equal.
-  __device__ void operator()(size_type const idx) const
-  {
-    auto const increment = d_partial_output ? d_partial_output[idx] : CountType{1};
-    auto const count =
-      cuda::atomic_ref<CountType, cuda::thread_scope_device>(*this->get_output_ptr(idx));
-    count.fetch_add(increment, cuda::std::memory_order_relaxed);
-  }
-};
-
-/**
- * @brief The builder to construct an instance of `reduce_fn` functor.
- */
-template <typename CountType>
-struct reduce_func_builder {
-  CountType const* const d_partial_output;
-
-  reduce_func_builder(CountType const* const d_partial_output) : d_partial_output{d_partial_output}
-  {
-  }
-
-  template <typename MapView, typename KeyHasher, typename KeyEqual>
-  auto build(MapView const& d_map,
-             KeyHasher const& d_hasher,
-             KeyEqual const& d_equal,
-             CountType* const d_output)
-  {
-    return reduce_fn<MapView, KeyHasher, KeyEqual, CountType>{
-      d_map, d_hasher, d_equal, d_output, d_partial_output};
-  }
-};
-
 /**
  * @brief Specialized functor to check for not-zero of the second component of the input.
  */
@@ -163,14 +120,6 @@ compute_row_frequencies(table_view const& input,
                "Nested types are not yet supported in histogram aggregation.",
                std::invalid_argument);
 
-  auto map = cudf::detail::hash_map_type{
-    compute_hash_table_size(input.num_rows()),
-    cuco::empty_key{-1},
-    cuco::empty_value{std::numeric_limits<size_type>::min()},
-
-    cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream},
-    stream.value()};
-
   auto const preprocessed_input =
     cudf::experimental::row::hash::preprocessed_table::create(input, stream);
   auto const has_nulls = nullate::DYNAMIC{cudf::has_nested_nulls(input)};
@@ -179,51 +128,68 @@ compute_row_frequencies(table_view const& input,
   auto const key_hasher = row_hasher.device_hasher(has_nulls);
   auto const row_comp   = cudf::experimental::row::equality::self_comparator(preprocessed_input);
 
-  auto const pair_iter = cudf::detail::make_counting_transform_iterator(
-    size_type{0},
-    cuda::proclaim_return_type<cuco::pair<size_type, size_type>>(
-      [] __device__(size_type const i) { return cuco::make_pair(i, i); }));
-
   // Always compare NaNs as equal.
   using nan_equal_comparator =
     cudf::experimental::row::equality::nan_equal_physical_equality_comparator;
   auto const value_comp = nan_equal_comparator{};
+  // Hard set the tparam `has_nested_columns` = false for now as we don't yet support nested columns
+  auto const key_equal = row_comp.equal_to<false>(has_nulls, null_equality::EQUAL, value_comp);
+
+  using row_hash =
+    cudf::experimental::row::hash::device_row_hasher<cudf::hashing::detail::default_hash,
+                                                     cudf::nullate::DYNAMIC>;
+
+  size_t const num_rows = input.num_rows();
+
+  // Construct a vector to store reduced counts and init to zero
+  rmm::device_uvector<histogram_count_type> reduction_results(num_rows, stream, mr);
+  thrust::uninitialized_fill(rmm::exec_policy_nosync(stream),
+                             reduction_results.begin(),
+                             reduction_results.end(),
+                             histogram_count_type{0});
+
+  // Construct a hash set
+  auto row_set = cuco::static_set{
+    cuco::extent{num_rows},
+    cudf::detail::CUCO_DESIRED_LOAD_FACTOR,
+    cuco::empty_key<size_type>{-1},
+    key_equal,
+    cuco::linear_probing<DEFAULT_HISTOGRAM_CG_SIZE, row_hash>{key_hasher},
+    {},  // thread scope
+    {},  // storage
+    cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream},
+    stream.value()};
 
-  if (has_nested_columns) {
-    auto const key_equal = row_comp.equal_to<true>(has_nulls, null_equality::EQUAL, value_comp);
-    map.insert(pair_iter, pair_iter + input.num_rows(), key_hasher, key_equal, stream.value());
-  } else {
-    auto const key_equal = row_comp.equal_to<false>(has_nulls, null_equality::EQUAL, value_comp);
-    map.insert(pair_iter, pair_iter + input.num_rows(), key_hasher, key_equal, stream.value());
-  }
-
-  // Gather the indices of distinct rows.
-  auto distinct_indices = std::make_unique<rmm::device_uvector<size_type>>(
-    static_cast<size_type>(map.get_size()), stream, mr);
-
-  // Store the number of occurrences of each distinct row.
-  auto distinct_counts = make_numeric_column(data_type{type_to_id<histogram_count_type>()},
-                                             static_cast<size_type>(map.get_size()),
-                                             mask_state::UNALLOCATED,
-                                             stream,
-                                             mr);
+  // Device-accessible reference to the hash set with `insert_and_find` operator
+  auto row_set_ref = row_set.ref(cuco::op::insert_and_find);
 
   // Compute frequencies (aka distinct counts) for the input rows.
   // Note that we consider null and NaNs as always equal.
-  auto const reduction_results = cudf::detail::hash_reduce_by_row(
-    map,
-    preprocessed_input,
-    input.num_rows(),
-    has_nulls,
-    has_nested_columns,
-    null_equality::EQUAL,
-    nan_equality::ALL_EQUAL,
-    reduce_func_builder<histogram_count_type>{
-      partial_counts ? partial_counts.value().begin<histogram_count_type>() : nullptr},
-    histogram_count_type{0},
-    stream,
-    cudf::get_current_device_resource_ref());
-
+  thrust::for_each(
+    rmm::exec_policy_nosync(stream),
+    thrust::make_counting_iterator<size_t>(0),
+    thrust::make_counting_iterator<size_t>(num_rows),
+    [set_ref = row_set_ref,
+     increments =
+       partial_counts.has_value() ? partial_counts.value().begin<histogram_count_type>() : nullptr,
+     counts = reduction_results.begin()] __device__(auto const idx) mutable {
+      auto const [inserted_idx_ptr, _] = set_ref.insert_and_find(idx);
+      cuda::atomic_ref<histogram_count_type, cuda::thread_scope_device> count_ref{
+        counts[*inserted_idx_ptr]};
+      auto const increment = increments ? increments[idx] : histogram_count_type{1};
+      count_ref.fetch_add(increment, cuda::std::memory_order_relaxed);
+    });
+
+  // Set-size is the number of distinct (inserted) rows
+  auto const set_size = row_set.size(stream);
+
+  // Vector of distinct indices
+  auto distinct_indices = std::make_unique<rmm::device_uvector<size_type>>(set_size, stream, mr);
+  // Column of distinct counts
+  auto distinct_counts = make_numeric_column(
+    data_type{type_to_id<histogram_count_type>()}, set_size, mask_state::UNALLOCATED, stream, mr);
+
+  // Copy row indices and counts to the output if counts are non-zero
   auto const input_it = thrust::make_zip_iterator(
     thrust::make_tuple(thrust::make_counting_iterator(0), reduction_results.begin()));
   auto const output_it = thrust::make_zip_iterator(thrust::make_tuple(
@@ -232,7 +198,7 @@ compute_row_frequencies(table_view const& input,
   // Reduction results above are either group sizes of equal rows, or `0`.
   // The final output is non-zero group sizes only.
   thrust::copy_if(
-    rmm::exec_policy(stream), input_it, input_it + input.num_rows(), output_it, is_not_zero{});
+    rmm::exec_policy_nosync(stream), input_it, input_it + num_rows, output_it, is_not_zero{});
 
   return {std::move(distinct_indices), std::move(distinct_counts)};
 }