Skip to content

Commit

Permalink
Add stream parameter to List Manipulation and Operations APIs (#14248)
Browse files Browse the repository at this point in the history
I have organized the public List APIs into **three** distinct categories based on their functionality, simplifying the PRs for easier and shorter reviews. This particular PR introduces the `stream` parameter only to the `List Manipulation and Operations APIs`, which fall under `Section 1`. See next comment for other sections.


1. List Manipulation and Operations (`combine.hpp`, `contains.hpp`, `count_elements.hpp`)

```
concatenate_rows
concatenate_list_elements
contains_nulls
contains - search_keys
contains - search_key
index_of - search_keys
index_of - search_key
count_elements
```


This PR addresses issues in the following files:

1. **column_wrapper.hpp**:
      - Corrects the improper passing of the stream value in the `make_lists_column` function.
      - Enables the missing cast to `lists_column_view`.
      - Substitutes `copy_bitmask` with `cudf::detail::copy_bitmask` to include the stream parameter.

2. **concatenate.cu:**
 
      - Substitutes `create_null_mask` with `cudf::detail::create_null_mask` to include the stream parameter.

Authors:
  - Suraj Aralihalli (https://github.com/SurajAralihalli)
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Mark Harris (https://github.com/harrism)

URL: #14248
  • Loading branch information
SurajAralihalli authored Oct 9, 2023
1 parent e28017c commit e345620
Show file tree
Hide file tree
Showing 11 changed files with 169 additions and 35 deletions.
4 changes: 4 additions & 0 deletions cpp/include/cudf/lists/combine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ enum class concatenate_null_policy { IGNORE, NULLIFY_OUTPUT_ROW };
* @param input Table of lists to be concatenated.
* @param null_policy The parameter to specify whether a null list element will be ignored from
* concatenation, or any concatenation involving a null element will result in a null list.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
* @return A new column in which each row is a list resulted from concatenating all list elements in
* the corresponding row of the input table.
*/
std::unique_ptr<column> concatenate_rows(
table_view const& input,
concatenate_null_policy null_policy = concatenate_null_policy::IGNORE,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -86,13 +88,15 @@ std::unique_ptr<column> concatenate_rows(
* @param input The lists column containing lists of list elements to concatenate.
* @param null_policy The parameter to specify whether a null list element will be ignored from
* concatenation, or any concatenation involving a null element will result in a null list.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
* @return A new column in which each row is a list resulted from concatenating all list elements in
* the corresponding row of the input lists column.
*/
std::unique_ptr<column> concatenate_list_elements(
column_view const& input,
concatenate_null_policy null_policy = concatenate_null_policy::IGNORE,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down
14 changes: 12 additions & 2 deletions cpp/include/cudf/lists/contains.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ namespace lists {
*
* @param lists Lists column whose `n` rows are to be searched
* @param search_key The scalar key to be looked up in each list row
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
* @return BOOL8 column of `n` rows with the result of the lookup
*/
std::unique_ptr<column> contains(
cudf::lists_column_view const& lists,
cudf::scalar const& search_key,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -63,13 +65,15 @@ std::unique_ptr<column> contains(
* 2. The list row `lists[i]` is null
*
* @param lists Lists column whose `n` rows are to be searched
* @param search_keys Column of elements to be looked up in each list row
* @param search_keys Column of elements to be looked up in each list row.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
* @return BOOL8 column of `n` rows with the result of the lookup
*/
std::unique_ptr<column> contains(
cudf::lists_column_view const& lists,
cudf::column_view const& search_keys,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -84,12 +88,14 @@ std::unique_ptr<column> contains(
* A row with an empty list will always return false.
* Nulls inside non-null nested elements (such as lists or structs) are not considered.
*
* @param lists Lists column whose `n` rows are to be searched
* @param lists Lists column whose `n` rows are to be searched.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory
* @return BOOL8 column of `n` rows with the result of the lookup
*/
std::unique_ptr<column> contains_nulls(
cudf::lists_column_view const& lists,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand Down Expand Up @@ -125,13 +131,15 @@ enum class duplicate_find_option : int32_t {
* @param search_key The scalar key to be looked up in each list row
* @param find_option Whether to return the position of the first match (`FIND_FIRST`) or
* last (`FIND_LAST`)
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return column of `n` rows with the location of the `search_key`
*/
std::unique_ptr<column> index_of(
cudf::lists_column_view const& lists,
cudf::scalar const& search_key,
duplicate_find_option find_option = duplicate_find_option::FIND_FIRST,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand Down Expand Up @@ -160,13 +168,15 @@ std::unique_ptr<column> index_of(
* `lists`
* @param find_option Whether to return the position of the first match (`FIND_FIRST`) or
* last (`FIND_LAST`)
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return column of `n` rows with the location of the `search_key`
*/
std::unique_ptr<column> index_of(
cudf::lists_column_view const& lists,
cudf::column_view const& search_keys,
duplicate_find_option find_option = duplicate_find_option::FIND_FIRST,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down
2 changes: 2 additions & 0 deletions cpp/include/cudf/lists/count_elements.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ namespace lists {
* in the output column.
*
* @param input Input lists column
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return New column with the number of elements for each row
*/
std::unique_ptr<column> count_elements(
lists_column_view const& input,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of lists_elements group
Expand Down
45 changes: 33 additions & 12 deletions cpp/include/cudf_test/column_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cudf/copying.hpp>
#include <cudf/detail/concatenate.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/dictionary/encode.hpp>
#include <cudf/fixed_point/fixed_point.hpp>
Expand Down Expand Up @@ -1281,6 +1282,11 @@ class dictionary_column_wrapper<std::string> : public detail::column_wrapper {
template <typename T, typename SourceElementT = T>
class lists_column_wrapper : public detail::column_wrapper {
public:
/**
* @brief Cast to lists_column_view
*/
operator lists_column_view() const { return cudf::lists_column_view{wrapped->view()}; }

/**
* @brief Construct a lists column containing a single list of fixed-width
* type from an initializer list of values.
Expand Down Expand Up @@ -1542,8 +1548,12 @@ class lists_column_wrapper : public detail::column_wrapper {
rmm::device_buffer&& null_mask)
{
// construct the list column
wrapped = make_lists_column(
num_rows, std::move(offsets), std::move(values), null_count, std::move(null_mask));
wrapped = make_lists_column(num_rows,
std::move(offsets),
std::move(values),
null_count,
std::move(null_mask),
cudf::test::get_default_stream());
}

/**
Expand Down Expand Up @@ -1618,8 +1628,12 @@ class lists_column_wrapper : public detail::column_wrapper {
}();

// construct the list column
wrapped = make_lists_column(
cols.size(), std::move(offsets), std::move(data), null_count, std::move(null_mask));
wrapped = make_lists_column(cols.size(),
std::move(offsets),
std::move(data),
null_count,
std::move(null_mask),
cudf::test::get_default_stream());
}

/**
Expand Down Expand Up @@ -1647,8 +1661,12 @@ class lists_column_wrapper : public detail::column_wrapper {
depth = 0;

size_type num_elements = offsets->size() == 0 ? 0 : offsets->size() - 1;
wrapped =
make_lists_column(num_elements, std::move(offsets), std::move(c), 0, rmm::device_buffer{});
wrapped = make_lists_column(num_elements,
std::move(offsets),
std::move(c),
0,
rmm::device_buffer{},
cudf::test::get_default_stream());
}

/**
Expand Down Expand Up @@ -1697,12 +1715,15 @@ class lists_column_wrapper : public detail::column_wrapper {
}

lists_column_view lcv(col);
return make_lists_column(col.size(),
std::make_unique<column>(lcv.offsets()),
normalize_column(lists_column_view(col).child(),
lists_column_view(expected_hierarchy).child()),
col.null_count(),
copy_bitmask(col));
return make_lists_column(
col.size(),
std::make_unique<column>(lcv.offsets()),
normalize_column(lists_column_view(col).child(),
lists_column_view(expected_hierarchy).child()),
col.null_count(),
cudf::detail::copy_bitmask(
col, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()),
cudf::test::get_default_stream());
}

std::pair<std::vector<column_view>, std::vector<std::unique_ptr<column>>> preprocess_columns(
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/lists/combine/concatenate_list_elements.cu
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,11 @@ std::unique_ptr<column> concatenate_list_elements(column_view const& input,
*/
std::unique_ptr<column> concatenate_list_elements(column_view const& input,
concatenate_null_policy null_policy,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::concatenate_list_elements(input, null_policy, cudf::get_default_stream(), mr);
return detail::concatenate_list_elements(input, null_policy, stream, mr);
}

} // namespace lists
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/lists/combine/concatenate_rows.cu
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,11 @@ std::unique_ptr<column> concatenate_rows(table_view const& input,
*/
std::unique_ptr<column> concatenate_rows(table_view const& input,
concatenate_null_policy null_policy,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::concatenate_rows(input, null_policy, cudf::get_default_stream(), mr);
return detail::concatenate_rows(input, null_policy, stream, mr);
}

} // namespace lists
Expand Down
37 changes: 21 additions & 16 deletions cpp/src/lists/contains.cu
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ std::unique_ptr<column> index_of(lists_column_view const& lists,
}

auto search_key_col = cudf::make_column_from_scalar(search_key, lists.size(), stream, mr);
return index_of(lists, search_key_col->view(), find_option, stream, mr);
return detail::index_of(lists, search_key_col->view(), find_option, stream, mr);
}

std::unique_ptr<column> index_of(lists_column_view const& lists,
Expand All @@ -306,11 +306,11 @@ std::unique_ptr<column> contains(lists_column_view const& lists,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto key_indices = index_of(lists,
search_key,
duplicate_find_option::FIND_FIRST,
stream,
rmm::mr::get_current_device_resource());
auto key_indices = detail::index_of(lists,
search_key,
duplicate_find_option::FIND_FIRST,
stream,
rmm::mr::get_current_device_resource());
return to_contains(std::move(key_indices), stream, mr);
}

Expand All @@ -322,11 +322,11 @@ std::unique_ptr<column> contains(lists_column_view const& lists,
CUDF_EXPECTS(search_keys.size() == lists.size(),
"Number of search keys must match list column size.");

auto key_indices = index_of(lists,
search_keys,
duplicate_find_option::FIND_FIRST,
stream,
rmm::mr::get_current_device_resource());
auto key_indices = detail::index_of(lists,
search_keys,
duplicate_find_option::FIND_FIRST,
stream,
rmm::mr::get_current_device_resource());
return to_contains(std::move(key_indices), stream, mr);
}

Expand Down Expand Up @@ -364,43 +364,48 @@ std::unique_ptr<column> contains_nulls(lists_column_view const& lists,

std::unique_ptr<column> contains(lists_column_view const& lists,
cudf::scalar const& search_key,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::contains(lists, search_key, cudf::get_default_stream(), mr);
return detail::contains(lists, search_key, stream, mr);
}

std::unique_ptr<column> contains(lists_column_view const& lists,
column_view const& search_keys,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::contains(lists, search_keys, cudf::get_default_stream(), mr);
return detail::contains(lists, search_keys, stream, mr);
}

std::unique_ptr<column> contains_nulls(lists_column_view const& lists,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::contains_nulls(lists, cudf::get_default_stream(), mr);
return detail::contains_nulls(lists, stream, mr);
}

std::unique_ptr<column> index_of(lists_column_view const& lists,
cudf::scalar const& search_key,
duplicate_find_option find_option,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::index_of(lists, search_key, find_option, cudf::get_default_stream(), mr);
return detail::index_of(lists, search_key, find_option, stream, mr);
}

std::unique_ptr<column> index_of(lists_column_view const& lists,
column_view const& search_keys,
duplicate_find_option find_option,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::index_of(lists, search_keys, find_option, cudf::get_default_stream(), mr);
return detail::index_of(lists, search_keys, find_option, stream, mr);
}

} // namespace cudf::lists
5 changes: 3 additions & 2 deletions cpp/src/lists/copying/concatenate.cu
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <cudf/detail/concatenate_masks.hpp>
#include <cudf/detail/get_value.cuh>
#include <cudf/detail/null_mask.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/lists/lists_column_view.hpp>

#include <rmm/cuda_stream_view.hpp>
Expand Down Expand Up @@ -123,8 +124,8 @@ std::unique_ptr<column> concatenate(host_span<column_view const> columns,
// if any of the input columns have nulls, construct the output mask
bool const has_nulls =
std::any_of(columns.begin(), columns.end(), [](auto const& col) { return col.has_nulls(); });
rmm::device_buffer null_mask = create_null_mask(
total_list_count, has_nulls ? mask_state::UNINITIALIZED : mask_state::UNALLOCATED);
rmm::device_buffer null_mask = cudf::detail::create_null_mask(
total_list_count, has_nulls ? mask_state::UNINITIALIZED : mask_state::UNALLOCATED, stream, mr);
auto null_mask_data = static_cast<bitmask_type*>(null_mask.data());
auto const null_count =
has_nulls ? cudf::detail::concatenate_masks(columns, null_mask_data, stream) : size_type{0};
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/lists/count_elements.cu
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ std::unique_ptr<column> count_elements(lists_column_view const& input,
// external APIS

std::unique_ptr<column> count_elements(lists_column_view const& input,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
return detail::count_elements(input, cudf::get_default_stream(), mr);
return detail::count_elements(input, stream, mr);
}

} // namespace lists
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ ConfigureTest(
)
ConfigureTest(STREAM_SORTING_TEST streams/sorting_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_TEXT_TEST streams/text/ngrams_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing)

# ##################################################################################################
# Install tests ####################################################################################
Expand Down
Loading

0 comments on commit e345620

Please sign in to comment.