Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix integer overflow in partition scatter_map construction #13272

Merged
merged 6 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions cpp/src/partitioning/partitioning.cu
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

#include <cudf/column/column_factories.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/gather.cuh>
wence- marked this conversation as resolved.
Show resolved Hide resolved
#include <cudf/detail/gather.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/scatter.cuh>
#include <cudf/detail/scatter.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/hash_functions.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
Expand Down Expand Up @@ -618,8 +619,7 @@ std::pair<std::unique_ptr<table>, std::vector<size_type>> hash_partition_table(
row_output_locations, num_rows, num_partitions, scanned_block_partition_sizes_ptr);

// Use the resulting scatter map to materialize the output
auto output = detail::scatter(
input, row_partition_numbers.begin(), row_partition_numbers.end(), input, stream, mr);
auto output = detail::scatter(input, row_partition_numbers, input, stream, mr);

stream.synchronize(); // Async D2H copy must finish before returning host vec
return std::pair(std::move(output), std::move(partition_offsets));
Expand Down Expand Up @@ -693,7 +693,7 @@ struct dispatch_map_type {

// Unfortunately need to materialize the scatter map because
// `detail::scatter` requires multiple passes through the iterator
rmm::device_uvector<MapType> scatter_map(partition_map.size(), stream);
rmm::device_uvector<size_type> scatter_map(partition_map.size(), stream);
wence- marked this conversation as resolved.
Show resolved Hide resolved

// For each `partition_map[i]`, atomically increment the corresponding
// partition offset to determine `i`'s location in the output
Expand All @@ -706,8 +706,7 @@ struct dispatch_map_type {
});

// Scatter the rows into their partitions
auto scattered =
cudf::detail::scatter(t, scatter_map.begin(), scatter_map.end(), t, stream, mr);
auto scattered = detail::scatter(t, scatter_map, t, stream, mr);

return std::pair(std::move(scattered), std::move(partition_offsets));
}
Expand Down
17 changes: 17 additions & 0 deletions cpp/tests/partitioning/partition_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cudf_test/type_lists.hpp>

#include <cudf/copying.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/partitioning.hpp>
#include <cudf/sorting.hpp>
#include <cudf/table/table.hpp>
Expand Down Expand Up @@ -328,3 +329,19 @@ TEST_F(PartitionTestNotTyped, ListOfListOfListOfIntEmpty)
CUDF_TEST_EXPECT_TABLES_EQUAL(table_to_partition, result.first->view());
EXPECT_EQ(3, result.second.size());
}

// Regression test: partition a 129-row table using an int8_t partition map.
// 129 is larger than the maximum value an int8_t can hold (127), so the test
// only passes if output row positions are not stored in the map's own
// element type.
TEST_F(PartitionTestNotTyped, NoIntegerOverflow)
{
  auto const num_rows = 129;

  // Partition ids alternate 0, 1, 0, 1, ... which yields 65 zeros and 64 ones.
  auto partition_ids =
    cudf::detail::make_counting_transform_iterator(0, [](auto idx) { return idx % 2; });
  fixed_width_column_wrapper<int8_t> map(partition_ids, partition_ids + num_rows);

  // The map column doubles as the (single-column) table being partitioned.
  auto input_table = cudf::table_view{{map}};

  // After partitioning, the first 65 rows hold 0 and the remaining 64 hold 1.
  auto grouped_ids =
    cudf::detail::make_counting_transform_iterator(0, [](auto idx) { return idx / 65; });
  fixed_width_column_wrapper<int8_t> expected(grouped_ids, grouped_ids + num_rows);
  auto expected_table = cudf::table_view{{expected}};

  // Offsets mark the start of each partition plus the total row count.
  std::vector<cudf::size_type> expected_offsets{0, 65, 129};

  run_partition_test(input_table, map, 2, expected_table, expected_offsets);
}
31 changes: 30 additions & 1 deletion python/cudf/cudf/_lib/partitioning.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

from cudf.core.buffer import acquire_spill_lock

Expand All @@ -13,6 +13,8 @@ from cudf._lib.cpp.partitioning cimport partition as cpp_partition
from cudf._lib.cpp.table.table cimport table
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns

from cudf._lib.reduce import minmax
from cudf._lib.stream_compaction import distinct_count as cpp_distinct_count

cimport cudf._lib.cpp.types as libcudf_types
Expand All @@ -21,6 +23,29 @@ cimport cudf._lib.cpp.types as libcudf_types
@acquire_spill_lock()
def partition(list source_columns, Column partition_map,
object num_partitions):
"""Partition source columns given a partitioning map

Parameters
----------
source_columns: list[Column]
Columns to partition
partition_map: Column
Column of integer values that map each row in the input to a
partition
num_partitions: Optional[int]
Number of output partitions (deduced from unique values in
partition_map if None)

Returns
-------
Pair of reordered columns and partition offsets

Raises
------
ValueError
If the partition map has invalid entries (not all in [0,
num_partitions)).
"""

if num_partitions is None:
num_partitions = cpp_distinct_count(partition_map, ignore_nulls=True)
Expand All @@ -30,6 +55,10 @@ def partition(list source_columns, Column partition_map,
cdef column_view c_partition_map_view = partition_map.view()

cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result
if partition_map.size > 0:
lo, hi = minmax(partition_map)
if lo < 0 or hi >= num_partitions:
raise ValueError("Partition map has invalid values")
with nogil:
c_result = move(
cpp_partition(
Expand Down
6 changes: 6 additions & 0 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2244,6 +2244,12 @@ def scatter_by_map(
Returns
-------
A list of cudf.DataFrame objects.

Raises
------
ValueError
If the map_index has invalid entries (not all in [0,
num_partitions)).
"""
# map_index might be a column name or array,
# make it a Column
Expand Down
13 changes: 13 additions & 0 deletions python/cudf/cudf/tests/test_sorting.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,16 @@ def test_dataframe_sort_values_kind(nelem, dtype, kind):
assert_eq(sorted_df.index.values, sorted_index)
assert_eq(sorted_df["a"].values, aa[sorted_index])
assert_eq(sorted_df["b"].values, bb[sorted_index])


@pytest.mark.parametrize("ids", [[-1, 0, 1, 0], [0, 2, 3, 0]])
def test_dataframe_scatter_by_map_7513(ids):
    """Check that ``scatter_by_map`` rejects an invalid map with ValueError.

    The first parametrized case contains a negative id; the second uses
    the ids 0, 2 and 3, leaving a gap at 1. No explicit map size is
    passed, and both cases are expected to raise.
    """
    frame = DataFrame({"id": ids, "val": [0, 1, 2, 3]})
    with pytest.raises(ValueError):
        frame.scatter_by_map(frame["id"])


def test_dataframe_scatter_by_map_empty():
    """Scattering an empty frame by one of its own columns yields nothing."""
    empty_frame = DataFrame({"a": [], "b": []})
    pieces = empty_frame.scatter_by_map(empty_frame["a"])
    assert len(pieces) == 0