From 990eaa3678dc5576f83b6900f1e86a086bf6c9c4 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Wed, 9 Oct 2024 12:44:05 -0700 Subject: [PATCH 1/9] Add shared memory aggregator --- .../groupby/hash/shared_memory_aggregator.cuh | 251 ++++++++++++++++++ 1 file changed, 251 insertions(+) create mode 100644 cpp/src/groupby/hash/shared_memory_aggregator.cuh diff --git a/cpp/src/groupby/hash/shared_memory_aggregator.cuh b/cpp/src/groupby/hash/shared_memory_aggregator.cuh new file mode 100644 index 00000000000..14be2cb6183 --- /dev/null +++ b/cpp/src/groupby/hash/shared_memory_aggregator.cuh @@ -0,0 +1,251 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +namespace cudf::groupby::detail::hash { +template +struct update_target_element_shmem { + __device__ void operator()(std::byte* target, + cudf::size_type target_index, + bool* target_null, + cudf::column_device_view source, + cudf::size_type source_index) const + { + CUDF_UNREACHABLE("Invalid source type and aggregation combination."); + } +}; + +template +struct update_target_element_shmem< + Source, + cudf::aggregation::MIN, + cuda::std::enable_if_t() && cudf::has_atomic_support()>> { + __device__ void operator()(std::byte* target, + cudf::size_type target_index, + bool* target_null, + cudf::column_device_view source, + cudf::size_type source_index) const noexcept + { + using DeviceTarget = cudf::detail::underlying_target_t; + using DeviceSource = cudf::detail::underlying_source_t; + + DeviceTarget* target_casted = reinterpret_cast(target); + cudf::detail::atomic_min(&target_casted[target_index], + static_cast(source.element(source_index))); + if (target_null[target_index]) { target_null[target_index] = false; } + } +}; + +template +struct update_target_element_shmem< + Source, + cudf::aggregation::MAX, + cuda::std::enable_if_t() && cudf::has_atomic_support()>> { + __device__ void operator()(std::byte* target, + cudf::size_type target_index, + bool* target_null, + cudf::column_device_view source, + cudf::size_type source_index) const noexcept + { + using DeviceTarget = cudf::detail::underlying_target_t; + using DeviceSource = cudf::detail::underlying_source_t; + + DeviceTarget* target_casted = reinterpret_cast(target); + cudf::detail::atomic_max(&target_casted[target_index], + static_cast(source.element(source_index))); + + if (target_null[target_index]) { target_null[target_index] = false; } + } +}; + +template +struct update_target_element_shmem< + Source, + cudf::aggregation::SUM, + cuda::std::enable_if_t() && cudf::has_atomic_support() && + !cudf::is_timestamp()>> { + __device__ void operator()(std::byte* target, + cudf::size_type target_index, + bool* target_null, + cudf::column_device_view source, + cudf::size_type source_index) const noexcept + { + using DeviceTarget = cudf::detail::underlying_target_t; + using DeviceSource = cudf::detail::underlying_source_t; + + DeviceTarget* target_casted = reinterpret_cast(target); + cudf::detail::atomic_add(&target_casted[target_index], + static_cast(source.element(source_index))); + + if (target_null[target_index]) { target_null[target_index] = false; } + } +}; + +template +struct update_target_element_shmem< + Source, + cudf::aggregation::SUM_OF_SQUARES, + cuda::std::enable_if_t()>> { + __device__ void operator()(std::byte* target, + cudf::size_type target_index, + bool* target_null, + cudf::column_device_view source, + cudf::size_type source_index) const noexcept + { + using Target = cudf::detail::target_type_t; + Target* target_casted = reinterpret_cast(target); + auto value = static_cast(source.element(source_index)); + cudf::detail::atomic_add(&target_casted[target_index], value * value); + + if (target_null[target_index]) { target_null[target_index] = false; } + } +}; + +template +struct update_target_element_shmem< + Source, + cudf::aggregation::PRODUCT, + cuda::std::enable_if_t()>> { + __device__ void operator()(std::byte* target, + cudf::size_type target_index, + bool* target_null, + cudf::column_device_view source, + cudf::size_type source_index) const noexcept + { + using Target = cudf::detail::target_type_t; + Target* target_casted = reinterpret_cast(target); + cudf::detail::atomic_mul(&target_casted[target_index], + static_cast(source.element(source_index))); + + if (target_null[target_index]) { target_null[target_index] = false; } + } +}; + +template +struct update_target_element_shmem< + Source, + cudf::aggregation::COUNT_VALID, + cuda::std::enable_if_t< + cudf::detail::is_valid_aggregation()>> { + __device__ void operator()(std::byte* target, + cudf::size_type target_index, + bool* target_null, + cudf::column_device_view source, + cudf::size_type source_index) const noexcept + { + using Target = cudf::detail::target_type_t; + Target* target_casted = reinterpret_cast(target); + cudf::detail::atomic_add(&target_casted[target_index], Target{1}); + } +}; + +template +struct update_target_element_shmem< + Source, + cudf::aggregation::COUNT_ALL, + cuda::std::enable_if_t< + cudf::detail::is_valid_aggregation()>> { + __device__ void operator()(std::byte* target, + cudf::size_type target_index, + bool* target_null, + cudf::column_device_view source, + cudf::size_type source_index) const noexcept + { + using Target = cudf::detail::target_type_t; + Target* target_casted = reinterpret_cast(target); + cudf::detail::atomic_add(&target_casted[target_index], Target{1}); + + // Assumes target is already set to be valid + } +}; + +template +struct update_target_element_shmem< + Source, + cudf::aggregation::ARGMAX, + cuda::std::enable_if_t() and + cudf::is_relationally_comparable()>> { + __device__ void operator()(std::byte* target, + cudf::size_type target_index, + bool* target_null, + cudf::column_device_view source, + cudf::size_type source_index) const noexcept + { + using Target = cudf::detail::target_type_t; + Target* target_casted = reinterpret_cast(target); + auto old = cudf::detail::atomic_cas( + &target_casted[target_index], cudf::detail::ARGMAX_SENTINEL, source_index); + if (old != cudf::detail::ARGMAX_SENTINEL) { + while (source.element(source_index) > source.element(old)) { + old = cudf::detail::atomic_cas(&target_casted[target_index], old, source_index); + } + } + + if (target_null[target_index]) { target_null[target_index] = false; } + } +}; + +template +struct update_target_element_shmem< + Source, + cudf::aggregation::ARGMIN, + cuda::std::enable_if_t() and + cudf::is_relationally_comparable()>> { + __device__ void operator()(std::byte* target, + cudf::size_type target_index, + bool* target_null, + cudf::column_device_view source, + cudf::size_type source_index) const noexcept + { + using Target = cudf::detail::target_type_t; + Target* target_casted = reinterpret_cast(target); + auto old = cudf::detail::atomic_cas( + &target_casted[target_index], cudf::detail::ARGMIN_SENTINEL, source_index); + if (old != cudf::detail::ARGMIN_SENTINEL) { + while (source.element(source_index) < source.element(old)) { + old = cudf::detail::atomic_cas(&target_casted[target_index], old, source_index); + } + } + + if (target_null[target_index]) { target_null[target_index] = false; } + } +}; + +struct shmem_element_aggregator { + template + __device__ void operator()(std::byte* target, + cudf::size_type target_index, + bool* target_null, + cudf::column_device_view source, + cudf::size_type source_index) const noexcept + { + if constexpr (k != cudf::aggregation::COUNT_ALL) { + if (source.is_null(source_index)) { return; } + } + update_target_element_shmem{}( + target, target_index, target_null, source, source_index); + } +}; +} // namespace cudf::groupby::detail::hash From ae9589ef24b7c5d5e013efeffe9494ecd92f3720 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Wed, 9 Oct 2024 12:44:20 -0700 Subject: [PATCH 2/9] Add global memory aggregator --- .../groupby/hash/global_memory_aggregator.cuh | 275 ++++++++++++++++++ 1 file changed, 275 insertions(+) create mode 100644 cpp/src/groupby/hash/global_memory_aggregator.cuh diff --git a/cpp/src/groupby/hash/global_memory_aggregator.cuh b/cpp/src/groupby/hash/global_memory_aggregator.cuh new file mode 100644 index 00000000000..ac0e5d7d8c2 --- /dev/null +++ b/cpp/src/groupby/hash/global_memory_aggregator.cuh @@ -0,0 +1,275 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +namespace cudf::groupby::detail::hash { +template +struct update_target_element_gmem { + __device__ void operator()(cudf::mutable_column_device_view target, + cudf::size_type target_index, + cudf::column_device_view source_column, + std::byte* source, + cudf::size_type source_index, + bool* source_null) const + { + CUDF_UNREACHABLE("Invalid source type and aggregation combination."); + } +}; + +template +struct update_target_element_gmem< + Source, + cudf::aggregation::MIN, + cuda::std::enable_if_t() && cudf::has_atomic_support()>> { + __device__ void operator()(cudf::mutable_column_device_view target, + cudf::size_type target_index, + cudf::column_device_view source_column, + std::byte* source, + cudf::size_type source_index, + bool* source_null) const noexcept + { + using DeviceType = cudf::detail::underlying_target_t; + DeviceType* source_casted = reinterpret_cast(source); + cudf::detail::atomic_min(&target.element(target_index), + static_cast(source_casted[source_index])); + + if (target.is_null(target_index)) { target.set_valid(target_index); } + } +}; + +template +struct update_target_element_gmem< + Source, + cudf::aggregation::MAX, + cuda::std::enable_if_t() && cudf::has_atomic_support()>> { + __device__ void operator()(cudf::mutable_column_device_view target, + cudf::size_type target_index, + cudf::column_device_view source_column, + std::byte* source, + cudf::size_type source_index, + bool* source_null) const noexcept + { + using DeviceType = cudf::detail::underlying_target_t; + DeviceType* source_casted = reinterpret_cast(source); + cudf::detail::atomic_max(&target.element(target_index), + static_cast(source_casted[source_index])); + + if (target.is_null(target_index)) { target.set_valid(target_index); } + } +}; + +template +struct update_target_element_gmem< + Source, + cudf::aggregation::SUM, + cuda::std::enable_if_t() && cudf::has_atomic_support() && + !cudf::is_timestamp()>> { + __device__ void operator()(cudf::mutable_column_device_view target, + cudf::size_type target_index, + cudf::column_device_view source_column, + std::byte* source, + cudf::size_type source_index, + bool* source_null) const noexcept + { + using DeviceType = cudf::detail::underlying_target_t; + DeviceType* source_casted = reinterpret_cast(source); + cudf::detail::atomic_add(&target.element(target_index), + static_cast(source_casted[source_index])); + + if (target.is_null(target_index)) { target.set_valid(target_index); } + } +}; + +// The shared memory will already have it squared +template +struct update_target_element_gmem< + Source, + cudf::aggregation::SUM_OF_SQUARES, + cuda::std::enable_if_t()>> { + __device__ void operator()(cudf::mutable_column_device_view target, + cudf::size_type target_index, + cudf::column_device_view source_column, + std::byte* source, + cudf::size_type source_index, + bool* source_null) const noexcept + { + using Target = cudf::detail::target_type_t; + + Target* source_casted = reinterpret_cast(source); + Target value = static_cast(source_casted[source_index]); + + cudf::detail::atomic_add(&target.element(target_index), value); + + if (target.is_null(target_index)) { target.set_valid(target_index); } + } +}; + +template +struct update_target_element_gmem< + Source, + cudf::aggregation::PRODUCT, + cuda::std::enable_if_t()>> { + __device__ void operator()(cudf::mutable_column_device_view target, + cudf::size_type target_index, + cudf::column_device_view source_column, + std::byte* source, + cudf::size_type source_index, + bool* source_null) const noexcept + { + using Target = cudf::detail::target_type_t; + + Target* source_casted = reinterpret_cast(source); + cudf::detail::atomic_mul(&target.element(target_index), + static_cast(source_casted[source_index])); + + if (target.is_null(target_index)) { target.set_valid(target_index); } + } +}; + +// Assuming that the target column of COUNT_VALID, COUNT_ALL would be using fixed_width column and +// non-fixed point column +template +struct update_target_element_gmem< + Source, + cudf::aggregation::COUNT_VALID, + cuda::std::enable_if_t< + cudf::detail::is_valid_aggregation()>> { + __device__ void operator()(cudf::mutable_column_device_view target, + cudf::size_type target_index, + cudf::column_device_view source_column, + std::byte* source, + cudf::size_type source_index, + bool* source_null) const noexcept + { + using Target = cudf::detail::target_type_t; + + Target* source_casted = reinterpret_cast(source); + cudf::detail::atomic_add(&target.element(target_index), + static_cast(source_casted[source_index])); + + // It is assumed the output for COUNT_VALID is initialized to be all valid + } +}; + +// TODO: VALID and ALL have same code +template +struct update_target_element_gmem< + Source, + cudf::aggregation::COUNT_ALL, + cuda::std::enable_if_t< + cudf::detail::is_valid_aggregation()>> { + __device__ void operator()(cudf::mutable_column_device_view target, + cudf::size_type target_index, + cudf::column_device_view source_column, + std::byte* source, + cudf::size_type source_index, + bool* source_null) const noexcept + { + using Target = cudf::detail::target_type_t; + + Target* source_casted = reinterpret_cast(source); + cudf::detail::atomic_add(&target.element(target_index), + static_cast(source_casted[source_index])); + + // It is assumed the output for COUNT_ALL is initialized to be all valid + } +}; + +template +struct update_target_element_gmem< + Source, + cudf::aggregation::ARGMAX, + cuda::std::enable_if_t() and + cudf::is_relationally_comparable()>> { + __device__ void operator()(cudf::mutable_column_device_view target, + cudf::size_type target_index, + cudf::column_device_view source_column, + std::byte* source, + cudf::size_type source_index, + bool* source_null) const noexcept + { + using Target = cudf::detail::target_type_t; + Target* source_casted = reinterpret_cast(source); + auto source_argmax_index = source_casted[source_index]; + auto old = cudf::detail::atomic_cas( + &target.element(target_index), cudf::detail::ARGMAX_SENTINEL, source_argmax_index); + if (old != cudf::detail::ARGMAX_SENTINEL) { + while (source_column.element(source_argmax_index) > + source_column.element(old)) { + old = + cudf::detail::atomic_cas(&target.element(target_index), old, source_argmax_index); + } + } + + if (target.is_null(target_index)) { target.set_valid(target_index); } + } +}; +template +struct update_target_element_gmem< + Source, + cudf::aggregation::ARGMIN, + cuda::std::enable_if_t() and + cudf::is_relationally_comparable()>> { + __device__ void operator()(cudf::mutable_column_device_view target, + cudf::size_type target_index, + cudf::column_device_view source_column, + std::byte* source, + cudf::size_type source_index, + bool* source_null) const noexcept + { + using Target = cudf::detail::target_type_t; + Target* source_casted = reinterpret_cast(source); + auto source_argmin_index = source_casted[source_index]; + auto old = cudf::detail::atomic_cas( + &target.element(target_index), cudf::detail::ARGMIN_SENTINEL, source_argmin_index); + if (old != cudf::detail::ARGMIN_SENTINEL) { + while (source_column.element(source_argmin_index) < + source_column.element(old)) { + old = + cudf::detail::atomic_cas(&target.element(target_index), old, source_argmin_index); + } + } + + if (target.is_null(target_index)) { target.set_valid(target_index); } + } +}; + +struct gmem_element_aggregator { + template + __device__ void operator()(cudf::mutable_column_device_view target, + cudf::size_type target_index, + cudf::column_device_view source_column, + std::byte* source, + cudf::size_type source_index, + bool* source_null) const noexcept + { + if constexpr (k != cudf::aggregation::COUNT_ALL) { + if (source_null[source_index]) { return; } + } + update_target_element_gmem{}( + target, target_index, source_column, source, source_index, source_null); + } +}; +} // namespace cudf::groupby::detail::hash From 4fd00bf83808ff111e4edaefe9a4b1455016c55d Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Wed, 9 Oct 2024 12:48:05 -0700 Subject: [PATCH 3/9] Clean up early exit logic for nulls --- .../detail/aggregation/device_aggregators.cuh | 28 ++----------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/cpp/include/cudf/detail/aggregation/device_aggregators.cuh b/cpp/include/cudf/detail/aggregation/device_aggregators.cuh index 10be5e1d36f..4b6e5b99124 100644 --- a/cpp/include/cudf/detail/aggregation/device_aggregators.cuh +++ b/cpp/include/cudf/detail/aggregation/device_aggregators.cuh @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #pragma once #include @@ -51,8 +50,6 @@ struct update_target_element< column_device_view source, size_type source_index) const noexcept { - if (source.is_null(source_index)) { return; } - using Target = target_type_t; cudf::detail::atomic_min(&target.element(target_index), static_cast(source.element(source_index))); @@ -72,8 +69,6 @@ struct update_target_element< column_device_view source, size_type source_index) const noexcept { - if (source.is_null(source_index)) { return; } - using Target = target_type_t; using DeviceTarget = device_storage_type_t; using DeviceSource = device_storage_type_t; @@ -96,8 +91,6 @@ struct update_target_element< column_device_view source, size_type source_index) const noexcept { - if (source.is_null(source_index)) { return; } - using Target = target_type_t; cudf::detail::atomic_max(&target.element(target_index), static_cast(source.element(source_index))); @@ -117,8 +110,6 @@ struct update_target_element< column_device_view source, size_type source_index) const noexcept { - if (source.is_null(source_index)) { return; } - using Target = target_type_t; using DeviceTarget = device_storage_type_t; using DeviceSource = device_storage_type_t; @@ -141,8 +132,6 @@ struct update_target_element< column_device_view source, size_type source_index) const noexcept { - if (source.is_null(source_index)) { return; } - using Target = target_type_t; cudf::detail::atomic_add(&target.element(target_index), static_cast(source.element(source_index))); @@ -162,8 +151,6 @@ struct update_target_element< column_device_view source, size_type source_index) const noexcept { - if (source.is_null(source_index)) { return; } - using Target = target_type_t; using DeviceTarget = device_storage_type_t; using DeviceSource = device_storage_type_t; @@ -227,8 +214,6 @@ struct update_target_element< column_device_view source, size_type source_index) const noexcept { - if (source.is_null(source_index)) { return; } - dispatch_type_and_aggregation( source.child(cudf::dictionary_column_view::keys_column_index).type(), k, @@ -249,8 +234,6 @@ struct update_target_element; auto value = static_cast(source.element(source_index)); cudf::detail::atomic_add(&target.element(target_index), value * value); @@ -267,8 +250,6 @@ struct update_target_element; cudf::detail::atomic_mul(&target.element(target_index), static_cast(source.element(source_index))); @@ -286,8 +267,6 @@ struct update_target_element< column_device_view source, size_type source_index) const noexcept { - if (source.is_null(source_index)) { return; } - using Target = target_type_t; cudf::detail::atomic_add(&target.element(target_index), Target{1}); @@ -323,8 +302,6 @@ struct update_target_element< column_device_view source, size_type source_index) const noexcept { - if (source.is_null(source_index)) { return; } - using Target = target_type_t; auto old = cudf::detail::atomic_cas( &target.element(target_index), ARGMAX_SENTINEL, source_index); @@ -349,8 +326,6 @@ struct update_target_element< column_device_view source, size_type source_index) const noexcept { - if (source.is_null(source_index)) { return; } - using Target = target_type_t; auto old = cudf::detail::atomic_cas( &target.element(target_index), ARGMIN_SENTINEL, source_index); @@ -376,6 +351,9 @@ struct elementwise_aggregator { column_device_view source, size_type source_index) const noexcept { + if constexpr (k != cudf::aggregation::COUNT_ALL) { + if (source.is_null(source_index)) { return; } + } update_target_element{}(target, target_index, source, source_index); } }; From b21bc12b8d832fc91311a21b48c72cc6fd68bf54 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Wed, 9 Oct 2024 13:11:34 -0700 Subject: [PATCH 4/9] Add comments + cuda::std::byte for device functors --- .../groupby/hash/global_memory_aggregator.cuh | 38 ++++++++++++------- .../groupby/hash/shared_memory_aggregator.cuh | 36 ++++++++++++------ 2 files changed, 49 insertions(+), 25 deletions(-) diff --git a/cpp/src/groupby/hash/global_memory_aggregator.cuh b/cpp/src/groupby/hash/global_memory_aggregator.cuh index ac0e5d7d8c2..5720eade398 100644 --- a/cpp/src/groupby/hash/global_memory_aggregator.cuh +++ b/cpp/src/groupby/hash/global_memory_aggregator.cuh @@ -19,9 +19,9 @@ #include #include #include -#include #include +#include #include namespace cudf::groupby::detail::hash { @@ -30,7 +30,7 @@ struct update_target_element_gmem { __device__ void operator()(cudf::mutable_column_device_view target, cudf::size_type target_index, cudf::column_device_view source_column, - std::byte* source, + cuda::std::byte* source, cudf::size_type source_index, bool* source_null) const { @@ -46,7 +46,7 @@ struct update_target_element_gmem< __device__ void operator()(cudf::mutable_column_device_view target, cudf::size_type target_index, cudf::column_device_view source_column, - std::byte* source, + cuda::std::byte* source, cudf::size_type source_index, bool* source_null) const noexcept { @@ -67,7 +67,7 @@ struct update_target_element_gmem< __device__ void operator()(cudf::mutable_column_device_view target, cudf::size_type target_index, cudf::column_device_view source_column, - std::byte* source, + cuda::std::byte* source, cudf::size_type source_index, bool* source_null) const noexcept { @@ -89,7 +89,7 @@ struct update_target_element_gmem< __device__ void operator()(cudf::mutable_column_device_view target, cudf::size_type target_index, cudf::column_device_view source_column, - std::byte* source, + cuda::std::byte* source, cudf::size_type source_index, bool* source_null) const noexcept { @@ -111,7 +111,7 @@ struct update_target_element_gmem< __device__ void operator()(cudf::mutable_column_device_view target, cudf::size_type target_index, cudf::column_device_view source_column, - std::byte* source, + cuda::std::byte* source, cudf::size_type source_index, bool* source_null) const noexcept { @@ -134,7 +134,7 @@ struct update_target_element_gmem< __device__ void operator()(cudf::mutable_column_device_view target, cudf::size_type target_index, cudf::column_device_view source_column, - std::byte* source, + cuda::std::byte* source, cudf::size_type source_index, bool* source_null) const noexcept { @@ -159,7 +159,7 @@ struct update_target_element_gmem< __device__ void operator()(cudf::mutable_column_device_view target, cudf::size_type target_index, cudf::column_device_view source_column, - std::byte* source, + cuda::std::byte* source, cudf::size_type source_index, bool* source_null) const noexcept { @@ -173,7 +173,6 @@ struct update_target_element_gmem< } }; -// TODO: VALID and ALL have same code template struct update_target_element_gmem< Source, @@ -183,7 +182,7 @@ struct update_target_element_gmem< __device__ void operator()(cudf::mutable_column_device_view target, cudf::size_type target_index, cudf::column_device_view source_column, - std::byte* source, + cuda::std::byte* source, cudf::size_type source_index, bool* source_null) const noexcept { @@ -206,7 +205,7 @@ struct update_target_element_gmem< __device__ void operator()(cudf::mutable_column_device_view target, cudf::size_type target_index, cudf::column_device_view source_column, - std::byte* source, + cuda::std::byte* source, cudf::size_type source_index, bool* source_null) const noexcept { @@ -226,6 +225,7 @@ struct update_target_element_gmem< if (target.is_null(target_index)) { target.set_valid(target_index); } } }; + template struct update_target_element_gmem< Source, @@ -235,7 +235,7 @@ struct update_target_element_gmem< __device__ void operator()(cudf::mutable_column_device_view target, cudf::size_type target_index, cudf::column_device_view source_column, - std::byte* source, + cuda::std::byte* source, cudf::size_type source_index, bool* source_null) const noexcept { @@ -256,12 +256,24 @@ struct update_target_element_gmem< } }; +/** + * @brief A functor that updates a single element in the target column stored in global memory by + * applying an aggregation operation to a corresponding element from a source column in shared + * memory. + * + * This functor can NOT be used for dictionary columns. + * + * This is a redundant copy replicating the behavior of `elementwise_aggregator` from + * `cudf/detail/aggregation/device_aggregators.cuh`. The key difference is that this functor accepts + * a pointer to raw bytes as the source, as `column_device_view` cannot yet be constructed from + * shared memory. + */ struct gmem_element_aggregator { template __device__ void operator()(cudf::mutable_column_device_view target, cudf::size_type target_index, cudf::column_device_view source_column, - std::byte* source, + cuda::std::byte* source, cudf::size_type source_index, bool* source_null) const noexcept { diff --git a/cpp/src/groupby/hash/shared_memory_aggregator.cuh b/cpp/src/groupby/hash/shared_memory_aggregator.cuh index 14be2cb6183..88b28ebe6e6 100644 --- a/cpp/src/groupby/hash/shared_memory_aggregator.cuh +++ b/cpp/src/groupby/hash/shared_memory_aggregator.cuh @@ -19,15 +19,15 @@ #include #include #include -#include #include +#include #include namespace cudf::groupby::detail::hash { template struct update_target_element_shmem { - __device__ void operator()(std::byte* target, + __device__ void operator()(cuda::std::byte* target, cudf::size_type target_index, bool* target_null, cudf::column_device_view source, @@ -42,7 +42,7 @@ struct update_target_element_shmem< Source, cudf::aggregation::MIN, cuda::std::enable_if_t() && cudf::has_atomic_support()>> { - __device__ void operator()(std::byte* target, + __device__ void operator()(cuda::std::byte* target, cudf::size_type target_index, bool* target_null, cudf::column_device_view source, @@ -63,7 +63,7 @@ struct update_target_element_shmem< Source, cudf::aggregation::MAX, cuda::std::enable_if_t() && cudf::has_atomic_support()>> { - __device__ void operator()(std::byte* target, + __device__ void operator()(cuda::std::byte* target, cudf::size_type target_index, bool* target_null, cudf::column_device_view source, @@ -86,7 +86,7 @@ struct update_target_element_shmem< cudf::aggregation::SUM, cuda::std::enable_if_t() && cudf::has_atomic_support() && !cudf::is_timestamp()>> { - __device__ void operator()(std::byte* target, + __device__ void operator()(cuda::std::byte* target, cudf::size_type target_index, bool* target_null, cudf::column_device_view source, @@ -108,7 +108,7 @@ struct update_target_element_shmem< Source, cudf::aggregation::SUM_OF_SQUARES, cuda::std::enable_if_t()>> { - __device__ void operator()(std::byte* target, + __device__ void operator()(cuda::std::byte* target, cudf::size_type target_index, bool* target_null, cudf::column_device_view source, @@ -128,7 +128,7 @@ struct update_target_element_shmem< Source, cudf::aggregation::PRODUCT, cuda::std::enable_if_t()>> { - __device__ void operator()(std::byte* target, + __device__ void operator()(cuda::std::byte* target, cudf::size_type target_index, bool* target_null, cudf::column_device_view source, @@ -149,7 +149,7 @@ struct update_target_element_shmem< cudf::aggregation::COUNT_VALID, cuda::std::enable_if_t< cudf::detail::is_valid_aggregation()>> { - __device__ void operator()(std::byte* target, + __device__ void operator()(cuda::std::byte* target, cudf::size_type target_index, bool* target_null, cudf::column_device_view source, @@ -167,7 +167,7 @@ struct update_target_element_shmem< cudf::aggregation::COUNT_ALL, cuda::std::enable_if_t< cudf::detail::is_valid_aggregation()>> { - __device__ void operator()(std::byte* target, + __device__ void operator()(cuda::std::byte* target, cudf::size_type target_index, bool* target_null, cudf::column_device_view source, @@ -187,7 +187,7 @@ struct update_target_element_shmem< cudf::aggregation::ARGMAX, cuda::std::enable_if_t() and cudf::is_relationally_comparable()>> { - __device__ void operator()(std::byte* target, + __device__ void operator()(cuda::std::byte* target, cudf::size_type target_index, bool* target_null, cudf::column_device_view source, @@ -213,7 +213,7 @@ struct update_target_element_shmem< cudf::aggregation::ARGMIN, cuda::std::enable_if_t() and cudf::is_relationally_comparable()>> { - __device__ void operator()(std::byte* target, + __device__ void operator()(cuda::std::byte* target, cudf::size_type target_index, bool* target_null, cudf::column_device_view source, @@ -233,9 +233,21 @@ struct update_target_element_shmem< } }; +/** + * @brief A functor that updates a single element in the target column stored in shared memory by + * applying an aggregation operation to a corresponding element from a source column in global + * memory. + * + * This functor can NOT be used for dictionary columns. + * + * This is a redundant copy replicating the behavior of `elementwise_aggregator` from + * `cudf/detail/aggregation/device_aggregators.cuh`. The key difference is that this functor accepts + * a pointer to raw bytes as the source, as `column_device_view` cannot yet be constructed from + * shared memory. + */ struct shmem_element_aggregator { template - __device__ void operator()(std::byte* target, + __device__ void operator()(cuda::std::byte* target, cudf::size_type target_index, bool* target_null, cudf::column_device_view source, From 23360f5716bf67b1bf39d85eccce22fd956a401d Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Wed, 9 Oct 2024 16:28:56 -0700 Subject: [PATCH 5/9] Add traits determing source and target type --- .../detail/aggregation/device_aggregators.cuh | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/cpp/include/cudf/detail/aggregation/device_aggregators.cuh b/cpp/include/cudf/detail/aggregation/device_aggregators.cuh index 4b6e5b99124..4454ad133d9 100644 --- a/cpp/include/cudf/detail/aggregation/device_aggregators.cuh +++ b/cpp/include/cudf/detail/aggregation/device_aggregators.cuh @@ -28,6 +28,25 @@ #include namespace cudf::detail { +/// Checks if an aggregation kind needs to operate on the underlying storage type +template +__device__ constexpr bool uses_underlying_type() +{ + return k == aggregation::MIN or k == aggregation::MAX or k == aggregation::SUM; +} + +/// Gets the underlying target type for the given source type and aggregation kind +template +using underlying_target_t = + cuda::std::conditional_t(), + cudf::device_storage_type_t>, + cudf::detail::target_type_t>; + +/// Gets the underlying source type for the given source type and aggregation kind +template +using underlying_source_t = + cuda::std::conditional_t(), cudf::device_storage_type_t, Source>; + template struct update_target_element { __device__ void operator()(mutable_column_device_view target, From c2b9dd1386f8e86a44e0b62a6bec9334db3bf793 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Thu, 10 Oct 2024 10:26:05 -0700 Subject: [PATCH 6/9] Add comment --- cpp/src/groupby/hash/shared_memory_aggregator.cuh | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/groupby/hash/shared_memory_aggregator.cuh b/cpp/src/groupby/hash/shared_memory_aggregator.cuh index 88b28ebe6e6..f1a9b174e45 100644 --- a/cpp/src/groupby/hash/shared_memory_aggregator.cuh +++ b/cpp/src/groupby/hash/shared_memory_aggregator.cuh @@ -155,6 +155,7 @@ struct update_target_element_shmem< cudf::column_device_view source, cudf::size_type source_index) const noexcept { + // The nullability was checked prior to this call in the `shmem_element_aggregator` functor using Target = cudf::detail::target_type_t; Target* target_casted = reinterpret_cast(target); cudf::detail::atomic_add(&target_casted[target_index], Target{1}); From bcc5e1f4441310cb1177dd0392ebd1f004bf21fe Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Mon, 14 Oct 2024 19:37:32 -0700 Subject: [PATCH 7/9] API cleanups --- .../groupby/hash/global_memory_aggregator.cuh | 42 +++++++------------ .../groupby/hash/shared_memory_aggregator.cuh | 39 ++++++++--------- 2 files changed, 35 insertions(+), 46 deletions(-) diff --git a/cpp/src/groupby/hash/global_memory_aggregator.cuh b/cpp/src/groupby/hash/global_memory_aggregator.cuh index 5720eade398..9b646313239 100644 --- a/cpp/src/groupby/hash/global_memory_aggregator.cuh +++ b/cpp/src/groupby/hash/global_memory_aggregator.cuh @@ -31,8 +31,7 @@ struct update_target_element_gmem { cudf::size_type target_index, cudf::column_device_view source_column, cuda::std::byte* source, - cudf::size_type source_index, - bool* source_null) const + cudf::size_type source_index) const noexcept { CUDF_UNREACHABLE("Invalid source type and aggregation combination."); } @@ -47,8 +46,7 @@ struct update_target_element_gmem< cudf::size_type target_index, cudf::column_device_view source_column, cuda::std::byte* source, - cudf::size_type source_index, - bool* source_null) const noexcept + cudf::size_type source_index) const noexcept { using DeviceType = cudf::detail::underlying_target_t; DeviceType* source_casted = reinterpret_cast(source); @@ -68,8 +66,7 @@ struct update_target_element_gmem< cudf::size_type target_index, cudf::column_device_view source_column, cuda::std::byte* source, - cudf::size_type source_index, - bool* source_null) const noexcept + cudf::size_type source_index) const noexcept { using DeviceType = cudf::detail::underlying_target_t; DeviceType* source_casted = reinterpret_cast(source); @@ -90,8 +87,7 @@ struct update_target_element_gmem< cudf::size_type target_index, cudf::column_device_view source_column, cuda::std::byte* source, - cudf::size_type source_index, - bool* source_null) const noexcept + cudf::size_type source_index) const noexcept { using DeviceType = cudf::detail::underlying_target_t; DeviceType* source_casted = reinterpret_cast(source); @@ -112,8 +108,7 @@ struct update_target_element_gmem< cudf::size_type target_index, cudf::column_device_view source_column, cuda::std::byte* source, - cudf::size_type source_index, - bool* source_null) const noexcept + cudf::size_type source_index) const noexcept { using Target = cudf::detail::target_type_t; @@ -135,8 +130,7 @@ struct update_target_element_gmem< cudf::size_type target_index, cudf::column_device_view source_column, cuda::std::byte* source, - cudf::size_type source_index, - bool* source_null) const noexcept + cudf::size_type source_index) const noexcept { using Target = cudf::detail::target_type_t; @@ -160,8 +154,7 @@ struct update_target_element_gmem< cudf::size_type target_index, cudf::column_device_view source_column, cuda::std::byte* source, - cudf::size_type source_index, - bool* source_null) const noexcept + cudf::size_type source_index) const noexcept { using Target = cudf::detail::target_type_t; @@ -183,8 +176,7 @@ struct update_target_element_gmem< cudf::size_type target_index, cudf::column_device_view source_column, cuda::std::byte* source, - cudf::size_type source_index, - bool* source_null) const noexcept + cudf::size_type source_index) const noexcept { using Target = cudf::detail::target_type_t; @@ -206,8 +198,7 @@ struct update_target_element_gmem< cudf::size_type target_index, cudf::column_device_view source_column, cuda::std::byte* source, - cudf::size_type source_index, - bool* source_null) const noexcept + cudf::size_type source_index) const noexcept { using Target = cudf::detail::target_type_t; Target* source_casted = reinterpret_cast(source); @@ -225,7 +216,6 @@ struct update_target_element_gmem< if (target.is_null(target_index)) { target.set_valid(target_index); } } }; - template struct update_target_element_gmem< Source, @@ -236,8 +226,7 @@ struct update_target_element_gmem< cudf::size_type target_index, cudf::column_device_view source_column, cuda::std::byte* source, - cudf::size_type source_index, - bool* source_null) const noexcept + cudf::size_type source_index) const noexcept { using Target = cudf::detail::target_type_t; Target* source_casted = reinterpret_cast(source); @@ -274,14 +263,13 @@ struct gmem_element_aggregator { cudf::size_type target_index, cudf::column_device_view source_column, cuda::std::byte* source, - cudf::size_type source_index, - bool* source_null) const noexcept + bool* source_mask, + cudf::size_type source_index) const noexcept { - if constexpr (k != cudf::aggregation::COUNT_ALL) { - if (source_null[source_index]) { return; } - } + if (!source_mask[source_index]) { return; } + update_target_element_gmem{}( - target, target_index, source_column, source, source_index, source_null); + target, target_index, source_column, source, source_index); } }; } // namespace cudf::groupby::detail::hash diff --git a/cpp/src/groupby/hash/shared_memory_aggregator.cuh b/cpp/src/groupby/hash/shared_memory_aggregator.cuh index f1a9b174e45..49524c25545 100644 --- a/cpp/src/groupby/hash/shared_memory_aggregator.cuh +++ b/cpp/src/groupby/hash/shared_memory_aggregator.cuh @@ -28,8 +28,8 @@ namespace cudf::groupby::detail::hash { template struct update_target_element_shmem { __device__ void operator()(cuda::std::byte* target, + bool* target_mask, cudf::size_type target_index, - bool* target_null, cudf::column_device_view source, cudf::size_type source_index) const { @@ -43,8 +43,8 @@ struct update_target_element_shmem< cudf::aggregation::MIN, cuda::std::enable_if_t() && cudf::has_atomic_support()>> { __device__ void operator()(cuda::std::byte* target, + bool* target_mask, cudf::size_type target_index, - bool* target_null, cudf::column_device_view source, cudf::size_type source_index) const noexcept { @@ -54,7 +54,8 @@ struct update_target_element_shmem< DeviceTarget* target_casted = reinterpret_cast(target); cudf::detail::atomic_min(&target_casted[target_index], static_cast(source.element(source_index))); - if (target_null[target_index]) { target_null[target_index] = false; } + + if (!target_mask[target_index]) { target_mask[target_index] = true; } } }; @@ -64,8 +65,8 @@ struct update_target_element_shmem< cudf::aggregation::MAX, cuda::std::enable_if_t() && cudf::has_atomic_support()>> { __device__ void operator()(cuda::std::byte* target, + bool* target_mask, cudf::size_type target_index, - bool* target_null, cudf::column_device_view source, cudf::size_type source_index) const noexcept { @@ -76,7 +77,7 @@ struct update_target_element_shmem< cudf::detail::atomic_max(&target_casted[target_index], static_cast(source.element(source_index))); - if (target_null[target_index]) { target_null[target_index] = false; } + if (!target_mask[target_index]) { target_mask[target_index] = true; } } }; @@ -87,8 +88,8 @@ struct update_target_element_shmem< cuda::std::enable_if_t() && cudf::has_atomic_support() && !cudf::is_timestamp()>> { __device__ void operator()(cuda::std::byte* target, + bool* target_mask, cudf::size_type target_index, - bool* target_null, cudf::column_device_view source, cudf::size_type source_index) const noexcept { @@ -99,7 +100,7 @@ struct update_target_element_shmem< cudf::detail::atomic_add(&target_casted[target_index], static_cast(source.element(source_index))); - if (target_null[target_index]) { target_null[target_index] = false; } + if (!target_mask[target_index]) { target_mask[target_index] = true; } } }; @@ -109,8 +110,8 @@ struct update_target_element_shmem< cudf::aggregation::SUM_OF_SQUARES, cuda::std::enable_if_t()>> { __device__ void operator()(cuda::std::byte* target, + bool* target_mask, cudf::size_type target_index, - bool* target_null, cudf::column_device_view source, cudf::size_type source_index) const noexcept { @@ -119,7 +120,7 @@ struct update_target_element_shmem< auto value = static_cast(source.element(source_index)); cudf::detail::atomic_add(&target_casted[target_index], value * value); - if (target_null[target_index]) { target_null[target_index] = false; } + if (!target_mask[target_index]) { target_mask[target_index] = true; } } }; @@ -129,8 +130,8 @@ struct update_target_element_shmem< cudf::aggregation::PRODUCT, cuda::std::enable_if_t()>> { __device__ void operator()(cuda::std::byte* target, + bool* target_mask, cudf::size_type target_index, - bool* target_null, cudf::column_device_view source, cudf::size_type source_index) const noexcept { @@ -139,7 +140,7 @@ struct update_target_element_shmem< cudf::detail::atomic_mul(&target_casted[target_index], static_cast(source.element(source_index))); - if (target_null[target_index]) { target_null[target_index] = false; } + if (!target_mask[target_index]) { target_mask[target_index] = true; } } }; @@ -150,8 +151,8 @@ struct update_target_element_shmem< cuda::std::enable_if_t< cudf::detail::is_valid_aggregation()>> { __device__ void operator()(cuda::std::byte* target, + bool* target_mask, cudf::size_type target_index, - bool* target_null, cudf::column_device_view source, cudf::size_type source_index) const noexcept { @@ -169,8 +170,8 @@ struct update_target_element_shmem< cuda::std::enable_if_t< cudf::detail::is_valid_aggregation()>> { __device__ void operator()(cuda::std::byte* target, + bool* target_mask, cudf::size_type target_index, - bool* target_null, cudf::column_device_view source, cudf::size_type source_index) const noexcept { @@ -189,8 +190,8 @@ struct update_target_element_shmem< cuda::std::enable_if_t() and cudf::is_relationally_comparable()>> { __device__ void operator()(cuda::std::byte* target, + bool* target_mask, cudf::size_type target_index, - bool* target_null, cudf::column_device_view source, cudf::size_type source_index) const noexcept { @@ -204,7 +205,7 @@ struct update_target_element_shmem< } } - if (target_null[target_index]) { target_null[target_index] = false; } + if (!target_mask[target_index]) { target_mask[target_index] = true; } } }; @@ -215,8 +216,8 @@ struct update_target_element_shmem< cuda::std::enable_if_t() and cudf::is_relationally_comparable()>> { __device__ void operator()(cuda::std::byte* target, + bool* target_mask, cudf::size_type target_index, - bool* target_null, cudf::column_device_view source, cudf::size_type source_index) const noexcept { @@ -230,7 +231,7 @@ struct update_target_element_shmem< } } - if (target_null[target_index]) { target_null[target_index] = false; } + if (!target_mask[target_index]) { target_mask[target_index] = true; } } }; @@ -249,8 +250,8 @@ struct update_target_element_shmem< struct shmem_element_aggregator { template __device__ void operator()(cuda::std::byte* target, + bool* target_mask, cudf::size_type target_index, - bool* target_null, cudf::column_device_view source, cudf::size_type source_index) const noexcept { @@ -258,7 +259,7 @@ struct shmem_element_aggregator { if (source.is_null(source_index)) { return; } } update_target_element_shmem{}( - target, target_index, target_null, source, source_index); + target, target_mask, target_index, source, source_index); } }; } // namespace cudf::groupby::detail::hash From cc81a079901bbdfc6ffc59add84f02289b5e3b26 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Thu, 17 Oct 2024 09:29:51 -0700 Subject: [PATCH 8/9] Add comments --- cpp/src/groupby/hash/global_memory_aggregator.cuh | 2 ++ cpp/src/groupby/hash/shared_memory_aggregator.cuh | 1 + 2 files changed, 3 insertions(+) diff --git a/cpp/src/groupby/hash/global_memory_aggregator.cuh b/cpp/src/groupby/hash/global_memory_aggregator.cuh index 9b646313239..86769df9b4b 100644 --- a/cpp/src/groupby/hash/global_memory_aggregator.cuh +++ b/cpp/src/groupby/hash/global_memory_aggregator.cuh @@ -266,6 +266,8 @@ struct gmem_element_aggregator { bool* source_mask, cudf::size_type source_index) const noexcept { + // Early exit for all aggregation kinds since shared memory aggregation of + // `COUNT_ALL` is always valid if (!source_mask[source_index]) { return; } update_target_element_gmem{}( diff --git a/cpp/src/groupby/hash/shared_memory_aggregator.cuh b/cpp/src/groupby/hash/shared_memory_aggregator.cuh index 49524c25545..94fe77a87ab 100644 --- a/cpp/src/groupby/hash/shared_memory_aggregator.cuh +++ b/cpp/src/groupby/hash/shared_memory_aggregator.cuh @@ -255,6 +255,7 @@ struct shmem_element_aggregator { cudf::column_device_view source, cudf::size_type source_index) const noexcept { + // Check nullability for all aggregation kinds but `COUNT_ALL` if constexpr (k != cudf::aggregation::COUNT_ALL) { if (source.is_null(source_index)) { return; } } From 94672cc7ee8a55b15965a73a3ef5ee0bad850b54 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Thu, 17 Oct 2024 13:04:24 -0700 Subject: [PATCH 9/9] Removing unused parameters --- .../detail/aggregation/device_aggregators.cuh | 16 ++++++++-------- .../groupby/hash/global_memory_aggregator.cuh | 10 +++++----- .../groupby/hash/shared_memory_aggregator.cuh | 7 ++----- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/cpp/include/cudf/detail/aggregation/device_aggregators.cuh b/cpp/include/cudf/detail/aggregation/device_aggregators.cuh index 4454ad133d9..204eee49a2a 100644 --- a/cpp/include/cudf/detail/aggregation/device_aggregators.cuh +++ b/cpp/include/cudf/detail/aggregation/device_aggregators.cuh @@ -49,10 +49,10 @@ using underlying_source_t = template struct update_target_element { - __device__ void operator()(mutable_column_device_view target, - size_type target_index, - column_device_view source, - size_type source_index) const noexcept + __device__ void operator()(mutable_column_device_view, + size_type, + column_device_view, + size_type) const noexcept { CUDF_UNREACHABLE("Invalid source type and aggregation combination."); } @@ -203,10 +203,10 @@ struct update_target_from_dictionary { template ()>* = nullptr> - __device__ void operator()(mutable_column_device_view target, - size_type target_index, - column_device_view source, - size_type source_index) const noexcept + __device__ void operator()(mutable_column_device_view, + size_type, + column_device_view, + size_type) const noexcept { } }; diff --git a/cpp/src/groupby/hash/global_memory_aggregator.cuh b/cpp/src/groupby/hash/global_memory_aggregator.cuh index 86769df9b4b..50e89c727ff 100644 --- a/cpp/src/groupby/hash/global_memory_aggregator.cuh +++ b/cpp/src/groupby/hash/global_memory_aggregator.cuh @@ -27,11 +27,11 @@ namespace cudf::groupby::detail::hash { template struct update_target_element_gmem { - __device__ void operator()(cudf::mutable_column_device_view target, - cudf::size_type target_index, - cudf::column_device_view source_column, - cuda::std::byte* source, - cudf::size_type source_index) const noexcept + __device__ void operator()(cudf::mutable_column_device_view, + cudf::size_type, + cudf::column_device_view, + cuda::std::byte*, + cudf::size_type) const noexcept { CUDF_UNREACHABLE("Invalid source type and aggregation combination."); } diff --git a/cpp/src/groupby/hash/shared_memory_aggregator.cuh b/cpp/src/groupby/hash/shared_memory_aggregator.cuh index 94fe77a87ab..9cbeeb34b86 100644 --- a/cpp/src/groupby/hash/shared_memory_aggregator.cuh +++ b/cpp/src/groupby/hash/shared_memory_aggregator.cuh @@ -27,11 +27,8 @@ namespace cudf::groupby::detail::hash { template struct update_target_element_shmem { - __device__ void operator()(cuda::std::byte* target, - bool* target_mask, - cudf::size_type target_index, - cudf::column_device_view source, - cudf::size_type source_index) const + __device__ void operator()( + cuda::std::byte*, bool*, cudf::size_type, cudf::column_device_view, cudf::size_type) const { CUDF_UNREACHABLE("Invalid source type and aggregation combination."); }