Skip to content

Commit

Permalink
Adding string row size iterator for row to column and column to row c…
Browse files Browse the repository at this point in the history
…onversion (#10157)

This is the first step to supporting variable-width strings in the row to column and column to row code. It adds an iterator that reads the offset columns inside string columns to compute the row sizes of this variable-width data.

Note that this doesn't add support for strings yet, but is the first step in that direction.

closes #10111

Authors:
  - Mike Wilson (https://github.com/hyperbolic2346)

Approvers:
  - MithunR (https://github.com/mythrocks)
  - Nghia Truong (https://github.com/ttnghia)
  - https://github.com/nvdbaranec

URL: #10157
  • Loading branch information
hyperbolic2346 authored Feb 11, 2022
1 parent d5b2448 commit dcac052
Showing 1 changed file with 183 additions and 61 deletions.
244 changes: 183 additions & 61 deletions java/src/main/native/src/row_conversion.cu
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/lists/lists_column_device_view.cuh>
#include <cudf/scalar/scalar_factories.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/table/table.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/bit.hpp>
Expand All @@ -34,6 +35,7 @@
#include <rmm/exec_policy.hpp>
#include <thrust/binary_search.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/scan.h>
#include <type_traits>
Expand Down Expand Up @@ -187,8 +189,82 @@ struct batch_data {
std::vector<row_batch> row_batches; // information about each batch such as byte count
};

struct row_offset_functor {
row_offset_functor(size_type fixed_width_only_row_size)
/**
* @brief builds row size information for tables that contain strings
*
* @param tbl table from which to compute row size information
* @param fixed_width_and_validity_size size of fixed-width and validity data in this table
* @param stream cuda stream on which to operate
* @return device vector of size_types of the row sizes of the table
*/
rmm::device_uvector<size_type> build_string_row_sizes(table_view const &tbl,
size_type fixed_width_and_validity_size,
rmm::cuda_stream_view stream) {
auto const num_rows = tbl.num_rows();
rmm::device_uvector<size_type> d_row_sizes(num_rows, stream);
thrust::uninitialized_fill(rmm::exec_policy(stream), d_row_sizes.begin(), d_row_sizes.end(), 0);

auto d_offsets_iterators = [&]() {
std::vector<strings_column_view::offset_iterator> offsets_iterators;
auto offsets_iter = thrust::make_transform_iterator(
tbl.begin(), [](auto const &col) -> strings_column_view::offset_iterator {
if (!is_fixed_width(col.type())) {
CUDF_EXPECTS(col.type().id() == type_id::STRING, "only string columns are supported!");
return strings_column_view(col).offsets_begin();
} else {
return nullptr;
}
});
std::copy_if(offsets_iter, offsets_iter + tbl.num_columns(),
std::back_inserter(offsets_iterators),
[](auto const &offset_ptr) { return offset_ptr != nullptr; });
return make_device_uvector_async(offsets_iterators, stream);
}();

auto const num_columns = static_cast<size_type>(d_offsets_iterators.size());

thrust::for_each(rmm::exec_policy(stream), thrust::make_counting_iterator(0),
thrust::make_counting_iterator(num_columns * num_rows),
[d_offsets_iterators = d_offsets_iterators.data(), num_columns, num_rows,
d_row_sizes = d_row_sizes.data()] __device__(auto element_idx) {
auto const row = element_idx % num_rows;
auto const col = element_idx / num_rows;
auto const val =
d_offsets_iterators[col][row + 1] - d_offsets_iterators[col][row];
atomicAdd(&d_row_sizes[row], val);
});

// transform the row sizes to include fixed width size and alignment
thrust::transform(rmm::exec_policy(stream), d_row_sizes.begin(), d_row_sizes.end(),
d_row_sizes.begin(), [fixed_width_and_validity_size] __device__(auto row_size) {
return util::round_up_unsafe(fixed_width_and_validity_size + row_size,
JCUDF_ROW_ALIGNMENT);
});

return d_row_sizes;
}

/**
* @brief functor to return the offset of a row in a table with string columns
*
*/
struct string_row_offset_functor {
string_row_offset_functor(device_span<size_type> _d_row_offsets)
: d_row_offsets(_d_row_offsets){};

__device__ inline size_type operator()(int row_number, int) const {
return d_row_offsets[row_number];
}

device_span<size_type> d_row_offsets;
};

/**
* @brief functor to return the offset of a row in a table with only fixed-width columns
*
*/
struct fixed_width_row_offset_functor {
fixed_width_row_offset_functor(size_type fixed_width_only_row_size)
: _fixed_width_only_row_size(fixed_width_only_row_size){};

__device__ inline size_type operator()(int row_number, int tile_row_start) const {
Expand Down Expand Up @@ -542,6 +618,10 @@ __global__ void copy_to_rows(const size_type num_rows, const size_type num_colum
auto const relative_col = el / num_fetch_rows;
auto const relative_row = el % num_fetch_rows;
auto const absolute_col = relative_col + fetch_tile.start_col;
if (input_data[absolute_col] == nullptr) {
// variable-width data
continue;
}
auto const absolute_row = relative_row + fetch_tile.start_row;
auto const col_size = col_sizes[absolute_col];
auto const col_offset = col_offsets[absolute_col];
Expand Down Expand Up @@ -1194,10 +1274,8 @@ static size_type compute_column_information(iterator begin, iterator end,
auto validity_offset = fixed_width_size_per_row;
column_starts.push_back(validity_offset);

return util::round_up_unsafe(
fixed_width_size_per_row +
util::div_rounding_up_safe(static_cast<size_type>(std::distance(begin, end)), CHAR_BIT),
JCUDF_ROW_ALIGNMENT);
return fixed_width_size_per_row +
util::div_rounding_up_safe(static_cast<size_type>(std::distance(begin, end)), CHAR_BIT);
}

/**
Expand Down Expand Up @@ -1512,20 +1590,27 @@ void determine_tiles(std::vector<size_type> const &column_sizes,
}
}

#endif // #if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700

} // namespace detail

std::vector<std::unique_ptr<column>> convert_to_rows(table_view const &tbl,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource *mr) {
#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700
auto const num_columns = tbl.num_columns();
auto const num_rows = tbl.num_rows();

auto const fixed_width_only = std::all_of(
tbl.begin(), tbl.end(), [](column_view const &c) { return is_fixed_width(c.type()); });

/**
* @brief convert cudf table into JCUDF row format
*
* @tparam offsetFunctor functor type for offset functor
* @param tbl table to convert to JCUDF row format
* @param batch_info information about the batches of data
* @param offset_functor functor that returns the starting offset of each row
* @param column_starts starting offset of a column in a row
* @param column_sizes size of each element in a column
* @param fixed_width_size_per_row size of fixed-width data in a row of this table
* @param stream stream used
* @param mr selected memory resource for returned data
* @return vector of list columns containing byte columns of the JCUDF row data
*/
template <typename offsetFunctor>
std::vector<std::unique_ptr<column>>
convert_to_rows(table_view const &tbl, batch_data &batch_info, offsetFunctor offset_functor,
std::vector<size_type> const &column_starts,
std::vector<size_type> const &column_sizes,
size_type const fixed_width_size_per_row, rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource *mr) {
int device_id;
CUDA_TRY(cudaGetDevice(&device_id));
int total_shmem_in_bytes;
Expand All @@ -1537,23 +1622,12 @@ std::vector<std::unique_ptr<column>> convert_to_rows(table_view const &tbl,
sizeof(cuda::barrier<cuda::thread_scope_block>) * NUM_TILES_PER_KERNEL_LOADED;
auto const shmem_limit_per_tile = total_shmem_in_bytes / NUM_TILES_PER_KERNEL_LOADED;

// break up the work into tiles, which are a starting and ending row/col #.
// this tile size is calculated based on the shared memory size available
// we want a single tile to fill up the entire shared memory space available
// for the transpose-like conversion.

// There are two different processes going on here. The GPU conversion of the data
// and the writing of the data into the list of byte columns that are a maximum of
// 2 gigs each due to offset maximum size. The GPU conversion portion has to understand
// this limitation because the column must own the data inside and as a result it must be
// a distinct allocation for that column. Copying the data into these final buffers would
// be prohibitively expensive, so care is taken to ensure the GPU writes to the proper buffer.
// The tiles are broken at the boundaries of specific rows based on the row sizes up
// to that point. These are row batches and they are decided first before building the
// tiles so the tiles can be properly cut around them.
auto const num_rows = tbl.num_rows();
auto const num_columns = tbl.num_columns();
auto dev_col_sizes = make_device_uvector_async(column_sizes, stream, mr);
auto dev_col_starts = make_device_uvector_async(column_starts, stream, mr);

// Get the pointers to the input columnar data ready

auto data_begin = thrust::make_transform_iterator(
tbl.begin(), [](auto const &c) { return c.template data<int8_t>(); });
std::vector<int8_t const *> input_data(data_begin, data_begin + tbl.num_columns());
Expand All @@ -1565,27 +1639,6 @@ std::vector<std::unique_ptr<column>> convert_to_rows(table_view const &tbl,
auto dev_input_data = make_device_uvector_async(input_data, stream, mr);
auto dev_input_nm = make_device_uvector_async(input_nm, stream, mr);

std::vector<size_type> column_sizes; // byte size of each column
std::vector<size_type> column_starts; // offset of column inside a row including alignment
column_sizes.reserve(num_columns);
column_starts.reserve(num_columns + 1); // we add a final offset for validity data start

auto schema_column_iter =
thrust::make_transform_iterator(thrust::make_counting_iterator(0),
[&tbl](auto i) -> std::tuple<data_type, column_view const> {
return {tbl.column(i).type(), tbl.column(i)};
});

auto const fixed_width_size_per_row = detail::compute_column_information(
schema_column_iter, schema_column_iter + num_columns, column_starts, column_sizes);

auto dev_col_sizes = make_device_uvector_async(column_sizes, stream, mr);
auto dev_col_starts = make_device_uvector_async(column_starts, stream, mr);

// total encoded row size. This includes fixed-width data, validity, and variable-width data.
auto row_size_iter = thrust::make_constant_iterator<uint64_t>(fixed_width_size_per_row);
auto batch_info = detail::build_batches(num_rows, row_size_iter, fixed_width_only, stream, mr);

// the first batch always exists unless we were sent an empty table
auto const first_batch_size = batch_info.row_batches[0].row_count;

Expand Down Expand Up @@ -1636,8 +1689,6 @@ std::vector<std::unique_ptr<column>> convert_to_rows(table_view const &tbl,
util::div_rounding_up_unsafe(validity_tile_infos.size(), NUM_VALIDITY_TILES_PER_KERNEL));
dim3 validity_threads(std::min(validity_tile_infos.size() * 32, 128lu));

detail::row_offset_functor offset_functor(fixed_width_size_per_row);

detail::copy_to_rows<<<blocks, threads, total_shmem_in_bytes, stream.value()>>>(
num_rows, num_columns, shmem_limit_per_tile, gpu_tile_infos, dev_input_data.data(),
dev_col_sizes.data(), dev_col_starts.data(), offset_functor,
Expand Down Expand Up @@ -1670,6 +1721,76 @@ std::vector<std::unique_ptr<column>> convert_to_rows(table_view const &tbl,
});

return ret;
}
#endif // #if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700

} // namespace detail

std::vector<std::unique_ptr<column>> convert_to_rows(table_view const &tbl,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource *mr) {
#if !defined(__CUDA_ARCH__) || __CUDA_ARCH__ >= 700
auto const num_columns = tbl.num_columns();
auto const num_rows = tbl.num_rows();

auto const fixed_width_only = std::all_of(
tbl.begin(), tbl.end(), [](column_view const &c) { return is_fixed_width(c.type()); });

// break up the work into tiles, which are a starting and ending row/col #.
// this tile size is calculated based on the shared memory size available
// we want a single tile to fill up the entire shared memory space available
// for the transpose-like conversion.

// There are two different processes going on here. The GPU conversion of the data
// and the writing of the data into the list of byte columns that are a maximum of
// 2 gigs each due to offset maximum size. The GPU conversion portion has to understand
// this limitation because the column must own the data inside and as a result it must be
// a distinct allocation for that column. Copying the data into these final buffers would
// be prohibitively expensive, so care is taken to ensure the GPU writes to the proper buffer.
// The tiles are broken at the boundaries of specific rows based on the row sizes up
// to that point. These are row batches and they are decided first before building the
// tiles so the tiles can be properly cut around them.

std::vector<size_type> column_sizes; // byte size of each column
std::vector<size_type> column_starts; // offset of column inside a row including alignment
column_sizes.reserve(num_columns);
column_starts.reserve(num_columns + 1); // we add a final offset for validity data start

auto schema_column_iter =
thrust::make_transform_iterator(thrust::make_counting_iterator(0),
[&tbl](auto i) -> std::tuple<data_type, column_view const> {
return {tbl.column(i).type(), tbl.column(i)};
});

auto const fixed_width_size_per_row = detail::compute_column_information(
schema_column_iter, schema_column_iter + num_columns, column_starts, column_sizes);
if (fixed_width_only) {
// total encoded row size. This includes fixed-width data and validity only. It does not include
// variable-width data since it isn't copied with the fixed-width and validity kernel.
auto row_size_iter = thrust::make_constant_iterator<uint64_t>(
util::round_up_unsafe(fixed_width_size_per_row, JCUDF_ROW_ALIGNMENT));

auto batch_info = detail::build_batches(num_rows, row_size_iter, fixed_width_only, stream, mr);

detail::fixed_width_row_offset_functor offset_functor(
util::round_up_unsafe(fixed_width_size_per_row, JCUDF_ROW_ALIGNMENT));

return detail::convert_to_rows(tbl, batch_info, offset_functor, column_starts, column_sizes,
fixed_width_size_per_row, stream, mr);
} else {
auto row_sizes = detail::build_string_row_sizes(tbl, fixed_width_size_per_row, stream);

auto row_size_iter = cudf::detail::make_counting_transform_iterator(
0, detail::row_size_functor(num_rows, row_sizes.data(), 0));

auto batch_info = detail::build_batches(num_rows, row_size_iter, fixed_width_only, stream, mr);

detail::string_row_offset_functor offset_functor(batch_info.batch_row_offsets);

return detail::convert_to_rows(tbl, batch_info, offset_functor, column_starts, column_sizes,
fixed_width_size_per_row, stream, mr);
}

#else
CUDF_FAIL("Column to row conversion optimization requires volta or later hardware.");
return {};
Expand Down Expand Up @@ -1768,8 +1889,9 @@ std::unique_ptr<table> convert_from_rows(lists_column_view const &input,
auto iter = thrust::make_transform_iterator(thrust::make_counting_iterator(0), [&schema](auto i) {
return std::make_tuple(schema[i], nullptr);
});
auto const fixed_width_size_per_row =
detail::compute_column_information(iter, iter + num_columns, column_starts, column_sizes);
auto const fixed_width_size_per_row = util::round_up_unsafe(
detail::compute_column_information(iter, iter + num_columns, column_starts, column_sizes),
JCUDF_ROW_ALIGNMENT);

// Ideally we would check that the offsets are all the same, etc. but for now
// this is probably fine
Expand Down Expand Up @@ -1842,7 +1964,7 @@ std::unique_ptr<table> convert_from_rows(lists_column_view const &input,

dim3 validity_threads(std::min(validity_tile_infos.size() * 32, 128lu));

detail::row_offset_functor offset_functor(fixed_width_size_per_row);
detail::fixed_width_row_offset_functor offset_functor(fixed_width_size_per_row);

detail::copy_from_rows<<<blocks, threads, total_shmem_in_bytes, stream.value()>>>(
num_rows, num_columns, shmem_limit_per_tile, offset_functor, gpu_batch_row_boundaries.data(),
Expand Down

0 comments on commit dcac052

Please sign in to comment.