Skip to content

Commit

Permalink
extract_list_elements() with column_view indices (#9367)
Browse files Browse the repository at this point in the history
Fixes #9172.

Adds an overload of `extract_list_element()` where the indices
may be specified as a `column_view`.

This function returns a list element from a potentially different index,
for each list row. The semantics of the scalar-index version of the function
are retained. i.e.:

0. The index is 0-based.
1. `if (list_row == null) return null;`
2. `if (index > list_row.size()) return null;`
3. `if (index == null) return null;`
4. `if (index < 0 && -index <= length) return list_row[length + index];`

This commit also reworks `extract_list_element(list, size_type, stream)`,
to use `segmented_gather()`, as per the advice in #9214.

Authors:
  - MithunR (https://github.com/mythrocks)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Mark Harris (https://github.com/harrism)
  - Nghia Truong (https://github.com/ttnghia)

URL: #9367
  • Loading branch information
mythrocks authored Oct 13, 2021
1 parent a4f6c6d commit 794863c
Show file tree
Hide file tree
Showing 3 changed files with 331 additions and 75 deletions.
43 changes: 41 additions & 2 deletions cpp/include/cudf/lists/extract.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ namespace lists {
*/

/**
* @brief Create a column using values from row `index` from each
* sublist within the input `lists_column`.
* @brief Create a column where each row is the element at position `index` from the corresponding
* sublist in the input `lists_column`.
*
* Output `column[i]` is set from element `lists_column[i][index]`.
* If `index` is larger than the size of the sublist at `lists_column[i]`
Expand Down Expand Up @@ -65,6 +65,45 @@ std::unique_ptr<column> extract_list_element(
size_type index,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Create a column where each row is a single element from the corresponding sublist
* in the input `lists_column`, selected using indices from the `indices` column.
*
* Output `column[i]` is set from element `lists_column[i][indices[i]]`.
* If `indices[i]` is larger than the size of the sublist at `lists_column[i]`
* then output `column[i] = null`.
* Similarly, if `indices[i]` is `null`, then `column[i] = null`.
*
* @code{.pseudo}
* l = { {1, 2, 3}, {4}, {5, 6} }
* r = extract_list_element(l, {0, null, 2})
* r is now {1, null, null}
* @endcode
*
* `indices[i]` may also be negative, in which case the row retrieved is offset
* from the end of each sublist.
*
* @code{.pseudo}
* l = { {"a"}, {"b", "c"}, {"d", "e", "f"} }
* r = extract_list_element(l, {-1, -2, -4})
* r is now {"a", "b", null}
* @endcode
*
* Any input where `lists_column[i] == null` produces output `column[i] = null`.
* Any input where `lists_column[i][indices[i]] == null` produces output `column[i] = null`.
*
* @param lists_column Column to extract elements from.
* @param indices The column whose rows indicate the element index to be retrieved from each list
* row.
* @param mr Device memory resource used to allocate the returned column's device memory.
* @return Column of extracted elements.
* @throws cudf::logic_error If the sizes of `lists_column` and `indices` do not match.
*/
std::unique_ptr<column> extract_list_element(
lists_column_view const& lists_column,
column_view const& indices,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
} // namespace lists
} // namespace cudf
178 changes: 110 additions & 68 deletions cpp/src/lists/extract.cu
Original file line number Diff line number Diff line change
Expand Up @@ -16,103 +16,131 @@
#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/sequence.hpp>
#include <cudf/lists/detail/gather.cuh>
#include <cudf/lists/extract.hpp>
#include <cudf/scalar/scalar_factories.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/transform.h>
#include <thrust/copy.h>
#include <thrust/iterator/constant_iterator.h>

#include <limits>

namespace cudf {
namespace lists {
namespace detail {

namespace {

/**
* @brief Convert index value for each sublist into a gather index for
* the lists column's child column.
* @brief Helper to construct a column of indices, for use with `segmented_gather()`.
*
* When indices are specified as a column, e.g. `{5, -4, 3, -2, 1, null}`,
* the column returned is: `{5, -4, 3, -2, 1, MAX_SIZE_TYPE}`.
* All null indices are replaced with `MAX_SIZE_TYPE = numeric_limits<size_type>::max()`.
*
* The returned column can then be used to construct a lists column, for use
* with `segmented_gather()`.
*/
std::unique_ptr<cudf::column> make_index_child(column_view const& indices,
size_type,
rmm::cuda_stream_view stream)
{
// New column, near identical to `indices`, except with null values replaced.
// `segmented_gather()` on a null index should produce a null row.
if (not indices.nullable()) { return std::make_unique<column>(indices, stream); }

auto const d_indices = column_device_view::create(indices);
// Replace null indices with MAX_SIZE_TYPE, so that gather() returns null for them.
auto const null_replaced_iter_begin =
cudf::detail::make_null_replacement_iterator(*d_indices, std::numeric_limits<size_type>::max());
auto index_child =
make_numeric_column(data_type{type_id::INT32}, indices.size(), mask_state::UNALLOCATED, stream);
thrust::copy_n(rmm::exec_policy(stream),
null_replaced_iter_begin,
indices.size(),
index_child->mutable_view().begin<size_type>());
return index_child;
}

/**
* @brief Helper to construct a column of indices, for use with `segmented_gather()`.
*
* When indices are specified as a size_type, e.g. `7`,
* the column returned is: `{ 7, 7, 7, 7, 7 }`.
*
* The returned column can then be used to construct a lists column, for use
* with `segmented_gather()`.
*/
std::unique_ptr<cudf::column> make_index_child(size_type index,
size_type num_rows,
rmm::cuda_stream_view stream)
{
auto index_child = // [index, index, index, ..., index]
make_numeric_column(data_type{type_id::INT32}, num_rows, mask_state::UNALLOCATED, stream);
thrust::fill_n(
rmm::exec_policy(stream), index_child->mutable_view().begin<size_type>(), num_rows, index);
return index_child;
}

/**
* @brief Helper to construct offsets column for an index vector.
*
* Constructs the sequence: `{ 0, 1, 2, 3, ... num_lists + 1}`.
* This may be used to construct an "index-list" column, where each list row
* has a single element.
*/
template <bool PositiveIndex = true>
struct map_index_fn {
column_device_view const d_offsets; // offsets to each sublist (including validity mask)
size_type const index; // index of element within each sublist
size_type const out_of_bounds; // value to use to indicate out-of-bounds

__device__ int32_t operator()(size_type idx)
{
if (d_offsets.is_null(idx)) return out_of_bounds;
auto const offset = d_offsets.element<int32_t>(idx);
auto const length = d_offsets.element<int32_t>(idx + 1) - offset;
if (PositiveIndex)
return index < length ? index + offset : out_of_bounds;
else
return index >= -length ? length + index + offset : out_of_bounds;
}
};
std::unique_ptr<cudf::column> make_index_offsets(size_type num_lists, rmm::cuda_stream_view stream)
{
return cudf::detail::sequence(
num_lists + 1, cudf::scalar_type_t<size_type>(0, true, stream), stream);
}

} // namespace

/**
* @copydoc cudf::lists::extract_list_element
*
* @tparam index_t The type used to specify the index values (either column_view or size_type)
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
template <typename index_t>
std::unique_ptr<column> extract_list_element(lists_column_view lists_column,
size_type index,
index_t const& index,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (lists_column.is_empty()) return empty_like(lists_column.child());
auto const offsets_column = lists_column.offsets();

// create a column_view with attributes of the parent and data from the offsets
column_view annotated_offsets(data_type{type_id::INT32},
lists_column.size() + 1,
offsets_column.data<int32_t>(),
lists_column.null_mask(),
lists_column.null_count(),
lists_column.offset());

// create a gather map for extracting elements from the child column
auto gather_map = make_fixed_width_column(
data_type{type_id::INT32}, annotated_offsets.size() - 1, mask_state::UNALLOCATED, stream);
auto d_gather_map = gather_map->mutable_view().data<int32_t>();
auto const child_column = lists_column.child();

// build the gather map using the offsets and the provided index
auto const d_column = column_device_view::create(annotated_offsets, stream);
if (index < 0)
thrust::transform(rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(gather_map->size()),
d_gather_map,
map_index_fn<false>{*d_column, index, child_column.size()});
else
thrust::transform(rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(gather_map->size()),
d_gather_map,
map_index_fn<true>{*d_column, index, child_column.size()});

// call gather on the child column
auto result = cudf::detail::gather(table_view({child_column}),
gather_map->view(),
out_of_bounds_policy::NULLIFY, // nullify-out-of-bounds
cudf::detail::negative_index_policy::NOT_ALLOWED,
stream,
mr)
->release();
if (result.front()->null_count() == 0)
result.front()->set_null_mask(rmm::device_buffer{0, stream, mr}, 0);
return std::unique_ptr<column>(std::move(result.front()));
auto const num_lists = lists_column.size();
if (num_lists == 0) { return empty_like(lists_column.child()); }

// Given an index (or indices vector), an index lists column may be constructed,
// with each list row having a single element.
// E.g.
// 1. If index = 7, index_lists_column = { {7}, {7}, {7}, {7}, ... }.
// 2. If indices = {4, 3, 2, 1, null},
// index_lists_column = { {4}, {3}, {2}, {1}, {MAX_SIZE_TYPE} }.

auto const index_lists_column = make_lists_column(num_lists,
make_index_offsets(num_lists, stream),
make_index_child(index, num_lists, stream),
0,
{},
stream);

auto extracted_lists = segmented_gather(
lists_column, index_lists_column->view(), out_of_bounds_policy::NULLIFY, stream, mr);

return std::move(extracted_lists->release().children[lists_column_view::child_column_index]);
}

} // namespace detail

/**
* @copydoc cudf::lists::extract_list_element
* @copydoc cudf::lists::extract_list_element(lists_column_view const&,
* size_type,
* rmm::mr::device_memory_resource*)
*/
std::unique_ptr<column> extract_list_element(lists_column_view const& lists_column,
size_type index,
Expand All @@ -121,5 +149,19 @@ std::unique_ptr<column> extract_list_element(lists_column_view const& lists_colu
return detail::extract_list_element(lists_column, index, rmm::cuda_stream_default, mr);
}

/**
* @copydoc cudf::lists::extract_list_element(lists_column_view const&,
* column_view const&,
* rmm::mr::device_memory_resource*)
*/
std::unique_ptr<column> extract_list_element(lists_column_view const& lists_column,
column_view const& indices,
rmm::mr::device_memory_resource* mr)
{
CUDF_EXPECTS(indices.size() == lists_column.size(),
"Index column must have as many elements as lists column.");
return detail::extract_list_element(lists_column, indices, rmm::cuda_stream_default, mr);
}

} // namespace lists
} // namespace cudf
Loading

0 comments on commit 794863c

Please sign in to comment.