Fix integer overflow in shim device_sum functions #13943

Merged: 6 commits, merged Aug 28, 2023
Changes from 3 commits
17 changes: 17 additions & 0 deletions python/cudf/cudf/tests/test_groupby.py

@@ -464,6 +464,23 @@ def func(group):
     run_groupby_apply_jit_test(data, func, ["a"])
 
 
+@pytest.mark.parametrize("dtype", ["int32"])
+def test_groupby_apply_jit_sum_integer_overflow(dtype):
+    max = np.iinfo(dtype).max
+
+    data = DataFrame(
+        {
+            "a": [0, 0, 0],
+            "b": [max, max, max],
+        }
+    )
+
+    def func(group):
+        return group["b"].sum()
+
+    run_groupby_apply_jit_test(data, func, ["a"])
+
+
 @pytest.mark.parametrize("dtype", ["float64"])
 @pytest.mark.parametrize("func", ["min", "max", "sum", "mean", "var", "std"])
 @pytest.mark.parametrize("special_val", [np.nan, np.inf, -np.inf])
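The arithmetic the new test exercises: three int32 maxima sum to 3 * 2147483647 = 6442450941, which needs 34 bits, so a 32-bit accumulator wraps to 2147483645 while a 64-bit accumulator is exact. A host-side sketch of that comparison (illustration only, not part of the PR):

#include <cstdint>
#include <cstdio>
#include <limits>

// Host-side sketch (not PR code) of the overflow the new test targets.
int main()
{
  int32_t const v = std::numeric_limits<int32_t>::max();  // 2147483647

  int64_t exact = 0;
  for (int i = 0; i < 3; ++i) { exact += static_cast<int64_t>(v); }  // 6442450941

  // Reproduce the 32-bit wraparound via uint32_t, because overflowing a
  // signed int directly is undefined behavior in C++.
  auto wrapped = static_cast<int32_t>(static_cast<uint32_t>(exact));  // 2147483645

  std::printf("exact = %lld, wrapped = %d\n", static_cast<long long>(exact), wrapped);
  return 0;
}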
73 changes: 46 additions & 27 deletions python/cudf/udf_cpp/shim.cu

@@ -388,25 +388,58 @@ __device__ bool are_all_nans(cooperative_groups::thread_block const& block,
   return count == 0;
 }
 
-template <typename T>
-__device__ void device_sum(cooperative_groups::thread_block const& block,
-                           T const* data,
-                           int64_t size,
-                           T* sum)
+template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
+__device__ int64_t device_sum(cooperative_groups::thread_block const& block,
+                              T const* data,
+                              int64_t size)
+{
+  __shared__ int64_t block_sum;
+  if (block.thread_rank() == 0) { block_sum = 0; }
+  block.sync();
+
+  int64_t local_sum = 0;
+
+  for (int64_t idx = block.thread_rank(); idx < size; idx += block.size()) {
+    local_sum += static_cast<int64_t>(data[idx]);
+  }
+
+  cuda::atomic_ref<int64_t, cuda::thread_scope_block> ref{block_sum};
+  ref.fetch_add(local_sum, cuda::std::memory_order_relaxed);
+
+  block.sync();
+  return block_sum;
+}

Review thread on the integral device_sum overload:

bdice (Contributor), Aug 23, 2023:
Rather than writing an entirely new SFINAE overload, could we have a single kernel and write something like this in a second template parameter:
typename AccumT = std::conditional<std::is_integral_v<T>, int64_t, T>

Author: yes! was hoping someone would comment with something like this :)

Author: Should be updated! Looking much cleaner now, thanks.
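A sketch of what the reviewer's suggestion amounts to, spelled with std::conditional_t so the default template argument names a type; this is a hedged reconstruction of the later refactor, not necessarily the exact code the PR merged:

// Sketch of bdice's suggestion: one device_sum whose accumulator type is
// int64_t for integral T and T itself otherwise. Reconstruction only; the
// code merged in later commits of this PR may differ.
template <typename T, typename AccumT = std::conditional_t<std::is_integral_v<T>, int64_t, T>>
__device__ AccumT device_sum(cooperative_groups::thread_block const& block,
                             T const* data,
                             int64_t size)
{
  __shared__ AccumT block_sum;
  if (block.thread_rank() == 0) { block_sum = 0; }
  block.sync();

  AccumT local_sum = 0;
  for (int64_t idx = block.thread_rank(); idx < size; idx += block.size()) {
    local_sum += static_cast<AccumT>(data[idx]);
  }

  cuda::atomic_ref<AccumT, cuda::thread_scope_block> ref{block_sum};
  ref.fetch_add(local_sum, cuda::std::memory_order_relaxed);

  block.sync();
  return block_sum;
}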

+template <typename T, std::enable_if_t<!std::is_integral_v<T>, int> = 0>
+__device__ T device_sum(cooperative_groups::thread_block const& block, T const* data, int64_t size)
 {
+  __shared__ T block_sum;
+  if (block.thread_rank() == 0) { block_sum = 0; }
+  block.sync();
+
   T local_sum = 0;
 
   for (int64_t idx = block.thread_rank(); idx < size; idx += block.size()) {
     local_sum += data[idx];
   }
 
-  cuda::atomic_ref<T, cuda::thread_scope_block> ref{*sum};
+  cuda::atomic_ref<T, cuda::thread_scope_block> ref{block_sum};
   ref.fetch_add(local_sum, cuda::std::memory_order_relaxed);
 
   block.sync();
+  return block_sum;
 }
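A hypothetical kernel showing the call pattern for these helpers, with one block cooperatively reducing one group's values (illustration only, not from the PR):

__global__ void sum_one_group(int32_t const* group_values, int64_t size, int64_t* out)
{
  auto block = cooperative_groups::this_thread_block();
  // Integral element type, so the overload returning int64_t is selected.
  int64_t total = device_sum<int32_t>(block, group_values, size);
  if (block.thread_rank() == 0) { *out = total; }
}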

-template <typename T>
+template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
+__device__ int64_t BlockSum(T const* data, int64_t size)
+{
+  auto block = cooperative_groups::this_thread_block();
+
+  auto block_sum = device_sum<T>(block, data, size);
+  return block_sum;
+}
+
+template <typename T, std::enable_if_t<!std::is_integral_v<T>, int> = 0>
 __device__ T BlockSum(T const* data, int64_t size)
 {
   auto block = cooperative_groups::this_thread_block();
@@ -415,11 +448,7 @@ __device__ T BlockSum(T const* data, int64_t size)
     if (are_all_nans(block, data, size)) { return 0; }
   }
 
-  __shared__ T block_sum;
-  if (block.thread_rank() == 0) { block_sum = 0; }
-  block.sync();
-
-  device_sum<T>(block, data, size, &block_sum);
+  auto block_sum = device_sum<T>(block, data, size);
   return block_sum;
 }
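For readers less familiar with this SFINAE idiom, a self-contained host-side analogue (hypothetical, not PR code) of how the BlockSum overload pair dispatches on T:

#include <cstdint>
#include <type_traits>

// Hypothetical analogue of the BlockSum pair above: integral element types
// select the int64_t-returning overload, floating-point types keep their own.
template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
std::int64_t block_sum_like(T) { return 0; }

template <typename T, std::enable_if_t<!std::is_integral_v<T>, int> = 0>
T block_sum_like(T) { return 0; }

static_assert(std::is_same_v<decltype(block_sum_like(std::int32_t{})), std::int64_t>,
              "integral T widens to int64_t");
static_assert(std::is_same_v<decltype(block_sum_like(double{})), double>,
              "floating-point T keeps its type");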

@@ -428,11 +457,7 @@ __device__ double BlockMean(T const* data, int64_t size)
 {
   auto block = cooperative_groups::this_thread_block();
 
-  __shared__ T block_sum;
-  if (block.thread_rank() == 0) { block_sum = 0; }
-  block.sync();
-
-  device_sum<T>(block, data, size, &block_sum);
+  auto block_sum = device_sum<T>(block, data, size);
   return static_cast<double>(block_sum) / static_cast<double>(size);
 }

@@ -443,25 +468,19 @@ __device__ double BlockCoVar(T const* lhs, T const* rhs, int64_t size)
 
   __shared__ double block_covar;
 
-  __shared__ T block_sum_lhs;
-  __shared__ T block_sum_rhs;
-
-  if (block.thread_rank() == 0) {
-    block_covar   = 0;
-    block_sum_lhs = 0;
-    block_sum_rhs = 0;
-  }
+  if (block.thread_rank() == 0) { block_covar = 0; }
   block.sync();
 
-  device_sum<T>(block, lhs, size, &block_sum_lhs);
+  auto block_sum_lhs = device_sum<T>(block, lhs, size);
 
   auto const mu_l = static_cast<double>(block_sum_lhs) / static_cast<double>(size);
   auto const mu_r = [=]() {
     if (lhs == rhs) {
       // If the lhs and rhs are the same, this is calculating variance.
       // Thus we can assume mu_r = mu_l.
       return mu_l;
     } else {
-      device_sum<T>(block, rhs, size, &block_sum_rhs);
+      auto block_sum_rhs = device_sum<T>(block, rhs, size);
       return static_cast<double>(block_sum_rhs) / static_cast<double>(size);
     }
   }();
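For reference, the quantities this hunk assembles: the two block sums become the group means, and the remainder of BlockCoVar (collapsed in this view) accumulates the centered cross terms. Assuming the pandas-style ddof = 1 normalization that cudf follows (an assumption about the collapsed code), the target statistic is:

\mu_l = \frac{1}{n}\sum_{i=1}^{n} l_i, \qquad
\mu_r = \frac{1}{n}\sum_{i=1}^{n} r_i, \qquad
\mathrm{Cov}(l, r) = \frac{1}{n-1}\sum_{i=1}^{n} (l_i - \mu_l)(r_i - \mu_r)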