Merge remote-tracking branch 'upstream/branch-24.06' into test/dask_cudf/wae
mroeschke committed May 3, 2024
2 parents e753c9d + ce6902f commit 409a3bb
Showing 34 changed files with 581 additions and 381 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -430,6 +430,7 @@ add_library(
src/io/text/multibyte_split.cu
src/io/utilities/arrow_io_source.cpp
src/io/utilities/column_buffer.cpp
+  src/io/utilities/column_buffer_strings.cu
src/io/utilities/config_utils.cpp
src/io/utilities/data_casting.cu
src/io/utilities/data_sink.cpp
4 changes: 2 additions & 2 deletions cpp/src/hash/md5_hash.cu
@@ -309,7 +309,7 @@ std::unique_ptr<column> md5(table_view const& input,
// Result column allocation and creation
auto begin = thrust::make_constant_iterator(digest_size);
auto [offsets_column, bytes] =
-    cudf::detail::make_offsets_child_column(begin, begin + input.num_rows(), stream, mr);
+    cudf::strings::detail::make_offsets_child_column(begin, begin + input.num_rows(), stream, mr);

rmm::device_uvector<char> chars(bytes, stream, mr);
auto d_chars = chars.data();
@@ -322,7 +322,7 @@ std::unique_ptr<column> md5(table_view const& input,
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(input.num_rows()),
[d_chars, device_input = *device_input] __device__(auto row_index) {
-      MD5Hasher hasher(d_chars + (row_index * digest_size));
+      MD5Hasher hasher(d_chars + (static_cast<int64_t>(row_index) * digest_size));
for (auto const& col : device_input) {
if (col.is_valid(row_index)) {
if (col.type().id() == type_id::LIST) {
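The only functional change here is the static_cast<int64_t>: row_index and digest_size are both 32-bit, so their product wraps once a row's output position passes 2 GiB of digest characters. A minimal host-side sketch of the failure mode, with a hypothetical row count (not from the commit):

#include <cstdint>
#include <cstdio>

int main()
{
  int32_t const row_index   = 70'000'000;  // hypothetical: ~70M rows
  int32_t const digest_size = 32;          // an MD5 hex digest is 32 chars

  // 70'000'000 * 32 = 2'240'000'000 > INT32_MAX, so 32-bit arithmetic wraps;
  // truncating the 64-bit product reproduces what the un-cast kernel would store
  int32_t const bad = static_cast<int32_t>(static_cast<int64_t>(row_index) * digest_size);

  // widening one operand first keeps the whole computation in 64 bits
  int64_t const good = static_cast<int64_t>(row_index) * digest_size;

  std::printf("bad = %d, good = %lld\n", bad, static_cast<long long>(good));
  return 0;
}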
29 changes: 15 additions & 14 deletions cpp/src/hash/sha_hash.cuh
@@ -518,27 +518,28 @@ std::unique_ptr<column> sha_hash(table_view const& input,
// Result column allocation and creation
auto begin = thrust::make_constant_iterator(Hasher::digest_size);
auto [offsets_column, bytes] =
-    cudf::detail::make_offsets_child_column(begin, begin + input.num_rows(), stream, mr);
+    cudf::strings::detail::make_offsets_child_column(begin, begin + input.num_rows(), stream, mr);

auto chars = rmm::device_uvector<char>(bytes, stream, mr);
auto d_chars = chars.data();

auto const device_input = table_device_view::create(input, stream);

// Hash each row, hashing each element sequentially left to right
-  thrust::for_each(rmm::exec_policy(stream),
-                   thrust::make_counting_iterator(0),
-                   thrust::make_counting_iterator(input.num_rows()),
-                   [d_chars, device_input = *device_input] __device__(auto row_index) {
-                     Hasher hasher(d_chars + (row_index * Hasher::digest_size));
-                     for (auto const& col : device_input) {
-                       if (col.is_valid(row_index)) {
-                         cudf::type_dispatcher<dispatch_storage_type>(
-                           col.type(), HasherDispatcher(&hasher, col), row_index);
-                       }
-                     }
-                     hasher.finalize();
-                   });
+  thrust::for_each(
+    rmm::exec_policy(stream),
+    thrust::make_counting_iterator(0),
+    thrust::make_counting_iterator(input.num_rows()),
+    [d_chars, device_input = *device_input] __device__(auto row_index) {
+      Hasher hasher(d_chars + (static_cast<int64_t>(row_index) * Hasher::digest_size));
+      for (auto const& col : device_input) {
+        if (col.is_valid(row_index)) {
+          cudf::type_dispatcher<dispatch_storage_type>(
+            col.type(), HasherDispatcher(&hasher, col), row_index);
+        }
+      }
+      hasher.finalize();
+    });

return make_strings_column(input.num_rows(), std::move(offsets_column), chars.release(), 0, {});
}
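Both hash kernels now call the strings-specific make_offsets_child_column, which builds the offsets child column from an iterator of sizes and also returns the total byte count to allocate. A plain C++ sketch of that offsets-from-sizes pattern (illustrative only, not the cudf implementation):

#include <cstdint>
#include <cstdio>
#include <numeric>
#include <vector>

int main()
{
  int const num_rows    = 4;
  int const digest_size = 64;  // e.g. a SHA-256 hex digest is 64 chars

  // one extra entry so the scan also yields the total size at the end
  std::vector<int64_t> sizes(num_rows + 1, digest_size);
  std::vector<int64_t> offsets(num_rows + 1);
  std::exclusive_scan(sizes.begin(), sizes.end(), offsets.begin(), int64_t{0});

  // offsets = 0, 64, 128, 192; offsets.back() = 256 = total chars to allocate
  std::printf("total chars: %lld\n", static_cast<long long>(offsets.back()));
  return 0;
}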
34 changes: 20 additions & 14 deletions cpp/src/io/parquet/page_delta_decode.cu
@@ -579,15 +579,18 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
__syncthreads();
}

-  // now turn array of lengths into offsets
-  int value_count = nesting_info_base[leaf_level_index].value_count;
-
-  // if no repetition we haven't calculated start/end bounds and instead just skipped
-  // values until we reach first_row. account for that here.
-  if (!has_repetition) { value_count -= s->first_row; }
-
-  auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
-  block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+  // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
+  // latter case, offsets will be computed during string column creation.
+  if (not s->col.is_large_string_col) {
+    int value_count = nesting_info_base[leaf_level_index].value_count;
+
+    // if no repetition we haven't calculated start/end bounds and instead just skipped
+    // values until we reach first_row. account for that here.
+    if (!has_repetition) { value_count -= s->first_row; }
+
+    auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
+    block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+  }

if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}
@@ -738,15 +741,18 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
__syncthreads();
}

-  // now turn array of lengths into offsets
-  int value_count = nesting_info_base[leaf_level_index].value_count;
-
-  // if no repetition we haven't calculated start/end bounds and instead just skipped
-  // values until we reach first_row. account for that here.
-  if (!has_repetition) { value_count -= s->first_row; }
-
-  auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
-  block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+  // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
+  // latter case, offsets will be computed during string column creation.
+  if (not s->col.is_large_string_col) {
+    int value_count = nesting_info_base[leaf_level_index].value_count;
+
+    // if no repetition we haven't calculated start/end bounds and instead just skipped
+    // values until we reach first_row. account for that here.
+    if (!has_repetition) { value_count -= s->first_row; }
+
+    auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
+    block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+  }

// finally, copy the string data into place
auto const dst = nesting_info_base[leaf_level_index].string_out;
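In both kernels the offsets fix-up is now skipped for large string columns, whose 64-bit offsets are built later during string column creation. For ordinary columns, block_excl_sum (cudf-internal) converts the per-value lengths at offptr into offsets in place, seeded with the page's starting position in the column's character data. A serial stand-in for that transformation (hypothetical names, plain C++):

#include <cstdio>

// lengths[] becomes offsets[]: each entry is replaced by the running total,
// shifted by the page's base offset into the column's chars.
void lengths_to_offsets(int* values, int value_count, int base_offset)
{
  int running = base_offset;
  for (int i = 0; i < value_count; ++i) {
    int const len = values[i];
    values[i]     = running;  // where this string starts
    running += len;
  }
}

int main()
{
  int lengths[] = {3, 0, 5, 2};
  lengths_to_offsets(lengths, 4, 100);              // page begins at char 100
  for (int v : lengths) { std::printf("%d ", v); }  // prints: 100 103 103 108
  std::printf("\n");
  return 0;
}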
33 changes: 18 additions & 15 deletions cpp/src/io/parquet/page_string_decode.cu
@@ -955,7 +955,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
{
using cudf::detail::warp_size;
__shared__ __align__(16) page_state_s state_g;
-  __shared__ __align__(4) size_type last_offset;
+  __shared__ size_t last_offset;
__shared__ __align__(16)
page_state_buffers_s<rolling_buf_size, rolling_buf_size, rolling_buf_size>
state_buffers;
@@ -1054,9 +1054,9 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
? gpuGetStringData(s, sb, src_pos + skipped_leaf_values + i)
: cuda::std::pair<char const*, size_t>{nullptr, 0};

-    __shared__ cub::WarpScan<size_type>::TempStorage temp_storage;
-    size_type offset, warp_total;
-    cub::WarpScan<size_type>(temp_storage).ExclusiveSum(len, offset, warp_total);
+    __shared__ cub::WarpScan<size_t>::TempStorage temp_storage;
+    size_t offset, warp_total;
+    cub::WarpScan<size_t>(temp_storage).ExclusiveSum(len, offset, warp_total);
offset += last_offset;

// choose a character parallel string copy when the average string is longer than a warp
@@ -1075,10 +1075,10 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
}
__syncwarp();
} else if (use_char_ll) {
-      __shared__ __align__(8) uint8_t const* pointers[warp_size];
-      __shared__ __align__(4) size_type offsets[warp_size];
-      __shared__ __align__(4) int dsts[warp_size];
-      __shared__ __align__(4) int lengths[warp_size];
+      __shared__ uint8_t const* pointers[warp_size];
+      __shared__ size_t offsets[warp_size];
+      __shared__ int dsts[warp_size];
+      __shared__ int lengths[warp_size];

offsets[me] = offset;
pointers[me] = reinterpret_cast<uint8_t const*>(ptr);
@@ -1119,15 +1119,18 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
__syncthreads();
}

-  // now turn array of lengths into offsets
-  int value_count = nesting_info_base[leaf_level_index].value_count;
-
-  // if no repetition we haven't calculated start/end bounds and instead just skipped
-  // values until we reach first_row. account for that here.
-  if (!has_repetition) { value_count -= s->first_row; }
-
-  auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
-  block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+  // Now turn the array of lengths into offsets, but skip if this is a large string column. In the
+  // latter case, offsets will be computed during string column creation.
+  if (not s->col.is_large_string_col) {
+    int value_count = nesting_info_base[leaf_level_index].value_count;
+
+    // if no repetition we haven't calculated start/end bounds and instead just skipped
+    // values until we reach first_row. account for that here.
+    if (!has_repetition) { value_count -= s->first_row; }
+
+    auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
+    block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+  }

if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}
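The shared accumulator and the warp scan widen from size_type (int32_t) to size_t so the running character offset within a page can pass 2 GiB. A standalone kernel showing the same cub::WarpScan exclusive-sum pattern, a sketch rather than the decoder itself (error checks and frees omitted):

#include <cub/cub.cuh>
#include <cstdio>

__global__ void warp_offsets(size_t const* lens, size_t* offs, size_t* total)
{
  // single warp: each lane contributes one string length
  __shared__ cub::WarpScan<size_t>::TempStorage temp_storage;
  size_t offset, warp_total;
  cub::WarpScan<size_t>(temp_storage).ExclusiveSum(lens[threadIdx.x], offset, warp_total);
  offs[threadIdx.x] = offset;  // start position of this lane's string
  if (threadIdx.x == 0) { *total = warp_total; }
}

int main()
{
  size_t h_lens[32];
  for (int i = 0; i < 32; ++i) { h_lens[i] = i; }  // lengths 0..31
  size_t *d_lens, *d_offs, *d_total;
  cudaMalloc(&d_lens, sizeof(h_lens));
  cudaMalloc(&d_offs, sizeof(h_lens));
  cudaMalloc(&d_total, sizeof(size_t));
  cudaMemcpy(d_lens, h_lens, sizeof(h_lens), cudaMemcpyHostToDevice);
  warp_offsets<<<1, 32>>>(d_lens, d_offs, d_total);
  size_t h_total;
  cudaMemcpy(&h_total, d_total, sizeof(size_t), cudaMemcpyDeviceToHost);
  std::printf("warp total: %zu\n", h_total);  // 0+1+...+31 = 496
  return 0;
}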
8 changes: 5 additions & 3 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -326,8 +326,8 @@ struct PageInfo {
int32_t skipped_leaf_values;
// for string columns only, the size of all the chars in the string for
// this page. only valid/computed during the base preprocess pass
+  size_t str_offset;  // offset into string data for this page
   int32_t str_bytes;
-  int32_t str_offset;   // offset into string data for this page
   bool has_page_index;  // true if str_bytes, num_valids, etc are derivable from page indexes

// nesting information (input/output) for each page. this array contains
@@ -420,7 +420,8 @@ struct ColumnChunkDesc {
src_col_schema(src_col_schema_),
h_chunk_info(chunk_info_),
list_bytes_per_row_est(list_bytes_per_row_est_),
-    is_strings_to_cat(strings_to_categorical_)
+    is_strings_to_cat(strings_to_categorical_),
+    is_large_string_col(false)
{
}

@@ -454,7 +455,8 @@ struct ColumnChunkDesc {

float list_bytes_per_row_est{}; // for LIST columns, an estimate on number of bytes per row

-  bool is_strings_to_cat{};  // convert strings to hashes
+  bool is_strings_to_cat{};    // convert strings to hashes
+  bool is_large_string_col{};  // `true` if string data uses 64-bit offsets
};

/**
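Note the asymmetry in PageInfo: str_bytes stays int32_t (a single page's character data is expected to stay well under 2 GiB), while str_offset, the page's cumulative start within the column's chars, becomes size_t because it keeps growing across pages. A small illustration with hypothetical page sizes:

#include <cstdint>
#include <cstdio>

int main()
{
  int32_t const str_bytes = 500'000'000;  // each page: 500 MB of chars, fits in int32_t
  size_t str_offset       = 0;            // running start of each page's chars

  for (int page = 0; page < 6; ++page) {
    std::printf("page %d starts at %zu\n", page, str_offset);
    str_offset += str_bytes;  // page 5 starts at 2'500'000'000 > INT32_MAX
  }
  return 0;
}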
29 changes: 21 additions & 8 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -22,6 +22,7 @@
#include <cudf/detail/transform.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
+#include <cudf/strings/detail/utilities.hpp>

#include <rmm/resource_ref.hpp>

@@ -99,11 +100,21 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row
col_string_sizes = calculate_page_string_offsets();

// check for overflow
-    if (std::any_of(col_string_sizes.cbegin(), col_string_sizes.cend(), [](std::size_t sz) {
-          return sz > std::numeric_limits<size_type>::max();
-        })) {
+    auto const threshold = static_cast<size_t>(strings::detail::get_offset64_threshold());
+    auto const has_large_strings = std::any_of(col_string_sizes.cbegin(),
+                                               col_string_sizes.cend(),
+                                               [=](std::size_t sz) { return sz > threshold; });
+    if (has_large_strings and not strings::detail::is_large_strings_enabled()) {
       CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
     }
+
+    // mark any chunks that are large string columns
+    if (has_large_strings) {
+      for (auto& chunk : pass.chunks) {
+        auto const idx = chunk.src_col_index;
+        if (col_string_sizes[idx] > threshold) { chunk.is_large_string_col = true; }
+      }
+    }
}

// In order to reduce the number of allocations of hostdevice_vector, we allocate a single vector
@@ -348,11 +359,13 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row
} else if (out_buf.type.id() == type_id::STRING) {
// need to cap off the string offsets column
auto const sz = static_cast<size_type>(col_string_sizes[idx]);
-      CUDF_CUDA_TRY(cudaMemcpyAsync(static_cast<size_type*>(out_buf.data()) + out_buf.size,
-                                    &sz,
-                                    sizeof(size_type),
-                                    cudaMemcpyDefault,
-                                    _stream.value()));
+      if (sz <= strings::detail::get_offset64_threshold()) {
+        CUDF_CUDA_TRY(cudaMemcpyAsync(static_cast<size_type*>(out_buf.data()) + out_buf.size,
+                                      &sz,
+                                      sizeof(size_type),
+                                      cudaMemcpyDefault,
+                                      _stream.value()));
+      }
}
}
}
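get_offset64_threshold() and is_large_strings_enabled() are cudf-internal knobs, so the structure matters more than the names: fail fast when a column needs 64-bit offsets but the feature is disabled, otherwise flag only the chunks that are actually over the threshold. A simplified stand-in with hypothetical types (not the cudf API):

#include <algorithm>
#include <cstddef>
#include <stdexcept>
#include <vector>

struct Chunk {
  int src_col_index;
  bool is_large_string_col = false;
};

void flag_large_string_chunks(std::vector<std::size_t> const& col_string_sizes,
                              std::vector<Chunk>& chunks,
                              std::size_t threshold,       // e.g. INT32_MAX by default
                              bool large_strings_enabled)  // feature toggle
{
  bool const has_large = std::any_of(col_string_sizes.begin(),
                                     col_string_sizes.end(),
                                     [=](std::size_t sz) { return sz > threshold; });
  if (has_large && !large_strings_enabled) {
    throw std::overflow_error("String column exceeds the column size limit");
  }
  // only columns actually over the threshold switch to 64-bit offsets
  if (has_large) {
    for (auto& chunk : chunks) {
      if (col_string_sizes[chunk.src_col_index] > threshold) { chunk.is_large_string_col = true; }
    }
  }
}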
8 changes: 4 additions & 4 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -1169,10 +1169,10 @@ struct page_to_string_size {
struct page_offset_output_iter {
PageInfo* p;

-  using value_type        = size_type;
-  using difference_type   = size_type;
-  using pointer           = size_type*;
-  using reference         = size_type&;
+  using value_type        = size_t;
+  using difference_type   = size_t;
+  using pointer           = size_t*;
+  using reference         = size_t&;
using iterator_category = thrust::output_device_iterator_tag;

__host__ __device__ page_offset_output_iter operator+(int i) { return {p + i}; }
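page_offset_output_iter scatters scan results directly into each page's str_offset field, so retyping its iterator aliases keeps the page offsets 64-bit end to end. The dereference operator is not shown in this hunk; the sketch below reconstructs the field-projecting idea with a hypothetical Page struct and should not be read as the cudf definition:

#include <cstddef>
#include <cstdio>

struct Page {
  std::size_t str_offset = 0;
  int str_bytes          = 0;
};

// Minimal output iterator: operator* yields a reference to the page's
// str_offset, so a scan can write offsets without a temporary array.
struct page_offset_out_iter {
  Page* p;
  page_offset_out_iter operator+(int i) const { return {p + i}; }
  page_offset_out_iter& operator++() { ++p; return *this; }
  std::size_t& operator*() const { return p->str_offset; }
};

int main()
{
  Page pages[3] = {{0, 10}, {0, 20}, {0, 30}};
  page_offset_out_iter out{pages};
  std::size_t running = 0;
  for (auto const& pg : pages) {  // serial exclusive scan over str_bytes
    *out = running;
    ++out;
    running += static_cast<std::size_t>(pg.str_bytes);
  }
  for (auto const& pg : pages) { std::printf("%zu ", pg.str_offset); }  // 0 10 30
  std::printf("\n");
  return 0;
}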
6 changes: 4 additions & 2 deletions cpp/src/io/parquet/writer_impl.cu
@@ -40,6 +40,7 @@
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/lists/detail/dremel.hpp>
#include <cudf/lists/lists_column_view.hpp>
+#include <cudf/strings/detail/utilities.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/structs/structs_column_view.hpp>
#include <cudf/table/table_device_view.cuh>
@@ -278,8 +279,9 @@ size_t column_size(column_view const& column, rmm::cuda_stream_view stream)
return size_of(column.type()) * column.size();
} else if (column.type().id() == type_id::STRING) {
auto const scol = strings_column_view(column);
-    return cudf::detail::get_value<size_type>(scol.offsets(), column.size(), stream) -
-           cudf::detail::get_value<size_type>(scol.offsets(), 0, stream);
+    return cudf::strings::detail::get_offset_value(
+             scol.offsets(), column.size() + column.offset(), stream) -
+           cudf::strings::detail::get_offset_value(scol.offsets(), column.offset(), stream);
} else if (column.type().id() == type_id::STRUCT) {
auto const scol = structs_column_view(column);
size_t ret = 0;
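Besides routing through the offsets-type-aware get_offset_value, the new code indexes the offsets by column.offset(), so a sliced string column reports only its own character bytes. The arithmetic, with hypothetical values:

#include <cstdint>
#include <cstdio>

int main()
{
  // parent column: 5 strings with lengths 3, 4, 2, 6, 1
  int64_t const offsets[] = {0, 3, 7, 9, 15, 16};

  // a slice covering rows [1, 4): size = 3, offset = 1
  int const size   = 3;
  int const offset = 1;

  // chars in the slice = offsets[offset + size] - offsets[offset]
  int64_t const bytes = offsets[offset + size] - offsets[offset];  // 15 - 3 = 12
  std::printf("sliced column: %lld chars\n", static_cast<long long>(bytes));

  // ignoring the slice offset would count the parent's leading chars instead
  int64_t const wrong = offsets[size] - offsets[0];  // 9 - 0 = 9
  std::printf("without column.offset(): %lld chars\n", static_cast<long long>(wrong));
  return 0;
}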
10 changes: 0 additions & 10 deletions cpp/src/io/utilities/column_buffer.cpp
@@ -69,16 +69,6 @@ void cudf::io::detail::inline_column_buffer::create_string_data(size_t num_bytes
_string_data = rmm::device_buffer(num_bytes, stream, _mr);
}

-std::unique_ptr<column> cudf::io::detail::inline_column_buffer::make_string_column_impl(
-  rmm::cuda_stream_view stream)
-{
-  // no need for copies, just transfer ownership of the data_buffers to the columns
-  auto offsets_col = std::make_unique<column>(
-    data_type{type_to_id<size_type>()}, size + 1, std::move(_data), rmm::device_buffer{}, 0);
-  return make_strings_column(
-    size, std::move(offsets_col), std::move(_string_data), null_count(), std::move(_null_mask));
-}
-
namespace {

/**
53 changes: 53 additions & 0 deletions cpp/src/io/utilities/column_buffer_strings.cu
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "column_buffer.hpp"

#include <cudf/strings/detail/strings_children.cuh>
#include <cudf/strings/detail/utilities.hpp>
#include <cudf/utilities/error.hpp>

namespace cudf::io::detail {

std::unique_ptr<column> cudf::io::detail::inline_column_buffer::make_string_column_impl(
rmm::cuda_stream_view stream)
{
// if the size of _string_data is over the threshold for 64bit size_type, _data will contain
// sizes rather than offsets. need special handling for that case.
auto const threshold = static_cast<size_t>(strings::detail::get_offset64_threshold());
if (_string_data.size() > threshold) {
if (not strings::detail::is_large_strings_enabled()) {
CUDF_FAIL("String column exceeds the column size limit", std::overflow_error);
}
// create new offsets
auto const offsets_ptr = static_cast<size_type*>(_data.data());
auto offsets_col = make_numeric_column(
data_type{type_id::INT64}, size + 1, mask_state::UNALLOCATED, stream, _mr);
auto d_offsets64 = offsets_col->mutable_view().template data<int64_t>();
// it's safe to call with size + 1 because _data is also sized that large
cudf::detail::sizes_to_offsets(offsets_ptr, offsets_ptr + size + 1, d_offsets64, stream);
return make_strings_column(
size, std::move(offsets_col), std::move(_string_data), null_count(), std::move(_null_mask));
} else {
// no need for copies, just transfer ownership of the data_buffers to the columns
auto offsets_col = std::make_unique<column>(
data_type{type_to_id<size_type>()}, size + 1, std::move(_data), rmm::device_buffer{}, 0);
return make_strings_column(
size, std::move(offsets_col), std::move(_string_data), null_count(), std::move(_null_mask));
}
}

} // namespace cudf::io::detail
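When the character data crosses the threshold, _data holds 32-bit sizes rather than offsets, and cudf::detail::sizes_to_offsets expands them into a fresh INT64 offsets column on the device. A host-side sketch of that widening scan (plain C++, illustrative only):

#include <cstdint>
#include <cstdio>
#include <numeric>
#include <vector>

int main()
{
  int const size = 4;
  // the sizes buffer has size + 1 entries; the extra slot lets the same
  // buffer be reinterpreted as offsets capped with the total
  std::vector<int32_t> sizes = {3, 4, 2, 6, 0};

  // widen while scanning: int32 sizes accumulate into int64 offsets
  std::vector<int64_t> offsets64(size + 1);
  std::exclusive_scan(sizes.begin(), sizes.end(), offsets64.begin(), int64_t{0});

  for (auto o : offsets64) { std::printf("%lld ", static_cast<long long>(o)); }  // 0 3 7 9 15
  std::printf("\n");
  return 0;
}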