Skip to content

Commit

Permalink
Merge branch 'branch-24.06' into bug/melt/id_vars
Browse files Browse the repository at this point in the history
  • Loading branch information
galipremsagar authored May 16, 2024
2 parents 637334b + 1e92f3f commit 6e25fce
Show file tree
Hide file tree
Showing 18 changed files with 268 additions and 68 deletions.
1 change: 0 additions & 1 deletion conda/recipes/cudf/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ requirements:
- python
- cython >=3.0.3
- scikit-build-core >=0.7.0
- setuptools
- dlpack >=0.8,<1.0
- numpy 1.23
- pyarrow ==16.0.0.*
Expand Down
1 change: 0 additions & 1 deletion conda/recipes/cudf_kafka/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ requirements:
- cudf ={{ version }}
- libcudf_kafka ={{ version }}
- scikit-build-core >=0.7.0
- setuptools
{% if cuda_major != "11" %}
- cuda-cudart-dev
{% endif %}
Expand Down
12 changes: 12 additions & 0 deletions cpp/include/cudf/reduction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ enum class scan_type : bool { INCLUSIVE, EXCLUSIVE };
* @param col Input column view
* @param agg Aggregation operator applied by the reduction
* @param output_dtype The output scalar type
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @returns Output scalar with reduce result
*/
std::unique_ptr<scalar> reduce(
column_view const& col,
reduce_aggregation const& agg,
data_type output_dtype,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -96,6 +98,7 @@ std::unique_ptr<scalar> reduce(
* @param agg Aggregation operator applied by the reduction
* @param output_dtype The output scalar type
* @param init The initial value of the reduction
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @returns Output scalar with reduce result
*/
Expand All @@ -104,6 +107,7 @@ std::unique_ptr<scalar> reduce(
reduce_aggregation const& agg,
data_type output_dtype,
std::optional<std::reference_wrapper<scalar const>> init,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
Expand Down Expand Up @@ -145,6 +149,7 @@ std::unique_ptr<scalar> reduce(
* @param null_handling If `INCLUDE`, the reduction is valid if all elements in a segment are valid,
* otherwise null. If `EXCLUDE`, the reduction is valid if any element in the segment is valid,
* otherwise null.
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @returns Output column with results of segmented reduction
*/
Expand All @@ -154,6 +159,7 @@ std::unique_ptr<column> segmented_reduce(
segmented_reduce_aggregation const& agg,
data_type output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -169,6 +175,7 @@ std::unique_ptr<column> segmented_reduce(
* otherwise null. If `EXCLUDE`, the reduction is valid if any element in the segment is valid,
* otherwise null.
* @param init The initial value of the reduction
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @returns Output column with results of segmented reduction.
*/
Expand All @@ -179,6 +186,7 @@ std::unique_ptr<column> segmented_reduce(
data_type output_dtype,
null_policy null_handling,
std::optional<std::reference_wrapper<scalar const>> init,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -195,6 +203,7 @@ std::unique_ptr<column> segmented_reduce(
* exclusive scan if scan_type::EXCLUSIVE.
* @param[in] null_handling Exclude null values when computing the result if null_policy::EXCLUDE.
* Include nulls if null_policy::INCLUDE. Any operation with a null results in a null.
* @param[in] stream CUDA stream used for device memory operations and kernel launches
* @param[in] mr Device memory resource used to allocate the returned scalar's device memory
* @returns Scanned output column
*/
Expand All @@ -203,19 +212,22 @@ std::unique_ptr<column> scan(
scan_aggregation const& agg,
scan_type inclusive,
null_policy null_handling = null_policy::EXCLUDE,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
* @brief Determines the minimum and maximum values of a column.
*
*
* @param col column to compute minmax
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return A std::pair of scalars with the first scalar being the minimum value and the second
* scalar being the maximum value of the input column.
*/
std::pair<std::unique_ptr<scalar>, std::unique_ptr<scalar>> minmax(
column_view const& col,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down
31 changes: 19 additions & 12 deletions cpp/src/io/parquet/reader_impl_chunking.cu
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void print_cumulative_page_info(device_span<PageInfo const> d_pages,
printf("\tP %s: {%lu, %lu, %lu}\n",
is_list ? "(L)" : "",
pidx,
c_info[pidx].row_index,
c_info[pidx].end_row_index,
c_info[pidx].size_bytes);
}
}
Expand All @@ -121,16 +121,17 @@ void print_cumulative_row_info(host_span<cumulative_page_info const> sizes,
printf("------------\nCumulative sizes %s (index, row_index, size_bytes, page_key)\n",
label.c_str());
for (size_t idx = 0; idx < sizes.size(); idx++) {
printf("{%lu, %lu, %lu, %d}", idx, sizes[idx].row_index, sizes[idx].size_bytes, sizes[idx].key);
printf(
"{%lu, %lu, %lu, %d}", idx, sizes[idx].end_row_index, sizes[idx].size_bytes, sizes[idx].key);
if (splits.has_value()) {
// if we have a split at this row count and this is the last instance of this row count
auto start = thrust::make_transform_iterator(splits->begin(),
[](row_range const& i) { return i.skip_rows; });
auto end = start + splits->size();
auto split = std::find(start, end, sizes[idx].row_index);
auto split = std::find(start, end, sizes[idx].end_row_index);
auto const split_index = [&]() -> int {
if (split != end &&
((idx == sizes.size() - 1) || (sizes[idx + 1].row_index > sizes[idx].row_index))) {
if (split != end && ((idx == sizes.size() - 1) ||
(sizes[idx + 1].end_row_index > sizes[idx].end_row_index))) {
return static_cast<int>(std::distance(start, split));
}
return idx == 0 ? 0 : -1;
Expand Down Expand Up @@ -259,8 +260,9 @@ struct set_row_index {
auto const& page = pages[i];
auto const& chunk = chunks[page.chunk_idx];
size_t const page_end_row = chunk.start_row + page.chunk_row + page.num_rows;
// if we have been passed in a cap, apply it
c_info[i].end_row_index = max_row > 0 ? min(max_row, page_end_row) : page_end_row;
// this cap is necessary because in the chunked reader, we use estimations for the row
// counts for list columns, which can result in values > than the absolute number of rows.
c_info[i].end_row_index = min(max_row, page_end_row);
}
};

Expand Down Expand Up @@ -461,6 +463,7 @@ adjust_cumulative_sizes(device_span<cumulative_page_info const> c_info,
thrust::make_discard_iterator(),
key_offsets.begin())
.second;

size_t const num_unique_keys = key_offsets_end - key_offsets.begin();
thrust::exclusive_scan(
rmm::exec_policy_nosync(stream), key_offsets.begin(), key_offsets.end(), key_offsets.begin());
Expand Down Expand Up @@ -1292,10 +1295,12 @@ void reader::impl::setup_next_pass(bool uses_custom_row_bounds)
printf("\tnum_rows: %'lu\n", pass.num_rows);
printf("\tbase mem usage: %'lu\n", pass.base_mem_size);
auto const num_columns = _input_columns.size();
std::vector<size_type> h_page_offsets =
cudf::detail::make_std_vector_sync(pass.page_offsets, _stream);
for (size_t c_idx = 0; c_idx < num_columns; c_idx++) {
printf("\t\tColumn %'lu: num_pages(%'d)\n",
c_idx,
pass.page_offsets[c_idx + 1] - pass.page_offsets[c_idx]);
h_page_offsets[c_idx + 1] - h_page_offsets[c_idx]);
}
#endif

Expand Down Expand Up @@ -1362,11 +1367,12 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds)
// can be considerable.
include_decompression_scratch_size(pass.chunks, pass.pages, c_info, _stream);

auto iter = thrust::make_counting_iterator(0);
auto iter = thrust::make_counting_iterator(0);
auto const pass_max_row = pass.skip_rows + pass.num_rows;
thrust::for_each(rmm::exec_policy_nosync(_stream),
iter,
iter + pass.pages.size(),
set_row_index{pass.chunks, pass.pages, c_info, 0});
set_row_index{pass.chunks, pass.pages, c_info, pass_max_row});
// print_cumulative_page_info(pass.pages, pass.chunks, c_info, _stream);

// get the next batch of pages
Expand Down Expand Up @@ -1448,11 +1454,12 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds)
printf("\t\tTotal expected usage: %'lu\n",
total_expected_size == 0 ? subpass.decomp_page_data.size() + pass.base_mem_size
: total_expected_size + pass.base_mem_size);
std::vector<page_span> h_page_indices = cudf::detail::make_std_vector_sync(page_indices, _stream);
for (size_t c_idx = 0; c_idx < num_columns; c_idx++) {
printf("\t\tColumn %'lu: pages(%'lu - %'lu)\n",
c_idx,
page_indices[c_idx].start,
page_indices[c_idx].end);
h_page_indices[c_idx].start,
h_page_indices[c_idx].end);
}
printf("\t\tOutput chunks:\n");
for (size_t idx = 0; idx < subpass.output_chunk_read_info.size(); idx++) {
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/reductions/minmax.cu
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,10 @@ std::pair<std::unique_ptr<scalar>, std::unique_ptr<scalar>> minmax(
} // namespace detail

std::pair<std::unique_ptr<scalar>, std::unique_ptr<scalar>> minmax(
column_view const& col, rmm::device_async_resource_ref mr)
column_view const& col, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::minmax(col, cudf::get_default_stream(), mr);
return detail::minmax(col, stream, mr);
}

} // namespace cudf
7 changes: 4 additions & 3 deletions cpp/src/reductions/reductions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,20 +208,21 @@ std::unique_ptr<scalar> reduce(column_view const& col,
std::unique_ptr<scalar> reduce(column_view const& col,
reduce_aggregation const& agg,
data_type output_dtype,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return reduction::detail::reduce(
col, agg, output_dtype, std::nullopt, cudf::get_default_stream(), mr);
return reduction::detail::reduce(col, agg, output_dtype, std::nullopt, stream, mr);
}

std::unique_ptr<scalar> reduce(column_view const& col,
reduce_aggregation const& agg,
data_type output_dtype,
std::optional<std::reference_wrapper<scalar const>> init,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return reduction::detail::reduce(col, agg, output_dtype, init, cudf::get_default_stream(), mr);
return reduction::detail::reduce(col, agg, output_dtype, init, stream, mr);
}
} // namespace cudf
3 changes: 2 additions & 1 deletion cpp/src/reductions/scan/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ std::unique_ptr<column> scan(column_view const& input,
scan_aggregation const& agg,
scan_type inclusive,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::scan(input, agg, inclusive, null_handling, cudf::get_default_stream(), mr);
return detail::scan(input, agg, inclusive, null_handling, stream, mr);
}

} // namespace cudf
22 changes: 6 additions & 16 deletions cpp/src/reductions/segmented/reductions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,12 @@ std::unique_ptr<column> segmented_reduce(column_view const& segmented_values,
segmented_reduce_aggregation const& agg,
data_type output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return reduction::detail::segmented_reduce(segmented_values,
offsets,
agg,
output_dtype,
null_handling,
std::nullopt,
cudf::get_default_stream(),
mr);
return reduction::detail::segmented_reduce(
segmented_values, offsets, agg, output_dtype, null_handling, std::nullopt, stream, mr);
}

std::unique_ptr<column> segmented_reduce(column_view const& segmented_values,
Expand All @@ -157,17 +152,12 @@ std::unique_ptr<column> segmented_reduce(column_view const& segmented_values,
data_type output_dtype,
null_policy null_handling,
std::optional<std::reference_wrapper<scalar const>> init,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return reduction::detail::segmented_reduce(segmented_values,
offsets,
agg,
output_dtype,
null_handling,
init,
cudf::get_default_stream(),
mr);
return reduction::detail::segmented_reduce(
segmented_values, offsets, agg, output_dtype, null_handling, init, stream, mr);
}

} // namespace cudf
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ ConfigureTest(STREAM_POOL_TEST streams/pool_test.cu STREAM_MODE testing)
ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_SORTING_TEST streams/sorting_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_REDUCTION_TEST streams/reduction_test.cpp STREAM_MODE testing)
ConfigureTest(
STREAM_STRINGS_TEST
streams/strings/case_test.cpp
Expand Down
28 changes: 14 additions & 14 deletions cpp/tests/io/parquet_chunked_reader_test.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,7 @@ TEST_F(ParquetChunkedReaderInputLimitTest, List)
auto base_path = temp_env->get_temp_filepath("list");
auto test_filenames = input_limit_get_test_names(base_path);

constexpr int num_rows = 50'000'000;
constexpr int num_rows = 10'000'000;
constexpr int list_size = 4;

auto const stream = cudf::get_default_stream();
Expand Down Expand Up @@ -1225,14 +1225,14 @@ TEST_F(ParquetChunkedReaderInputLimitTest, List)
//
// Note that in the dictionary cases, both of these revert down to 1 chunk because the
// dictionaries dramatically shrink the size of the uncompressed data.
constexpr int expected_a[] = {2, 2, 1, 1};
input_limit_test_read(test_filenames, tbl, 0, size_t{2} * 1024 * 1024 * 1024, expected_a);
constexpr int expected_a[] = {3, 3, 1, 1};
input_limit_test_read(test_filenames, tbl, 0, 256 * 1024 * 1024, expected_a);
// smaller limit
constexpr int expected_b[] = {6, 6, 2, 1};
input_limit_test_read(test_filenames, tbl, 0, 512 * 1024 * 1024, expected_b);
constexpr int expected_b[] = {5, 5, 2, 1};
input_limit_test_read(test_filenames, tbl, 0, 128 * 1024 * 1024, expected_b);
// include output chunking as well
constexpr int expected_c[] = {11, 11, 9, 8};
input_limit_test_read(test_filenames, tbl, 128 * 1024 * 1024, 512 * 1024 * 1024, expected_c);
constexpr int expected_c[] = {10, 9, 8, 7};
input_limit_test_read(test_filenames, tbl, 32 * 1024 * 1024, 64 * 1024 * 1024, expected_c);
}

void tiny_list_rowgroup_test(bool just_list_col)
Expand Down Expand Up @@ -1318,7 +1318,7 @@ TEST_F(ParquetChunkedReaderInputLimitTest, Mixed)
auto base_path = temp_env->get_temp_filepath("mixed_types");
auto test_filenames = input_limit_get_test_names(base_path);

constexpr int num_rows = 50'000'000;
constexpr int num_rows = 10'000'000;
constexpr int list_size = 4;
constexpr int str_size = 3;

Expand Down Expand Up @@ -1400,12 +1400,12 @@ TEST_F(ParquetChunkedReaderInputLimitTest, Mixed)
//
// Note that in the dictionary cases, both of these revert down to 1 chunk because the
// dictionaries dramatically shrink the size of the uncompressed data.
constexpr int expected_a[] = {3, 3, 1, 1};
input_limit_test_read(test_filenames, tbl, 0, size_t{2} * 1024 * 1024 * 1024, expected_a);
constexpr int expected_a[] = {5, 5, 2, 1};
input_limit_test_read(test_filenames, tbl, 0, 256 * 1024 * 1024, expected_a);
// smaller limit
constexpr int expected_b[] = {10, 11, 4, 1};
input_limit_test_read(test_filenames, tbl, 0, 512 * 1024 * 1024, expected_b);
constexpr int expected_b[] = {10, 9, 3, 1};
input_limit_test_read(test_filenames, tbl, 0, 128 * 1024 * 1024, expected_b);
// include output chunking as well
constexpr int expected_c[] = {20, 21, 15, 14};
input_limit_test_read(test_filenames, tbl, 128 * 1024 * 1024, 512 * 1024 * 1024, expected_c);
constexpr int expected_c[] = {20, 18, 15, 12};
input_limit_test_read(test_filenames, tbl, 32 * 1024 * 1024, 64 * 1024 * 1024, expected_c);
}
Loading

0 comments on commit 6e25fce

Please sign in to comment.