Merge pull request #14875 from rapidsai/branch-24.02
Forward-merge branch-24.02 to branch-24.04
GPUtester authored Jan 25, 2024
2 parents 258d9ee + 5b1eef3 commit f5118c2
Showing 14 changed files with 2,387 additions and 1,070 deletions.
24 changes: 21 additions & 3 deletions cpp/src/io/comp/nvcomp_adapter.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -99,8 +99,8 @@ inline bool operator==(feature_status_parameters const& lhs, feature_status_para
  * @param[in] inputs List of input buffers
  * @param[out] outputs List of output buffers
  * @param[out] results List of output status structures
- * @param[in] max_uncomp_chunk_size maximum size of uncompressed chunk
- * @param[in] max_total_uncomp_size maximum total size of uncompressed data
+ * @param[in] max_uncomp_chunk_size Maximum size of any single uncompressed chunk
+ * @param[in] max_total_uncomp_size Maximum total size of uncompressed data
  * @param[in] stream CUDA stream to use
  */
 void batched_decompress(compression_type compression,
@@ -111,6 +111,24 @@ void batched_decompress(compression_type compression,
                         size_t max_total_uncomp_size,
                         rmm::cuda_stream_view stream);
 
+/**
+ * @brief Return the amount of temporary space required in bytes for a given decompression
+ * operation.
+ *
+ * The size returned reflects the size of the scratch buffer to be passed to
+ * `batched_decompress_async`.
+ *
+ * @param[in] compression Compression type
+ * @param[in] num_chunks The number of decompression chunks to be processed
+ * @param[in] max_uncomp_chunk_size Maximum size of any single uncompressed chunk
+ * @param[in] max_total_uncomp_size Maximum total size of uncompressed data
+ * @returns The total required size in bytes
+ */
+size_t batched_decompress_temp_size(compression_type compression,
+                                    size_t num_chunks,
+                                    size_t max_uncomp_chunk_size,
+                                    size_t max_total_uncomp_size);
+
 /**
  * @brief Gets the maximum size any chunk could compress to in the batch.
  *
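The new query pairs naturally with the existing entry point. A minimal usage sketch, not part of this diff; the sizes and the SNAPPY choice are illustrative assumptions:

  // Query the device scratch requirement before running batched decompression.
  size_t const num_chunks            = 64;        // hypothetical batch
  size_t const max_uncomp_chunk_size = 1 << 20;   // largest single chunk, 1 MiB
  size_t const max_total_uncomp_size = 64 << 20;  // 64 MiB across the batch
  size_t const temp_size             = batched_decompress_temp_size(
    compression_type::SNAPPY, num_chunks, max_uncomp_chunk_size, max_total_uncomp_size);
  // temp_size bytes of scratch back the internal `batched_decompress_async`
  // call, so a caller can budget device memory before decompressing.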
9 changes: 4 additions & 5 deletions cpp/src/io/parquet/page_decode.cuh
@@ -1301,16 +1301,15 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
       if (((s->col.data_type & 7) == BYTE_ARRAY) && (s->col.str_dict_index)) {
         // String dictionary: use index
         s->dict_base = reinterpret_cast<uint8_t const*>(s->col.str_dict_index);
-        s->dict_size = s->col.page_info[0].num_input_values * sizeof(string_index_pair);
+        s->dict_size = s->col.dict_page->num_input_values * sizeof(string_index_pair);
       } else {
-        s->dict_base =
-          s->col.page_info[0].page_data;  // dictionary is always stored in the first page
-        s->dict_size = s->col.page_info[0].uncompressed_page_size;
+        s->dict_base = s->col.dict_page->page_data;
+        s->dict_size = s->col.dict_page->uncompressed_page_size;
       }
       s->dict_run  = 0;
       s->dict_val  = 0;
       s->dict_bits = (cur < end) ? *cur++ : 0;
-      if (s->dict_bits > 32 || !s->dict_base) {
+      if (s->dict_bits > 32 || (!s->dict_base && s->col.dict_page->num_input_values > 0)) {
         s->set_error_code(decode_error::INVALID_DICT_WIDTH);
       }
       break;
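One behavioral subtlety in the hunk above: a null dictionary base alone no longer raises an error, presumably so that a chunk whose dictionary page is present but empty passes through cleanly. A hedged restatement of the guard, using the same fields as the diff (the intent reading is an inference, not stated in the commit):

  // Error only on an out-of-range bit width, or on a missing dictionary whose
  // page claims to hold values; an empty dictionary page is now accepted.
  bool const bad_bit_width = s->dict_bits > 32;
  bool const missing_dict  = (s->dict_base == nullptr) &&
                             (s->col.dict_page->num_input_values > 0);
  if (bad_bit_width || missing_dict) { s->set_error_code(decode_error::INVALID_DICT_WIDTH); }

The same relaxed check appears in page_string_decode.cu below.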
27 changes: 15 additions & 12 deletions cpp/src/io/parquet/page_hdr.cu
@@ -348,9 +348,11 @@ struct gpuParsePageHeader {
  * @param[in] num_chunks Number of column chunks
  */
 // blockDim {128,1,1}
-CUDF_KERNEL void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks,
-                                                             int32_t num_chunks,
-                                                             kernel_error::pointer error_code)
+CUDF_KERNEL
+void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks,
+                                                 chunk_page_info* chunk_pages,
+                                                 int32_t num_chunks,
+                                                 kernel_error::pointer error_code)
 {
   using cudf::detail::warp_size;
   gpuParsePageHeader parse_page_header;
@@ -392,11 +394,10 @@ CUDF_KERNEL void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* ch
       bs->page.temp_string_buf = nullptr;
       bs->page.kernel_mask     = decode_kernel_mask::NONE;
     }
-    num_values     = bs->ck.num_values;
-    page_info      = bs->ck.page_info;
-    num_dict_pages = bs->ck.num_dict_pages;
-    max_num_pages  = (page_info) ? bs->ck.max_num_pages : 0;
-    values_found   = 0;
+    num_values    = bs->ck.num_values;
+    page_info     = chunk_pages ? chunk_pages[chunk].pages : nullptr;
+    max_num_pages = page_info ? bs->ck.max_num_pages : 0;
+    values_found  = 0;
     __syncwarp();
     while (values_found < num_values && bs->cur < bs->end) {
       int index_out = -1;
@@ -495,9 +496,9 @@ CUDF_KERNEL void __launch_bounds__(128)
   if (!lane_id && ck->num_dict_pages > 0 && ck->str_dict_index) {
     // Data type to describe a string
     string_index_pair* dict_index = ck->str_dict_index;
-    uint8_t const* dict = ck->page_info[0].page_data;
-    int dict_size       = ck->page_info[0].uncompressed_page_size;
-    int num_entries     = ck->page_info[0].num_input_values;
+    uint8_t const* dict = ck->dict_page->page_data;
+    int dict_size       = ck->dict_page->uncompressed_page_size;
+    int num_entries     = ck->dict_page->num_input_values;
     int pos = 0, cur = 0;
     for (int i = 0; i < num_entries; i++) {
       int len = 0;
@@ -518,13 +519,15 @@
 }
 
 void __host__ DecodePageHeaders(ColumnChunkDesc* chunks,
+                                chunk_page_info* chunk_pages,
                                 int32_t num_chunks,
                                 kernel_error::pointer error_code,
                                 rmm::cuda_stream_view stream)
 {
   dim3 dim_block(128, 1);
   dim3 dim_grid((num_chunks + 3) >> 2, 1);  // 1 chunk per warp, 4 warps per block
-  gpuDecodePageHeaders<<<dim_grid, dim_block, 0, stream.value()>>>(chunks, num_chunks, error_code);
+  gpuDecodePageHeaders<<<dim_grid, dim_block, 0, stream.value()>>>(
+    chunks, chunk_pages, num_chunks, error_code);
 }
 
 void __host__ BuildStringDictionaryIndex(ColumnChunkDesc* chunks,
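The grid computation above encodes a small piece of arithmetic worth spelling out (values illustrative):

  // dim_block = 128 threads = 4 warps, and the kernel assigns one chunk per
  // warp, so the grid needs ceil(num_chunks / 4) blocks:
  //   dim_grid = (num_chunks + 3) >> 2
  // e.g. num_chunks = 10  ->  (10 + 3) >> 2 = 3 blocks = 12 warp slots,
  // of which 10 are used and 2 sit idle.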
10 changes: 6 additions & 4 deletions cpp/src/io/parquet/page_string_decode.cu
@@ -868,14 +868,16 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputePageStringSi
     if (col.str_dict_index) {
       // String dictionary: use index
       dict_base = reinterpret_cast<const uint8_t*>(col.str_dict_index);
-      dict_size = col.page_info[0].num_input_values * sizeof(string_index_pair);
+      dict_size = col.dict_page->num_input_values * sizeof(string_index_pair);
     } else {
-      dict_base = col.page_info[0].page_data;  // dictionary is always stored in the first page
-      dict_size = col.page_info[0].uncompressed_page_size;
+      dict_base = col.dict_page->page_data;
+      dict_size = col.dict_page->uncompressed_page_size;
     }
 
     // FIXME: need to return an error condition...this won't actually do anything
-    if (s->dict_bits > 32 || !dict_base) { CUDF_UNREACHABLE("invalid dictionary bit size"); }
+    if (s->dict_bits > 32 || (!dict_base && col.dict_page->num_input_values > 0)) {
+      CUDF_UNREACHABLE("invalid dictionary bit size");
+    }
 
     str_bytes = totalDictEntriesSize(
       data, dict_base, s->dict_bits, dict_size, (end - data), start_value, end_value);
60 changes: 43 additions & 17 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2018-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2018-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -339,6 +339,21 @@ struct PageInfo {
   decode_kernel_mask kernel_mask;
 };
 
+/**
+ * @brief Return the column schema id as the key for a PageInfo struct.
+ */
+struct get_page_key {
+  __device__ int32_t operator()(PageInfo const& page) const { return page.src_col_schema; }
+};
+
+/**
+ * @brief Return an iterator that returns the keys for a vector of pages.
+ */
+inline auto make_page_key_iterator(device_span<PageInfo const> pages)
+{
+  return thrust::make_transform_iterator(pages.begin(), get_page_key{});
+}
+
 /**
  * @brief Struct describing a particular chunk of column data
  */
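A hedged usage sketch for the new key iterator (not from this commit): because the reader keeps pages in chunk-sorted order, the iterator can key a Thrust reduction, for example counting pages per column schema. `schema_ids`, `page_counts`, and `num_columns` are hypothetical names:

  // Count how many pages belong to each schema id, assuming `pages` is a
  // device_span<PageInfo const> already sorted by src_col_schema.
  auto keys = make_page_key_iterator(pages);
  rmm::device_uvector<int32_t> schema_ids(num_columns, stream);
  rmm::device_uvector<size_t> page_counts(num_columns, stream);
  thrust::reduce_by_key(rmm::exec_policy(stream),
                        keys,
                        keys + pages.size(),
                        thrust::make_constant_iterator(size_t{1}),
                        schema_ids.begin(),
                        page_counts.begin());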
@@ -362,7 +377,8 @@ struct ColumnChunkDesc {
                   int8_t decimal_precision_,
                   int32_t ts_clock_rate_,
                   int32_t src_col_index_,
-                  int32_t src_col_schema_)
+                  int32_t src_col_schema_,
+                  float list_bytes_per_row_est_)
     : compressed_data(compressed_data_),
       compressed_size(compressed_size_),
       num_values(num_values_),
@@ -375,7 +391,7 @@
       num_data_pages(0),
       num_dict_pages(0),
       max_num_pages(0),
-      page_info(nullptr),
+      dict_page(nullptr),
       str_dict_index(nullptr),
       valid_map_base{nullptr},
       column_data_base{nullptr},
@@ -386,26 +402,25 @@
       decimal_precision(decimal_precision_),
       ts_clock_rate(ts_clock_rate_),
       src_col_index(src_col_index_),
-      src_col_schema(src_col_schema_)
+      src_col_schema(src_col_schema_),
+      list_bytes_per_row_est(list_bytes_per_row_est_)
   {
   }
 
   uint8_t const* compressed_data{};  // pointer to compressed column chunk data
   size_t compressed_size{};          // total compressed data size for this chunk
   size_t num_values{};               // total number of values in this column
-  size_t start_row{};                // starting row of this chunk
+  size_t start_row{};                // file-wide, absolute starting row of this chunk
   uint32_t num_rows{};               // number of rows in this chunk
   int16_t max_level[level_type::NUM_LEVEL_TYPES]{};  // max definition/repetition level
   int16_t max_nesting_depth{};                       // max nesting depth of the output
-  uint16_t data_type{};  // basic column data type, ((type_length << 3) |
-                         // parquet::Type)
+  uint16_t data_type{};  // basic column data type, ((type_length << 3) | parquet::Type)
   uint8_t
     level_bits[level_type::NUM_LEVEL_TYPES]{};  // bits to encode max definition/repetition levels
   int32_t num_data_pages{};  // number of data pages
   int32_t num_dict_pages{};  // number of dictionary pages
   int32_t max_num_pages{};   // size of page_info array
-  PageInfo* page_info{};     // output page info for up to num_dict_pages +
-                             // num_data_pages (dictionary pages first)
+  PageInfo const* dict_page{};
   string_index_pair* str_dict_index{};  // index for string dictionary
   bitmask_type** valid_map_base{};      // base pointers of valid bit map for this column
   void** column_data_base{};            // base pointers of column data
@@ -418,6 +433,15 @@
 
   int32_t src_col_index{};   // my input column index
   int32_t src_col_schema{};  // my schema index in the file
+
+  float list_bytes_per_row_est{};  // for LIST columns, an estimate of the number of bytes per row
 };
 
+/**
+ * @brief A utility structure for use in decoding page headers.
+ */
+struct chunk_page_info {
+  PageInfo* pages;
+};
+
 /**
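A hedged sketch of how this indirection might be filled in on the host; names such as `all_pages` are hypothetical, since the commit itself only defines the struct:

  // One chunk_page_info per chunk, each pointing at the chunk's first page
  // inside a flat, chunk-ordered PageInfo array.
  std::vector<chunk_page_info> chunk_pages(num_chunks);
  size_t page_offset = 0;
  for (int32_t c = 0; c < num_chunks; ++c) {
    chunk_pages[c].pages = all_pages + page_offset;  // first page of chunk c
    page_offset += chunks[c].max_num_pages;          // advance past its slots
  }
  // A device copy of chunk_pages is what the reworked DecodePageHeaders
  // (declared below) consumes alongside the chunks themselves.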
@@ -578,11 +602,13 @@ constexpr bool is_string_col(ColumnChunkDesc const& chunk)
  * @brief Launches kernel for parsing the page headers in the column chunks
  *
  * @param[in] chunks List of column chunks
+ * @param[in] chunk_pages List of pages associated with the chunks, in chunk-sorted order
  * @param[in] num_chunks Number of column chunks
  * @param[out] error_code Error code for kernel failures
  * @param[in] stream CUDA stream to use
  */
 void DecodePageHeaders(ColumnChunkDesc* chunks,
+                       chunk_page_info* chunk_pages,
                        int32_t num_chunks,
                        kernel_error::pointer error_code,
                        rmm::cuda_stream_view stream);