From 3b1b96040e097bd848ca47e85c9a2208b6b58863 Mon Sep 17 00:00:00 2001 From: seidl Date: Tue, 30 Jan 2024 14:11:28 -0800 Subject: [PATCH 01/17] encode delta_byte_array --- cpp/include/cudf/io/parquet.hpp | 62 ++++ cpp/src/io/functions.cpp | 13 + cpp/src/io/parquet/page_enc.cu | 358 +++++++++++++++++++++-- cpp/src/io/parquet/parquet_gpu.hpp | 12 +- cpp/src/io/parquet/writer_impl.cu | 29 +- cpp/src/io/parquet/writer_impl.hpp | 3 +- cpp/tests/io/parquet_reader_test.cpp | 81 ++--- python/cudf/cudf/_lib/cpp/io/parquet.pxd | 10 +- python/cudf/cudf/_lib/parquet.pyx | 2 + python/cudf/cudf/core/dataframe.py | 2 + python/cudf/cudf/io/parquet.py | 4 + python/cudf/cudf/tests/test_parquet.py | 59 ++-- python/cudf/cudf/utils/ioutils.py | 3 + 13 files changed, 544 insertions(+), 94 deletions(-) diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index dc035db8d39..3e870932c24 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -563,6 +563,9 @@ class parquet_writer_options { std::shared_ptr _compression_stats; // write V2 page headers? bool _v2_page_headers = false; + // Use DELTA_BYTE_ARRAY when dictionary encoding is not available rather than the default + // DELTA_LENGTH_BYTE_ARRAY + bool _prefer_delta_byte_array = false; /** * @brief Constructor from sink and table. @@ -761,6 +764,13 @@ class parquet_writer_options { */ [[nodiscard]] auto is_enabled_write_v2_headers() const { return _v2_page_headers; } + /** + * @brief Returns `true` if DELTA_BYTE_ARRAY is the preferred string encoding. + * + * @return `true` if DELTA_BYTE_ARRAY is the preferred string encoding. + */ + [[nodiscard]] auto is_enabled_prefer_dba() const { return _prefer_delta_byte_array; } + /** * @brief Sets partitions. * @@ -892,6 +902,13 @@ class parquet_writer_options { * @param val Boolean value to enable/disable writing of V2 page headers. */ void enable_write_v2_headers(bool val) { _v2_page_headers = val; } + + /** + * @brief Sets preference for delta encoding. + * + * @param val Boolean value to enable/disable use of DELTA_BYTE_ARRAY encoding. + */ + void enable_prefer_dba(bool val) { _prefer_delta_byte_array = val; } }; /** @@ -1143,6 +1160,20 @@ class parquet_writer_options_builder { */ parquet_writer_options_builder& write_v2_headers(bool enabled); + /** + * @brief Set to true if DELTA_BYTE_ARRAY encoding should be used. + * + * The default encoding for all columns is dictionary encoding. When dictionary encoding + * cannot be used (it was disabled, the dictionary is too large), the parquet writer + * will usually fall back to PLAIN encoding. If V2 headers are enabled, however, the + * choice for fall back is DELTA_LENGTH_BYTE_ARRAY. Setting this to `true` will use + * DELTA_BYTE_ARRAY encoding instead. This will apply to all string columns. + * + * @param enabled Boolean value to enable/disable use of DELTA_BYTE_ARRAY encoding. + * @return this for chaining + */ + parquet_writer_options_builder& prefer_dba(bool enabled); + /** * @brief move parquet_writer_options member once it's built. */ @@ -1230,6 +1261,9 @@ class chunked_parquet_writer_options { std::shared_ptr _compression_stats; // write V2 page headers? bool _v2_page_headers = false; + // Use DELTA_BYTE_ARRAY when dictionary encoding is not available rather than the default + // DELTA_LENGTH_BYTE_ARRAY + bool _prefer_delta_byte_array = false; /** * @brief Constructor from sink. 
@@ -1384,6 +1418,13 @@ class chunked_parquet_writer_options { */ [[nodiscard]] auto is_enabled_write_v2_headers() const { return _v2_page_headers; } + /** + * @brief Returns `true` if DELTA_BYTE_ARRAY is the preferred string encoding. + * + * @return `true` if DELTA_BYTE_ARRAY is the preferred string encoding. + */ + [[nodiscard]] auto is_enabled_prefer_dba() const { return _prefer_delta_byte_array; } + /** * @brief Sets metadata. * @@ -1501,6 +1542,13 @@ class chunked_parquet_writer_options { */ void enable_write_v2_headers(bool val) { _v2_page_headers = val; } + /** + * @brief Sets preference for delta encoding. + * + * @param val Boolean value to enable/disable use of DELTA_BYTE_ARRAY encoding. + */ + void enable_prefer_dba(bool val) { _prefer_delta_byte_array = val; } + /** * @brief creates builder to build chunked_parquet_writer_options. * @@ -1612,6 +1660,20 @@ class chunked_parquet_writer_options_builder { */ chunked_parquet_writer_options_builder& write_v2_headers(bool enabled); + /** + * @brief Set to true if DELTA_BYTE_ARRAY encoding should be used. + * + * The default encoding for all columns is dictionary encoding. When dictionary encoding + * cannot be used (it was disabled, the dictionary is too large), the parquet writer + * will usually fall back to PLAIN encoding. If V2 headers are enabled, however, the + * choice for fall back is DELTA_LENGTH_BYTE_ARRAY. Setting this to `true` will use + * DELTA_BYTE_ARRAY encoding instead. This will apply to all string columns. + * + * @param enabled Boolean value to enable/disable use of DELTA_BYTE_ARRAY encoding. + * @return this for chaining + */ + chunked_parquet_writer_options_builder& prefer_dba(bool enabled); + /** * @brief Sets the maximum row group size, in bytes. * diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 42f2fd02d52..1678d588f84 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -809,6 +809,12 @@ parquet_writer_options_builder& parquet_writer_options_builder::write_v2_headers return *this; } +parquet_writer_options_builder& parquet_writer_options_builder::prefer_dba(bool enabled) +{ + options.enable_prefer_dba(enabled); + return *this; +} + void chunked_parquet_writer_options::set_key_value_metadata( std::vector> metadata) { @@ -897,6 +903,13 @@ chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder:: return *this; } +chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::prefer_dba( + bool enabled) +{ + options.enable_prefer_dba(enabled); + return *this; +} + chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::max_page_fragment_size(size_type val) { diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 2f351edd2b9..e80ad9f92fd 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -15,6 +15,7 @@ */ #include "delta_enc.cuh" +#include "page_string_utils.cuh" #include "parquet_gpu.cuh" #include @@ -33,6 +34,7 @@ #include #include +#include #include #include #include @@ -499,7 +501,8 @@ CUDF_KERNEL void __launch_bounds__(128) __device__ size_t delta_data_len(Type physical_type, cudf::type_id type_id, uint32_t num_values, - size_t page_size) + size_t page_size, + bool prefer_delta_byte_array) { auto const dtype_len_out = physical_type_len(physical_type, type_id); auto const dtype_len = [&]() -> uint32_t { @@ -519,6 +522,7 @@ __device__ size_t delta_data_len(Type physical_type, // divisible by 128 (via static assert on delta::block_size), but do 
safe division anyway. auto const bytes_per_block = cudf::util::div_rounding_up_unsafe(max_bits * vals_per_block, 8); auto const block_size = mini_block_header_size + bytes_per_block; + auto const block_mult = physical_type == BYTE_ARRAY and prefer_delta_byte_array ? 2 : 1; // delta header is 2 bytes for the block_size, 1 byte for number of mini-blocks, // max 5 bytes for number of values, and max dtype_len + 1 for first value. @@ -530,11 +534,11 @@ __device__ size_t delta_data_len(Type physical_type, // data we also need to add size of the char data. `page_size` that is passed in is the // plain encoded size (i.e. num_values * sizeof(size_type) + char_data_len), so the char // data len is `page_size` minus the first term. - // TODO: this will need to change for DELTA_BYTE_ARRAY encoding + // `block_mult` takes into account the two delta binary blocks for DELTA_BYTE_ARRAY. auto const char_data_len = physical_type == BYTE_ARRAY ? page_size - num_values * sizeof(size_type) : 0; - return header_size + num_blocks * block_size + char_data_len; + return header_size + num_blocks * block_mult * block_size + char_data_len; } // blockDim {128,1,1} @@ -550,7 +554,8 @@ CUDF_KERNEL void __launch_bounds__(128) size_t max_page_size_bytes, size_type max_page_size_rows, uint32_t page_align, - bool write_v2_headers) + bool write_v2_headers, + bool prefer_delta_byte_array) { // TODO: All writing seems to be done by thread 0. Could be replaced by thrust foreach __shared__ __align__(8) parquet_column_device_view col_g; @@ -753,8 +758,8 @@ CUDF_KERNEL void __launch_bounds__(128) } // get a different bound if using delta encoding if (is_use_delta) { - auto const delta_len = - delta_data_len(physical_type, type_id, page_g.num_leaf_values, page_size); + auto const delta_len = delta_data_len( + physical_type, type_id, page_g.num_leaf_values, page_size, prefer_delta_byte_array); page_size = max(page_size, delta_len); } auto const max_data_size = @@ -770,11 +775,41 @@ CUDF_KERNEL void __launch_bounds__(128) // 4-byte length indicator, so subtract that. page_g.var_bytes_size = var_bytes_size; } + // set encoding + if (is_use_delta) { + // TODO(ets): at some point make a more intelligent decision on this. DELTA_LENGTH_BA + // should always be preferred over PLAIN, but DELTA_BINARY is a different matter. + // If the delta encoding size is going to be close to 32 bits anyway, then plain + // is a better choice. + page_g.kernel_mask = physical_type == BYTE_ARRAY + ? prefer_delta_byte_array ? encode_kernel_mask::DELTA_BYTE_ARRAY + : encode_kernel_mask::DELTA_LENGTH_BA + : encode_kernel_mask::DELTA_BINARY; + } else if (ck_g.use_dictionary || physical_type == BOOLEAN) { + page_g.kernel_mask = encode_kernel_mask::DICTIONARY; + } else { + page_g.kernel_mask = encode_kernel_mask::PLAIN; + } page_g.max_data_size = static_cast(max_data_size); pagestats_g.start_chunk = ck_g.first_fragment + page_start; pagestats_g.num_chunks = page_g.num_fragments; page_offset += util::round_up_unsafe(page_g.max_hdr_size + page_g.max_data_size, page_align); + // if encoding delta_byte_array, need to allocate some space for scratch data. 
+ // if there are leaf nulls, we need space for a mapping array: + // sizeof(size_type) * num_leaf_values + // we always need prefix lengths: sizeof(size_type) * num_valid + if (page_g.kernel_mask == encode_kernel_mask::DELTA_BYTE_ARRAY) { + // scratch needs to be aligned to a size_type boundary + auto const pg_end = reinterpret_cast(ck_g.uncompressed_bfr + page_offset); + auto scratch = util::round_up_unsafe(pg_end, sizeof(size_type)); + if (page_g.num_valid != page_g.num_leaf_values) { + scratch += sizeof(size_type) * page_g.num_leaf_values; + } + scratch += sizeof(size_type) * page_g.num_valid; + page_offset = + thrust::distance(ck_g.uncompressed_bfr, reinterpret_cast(scratch)); + } if (not comp_page_sizes.empty()) { // V2 does not include level data in compressed size estimate comp_page_offset += page_g.max_hdr_size + page_g.max_lvl_size + @@ -788,19 +823,6 @@ CUDF_KERNEL void __launch_bounds__(128) __syncwarp(); if (t == 0) { if (not pages.empty()) { - // set encoding - if (is_use_delta) { - // TODO(ets): at some point make a more intelligent decision on this. DELTA_LENGTH_BA - // should always be preferred over PLAIN, but DELTA_BINARY is a different matter. - // If the delta encoding size is going to be close to 32 bits anyway, then plain - // is a better choice. - page_g.kernel_mask = physical_type == BYTE_ARRAY ? encode_kernel_mask::DELTA_LENGTH_BA - : encode_kernel_mask::DELTA_BINARY; - } else if (ck_g.use_dictionary || physical_type == BOOLEAN) { - page_g.kernel_mask = encode_kernel_mask::DICTIONARY; - } else { - page_g.kernel_mask = encode_kernel_mask::PLAIN; - } // need space for the chunk histograms plus data page histograms auto const num_histograms = num_pages - ck_g.num_dict_pages(); if (ck_g.def_histogram_data != nullptr && col_g.max_def_level > 0) { @@ -2141,6 +2163,293 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) s, output_ptr + string_data_len, pages, comp_in, comp_out, comp_results, true); } +// DELTA_BYTE_ARRAY page data encoder +// blockDim(128, 1, 1) +template +__global__ void __launch_bounds__(block_size, 8) + gpuEncodeDeltaByteArrayPages(device_span pages, + device_span> comp_in, + device_span> comp_out, + device_span comp_results) +{ + using cudf::detail::warp_size; + // block of shared memory for value storage and bit packing + __shared__ uleb128_t delta_shared[delta::buffer_size + delta::block_size]; + __shared__ __align__(8) page_enc_state_s<0> state_g; + __shared__ delta_binary_packer packer; + __shared__ uint8_t* scratch_data; + __shared__ size_t avg_suffix_len; + using block_scan = cub::BlockScan; + using block_reduce = cub::BlockReduce; + __shared__ union { + typename block_scan::TempStorage scan_storage; + typename block_reduce::TempStorage reduce_storage; + typename delta_binary_packer::index_scan::TempStorage delta_index_tmp; + typename delta_binary_packer::block_reduce::TempStorage delta_reduce_tmp; + typename delta_binary_packer::warp_reduce::TempStorage + delta_warp_red_tmp[delta::num_mini_blocks]; + } temp_storage; + + auto* const s = &state_g; + uint32_t t = threadIdx.x; + + if (t == 0) { + state_g = page_enc_state_s<0>{}; + s->page = pages[blockIdx.x]; + s->ck = *s->page.chunk; + s->col = *s->ck.col_desc; + s->rle_len_pos = nullptr; + // get s->cur back to where it was at the end of encoding the rep and def level data + set_page_data_start(s); + } + __syncthreads(); + + if (BitAnd(s->page.kernel_mask, encode_kernel_mask::DELTA_BYTE_ARRAY) == 0) { return; } + + // Encode data values + if (t == 0) { + uint8_t* dst = s->cur; + s->rle_run = 0; 
+ s->rle_pos = 0; + s->rle_numvals = 0; + s->rle_out = dst; + s->page.encoding = Encoding::DELTA_BYTE_ARRAY; + s->page_start_val = row_to_value_idx(s->page.start_row, s->col); + s->chunk_start_val = row_to_value_idx(s->ck.start_row, s->col); + + // set pointer to beginning of scratch space (aligned to size_type boundary) + auto scratch_start = + reinterpret_cast(s->page.page_data + s->page.max_hdr_size + s->page.max_data_size); + scratch_start = util::round_up_unsafe(scratch_start, sizeof(size_type)); + scratch_data = reinterpret_cast(scratch_start); + } + __syncthreads(); + + // create offsets map (if needed) + // We only encode valid values, and we need to know adjacent valid strings. So first we'll + // create a mapping of leaf indexes to valid indexes: + // + // validity array is_valid: + // 1 1 0 1 0 1 1 0 + // + // exclusive scan on is_valid yields mapping of leaf index -> valid index: + // 0 1 2 2 3 3 4 5 + // + // Last value should equal page.num_valid. Now we need to transform that into a reverse + // lookup that maps valid index -> leaf index (of length num_valid): + // 0 1 3 5 6 + // + auto const has_leaf_nulls = s->page.num_valid != s->page.num_leaf_values; + + size_type* const offsets_map = + has_leaf_nulls ? reinterpret_cast(scratch_data) : nullptr; + + if (offsets_map != nullptr) { + size_type* const forward_map = offsets_map + s->page.num_valid; + + // create the validity array + for (int idx = t; idx < s->page.num_leaf_values; idx += block_size) { + size_type const idx_in_col = s->page_start_val + idx; + bool const is_valid = + idx_in_col < s->col.leaf_column->size() and s->col.leaf_column->is_valid(idx_in_col); + forward_map[idx] = is_valid ? 1 : 0; + } + __syncthreads(); + + // exclusive scan to get leaf_idx -> valid_idx + block_excl_sum(forward_map, s->page.num_leaf_values, 0); + + // now reverse map to get valid_idx -> leaf_idx mapping + for (int idx = t; idx < s->page.num_leaf_values; idx += block_size) { + size_type const idx_in_col = s->page_start_val + idx; + bool const is_valid = + idx_in_col < s->col.leaf_column->size() and s->col.leaf_column->is_valid(idx_in_col); + if (is_valid) { offsets_map[forward_map[idx]] = idx; } + } + __syncthreads(); + } + + size_type* const prefix_lengths = + has_leaf_nulls ? offsets_map + s->page.num_valid : reinterpret_cast(scratch_data); + + auto const type_id = s->col.leaf_column->type().id(); + + auto const get_string_tuple = [type_id, s](int idx) -> thrust::pair { + if (type_id == type_id::STRING) { + auto const str = s->col.leaf_column->element(idx); + return {str.size_bytes(), reinterpret_cast(str.data())}; + } else if (s->col.output_as_byte_array && type_id == type_id::LIST) { + auto const str = get_element(*s->col.leaf_column, idx); + return {static_cast(str.size_bytes()), + reinterpret_cast(str.data())}; + } + return {0, nullptr}; + }; + + /* + some timing results. remove when done + + flat mixed file, ints, short and long-ish strings + Ns N char sorted hash + Nu N char unsorted hash + + N N string per thread for prefix and strcopy + Y N string per warp for prefix, string per thread for strcopy + N Y string per thread for prefix, parallel strcpy + Y Y string per warp for prefix, parallel strcpy + + encode times (in ms) + + N N Y N N Y Y Y + flat 44 137 236 343 + 64s 15 33 29 36 + 64u 63 45 26 36 + 128s 142 42 23 22 + 128u 332 241 16 21 + 256s 370 289 14 14 + 256u 437 389 12 14 + */ + + // Calculate prefix lengths. The first prefix length is always 0. loop over num_valid since we + // only encode valid values. 
+ // Note: calculating this on a string-per-thread basis seems bad for large strings with lots + // of overlap. But in testing, it was found that the string copy at the end had a much larger + // impact on performance, and doing this step on a string-per-warp basis was always slower. + if (t == 0) { prefix_lengths[0] = 0; } + for (int idx = t + 1; idx < s->page.num_valid; idx += block_size) { + // FIXME: for now doing this a string per thread. this will be bad for large + // strings with large prefix lengths. for the latter we should take a string per block + // or maybe string per warp approach. + size_type const leaf_idx = has_leaf_nulls ? offsets_map[idx] : idx; + size_type const pleaf_idx = has_leaf_nulls ? offsets_map[idx - 1] : idx - 1; + + // get this string and the preceding string + auto const [len, ptr] = get_string_tuple(leaf_idx + s->page_start_val); + auto const [len_p, ptr_p] = get_string_tuple(pleaf_idx + s->page_start_val); + + // calculate the amount of overlap + auto const max_pref_len = min(len, len_p); + size_type pref_len = 0; + while (pref_len < max_pref_len and ptr[pref_len] == ptr_p[pref_len]) { + pref_len++; + } + prefix_lengths[idx] = pref_len; + } + + // encode prefix lengths + if (t == 0) { + packer.init(s->cur, s->page.num_valid, reinterpret_cast(delta_shared), &temp_storage); + } + __syncthreads(); + + // don't start at `t` because all threads must participate in each iteration + for (int idx = 0; idx < s->page.num_valid; idx += block_size) { + size_type const t_idx = idx + t; + auto const in_range = t_idx < s->page.num_valid; + auto const val = in_range ? prefix_lengths[t_idx] : 0; + packer.add_value(val, in_range); + } + + auto const suffix_ptr = packer.flush(); + __syncthreads(); + + // encode suffix lengths + if (t == 0) { + packer.init( + suffix_ptr, s->page.num_valid, reinterpret_cast(delta_shared), &temp_storage); + } + __syncthreads(); + + size_t non_zero = 0; + size_t suffix_bytes = 0; + + for (int idx = 0; idx < s->page.num_valid; idx += block_size) { + size_type const t_idx = idx + t; + auto const in_range = t_idx < s->page.num_valid; + int32_t val = 0; + if (in_range) { + size_type const leaf_idx = has_leaf_nulls ? offsets_map[t_idx] : t_idx; + auto const [len, ptr] = get_string_tuple(leaf_idx + s->page_start_val); + val = len - prefix_lengths[t_idx]; + if (val > 0) { + non_zero++; + suffix_bytes += val; + } + } + packer.add_value(val, in_range); + } + + auto const strings_ptr = packer.flush(); + + non_zero = block_reduce(temp_storage.reduce_storage).Sum(non_zero); + __syncthreads(); + suffix_bytes = block_reduce(temp_storage.reduce_storage).Sum(suffix_bytes); + if (t == 0) { avg_suffix_len = util::div_rounding_up_unsafe(suffix_bytes, non_zero); } + __syncthreads(); + + // Now copy the byte array data. For shorter suffixes (<= 64 bytes), it is faster to use + // memcpy on a string-per-thread basis. For longer suffixes, it's better to use a parallel + // approach. + // Note: this param still needs tuning. + constexpr size_t suffix_cutoff = 64; + + size_t str_data_len = 0; + if (avg_suffix_len <= suffix_cutoff) { + for (int idx = 0; idx < s->page.num_valid; idx += block_size) { + size_type const t_idx = idx + t; + size_type s_len = 0, pref_len = 0, suff_len = 0; + uint8_t const* s_ptr = nullptr; + if (t_idx < s->page.num_valid) { + int const leaf_idx = has_leaf_nulls ? 
offsets_map[t_idx] : t_idx; + auto const [len, ptr] = get_string_tuple(leaf_idx + s->page_start_val); + s_len = len; + s_ptr = ptr; + pref_len = prefix_lengths[t_idx]; + suff_len = len - pref_len; + } + + // calculate offsets into output + size_type s_off, total; + block_scan(temp_storage.scan_storage) + .ExclusiveScan(suff_len, s_off, str_data_len, cub::Sum(), total); + + if (t_idx < s->page.num_valid) { + auto const dst = strings_ptr + s_off; + memcpy(dst, s_ptr + pref_len, suff_len); + } + str_data_len += total; + __syncthreads(); + } + } else { + int t0 = 0; // thread 0 for each string + for (int idx = 0; idx < s->page.num_valid; idx++) { + // calculate ids for this string + int mytid = t - t0; + if (mytid < 0) { mytid += block_size; } + + // fetch string for this iter + size_type const leaf_idx = has_leaf_nulls ? offsets_map[idx] : idx; + auto const [len, ptr] = get_string_tuple(leaf_idx + s->page_start_val); + size_type const pref_len = prefix_lengths[idx]; + size_type const suff_len = len - pref_len; + + // now copy the data + auto const dst = strings_ptr + str_data_len; + for (int i = 0; i < suff_len; i += block_size) { + size_type const src_idx = i + mytid; + if (src_idx < suff_len) { dst[src_idx] = ptr[pref_len + src_idx]; } + } + + str_data_len += suff_len; + t0 += suff_len; + t0 = t0 % block_size; + } + } + + finish_page_encode( + s, strings_ptr + str_data_len, pages, comp_in, comp_out, comp_results, true); +} + constexpr int decide_compression_warps_in_block = 4; constexpr int decide_compression_block_size = decide_compression_warps_in_block * cudf::detail::warp_size; @@ -3049,6 +3358,7 @@ void InitEncoderPages(device_2dspan chunks, size_type max_page_size_rows, uint32_t page_align, bool write_v2_headers, + bool prefer_delta_byte_array, statistics_merge_group* page_grstats, statistics_merge_group* chunk_grstats, rmm::cuda_stream_view stream) @@ -3066,7 +3376,8 @@ void InitEncoderPages(device_2dspan chunks, max_page_size_bytes, max_page_size_rows, page_align, - write_v2_headers); + write_v2_headers, + prefer_delta_byte_array); } void EncodePages(device_span pages, @@ -3112,6 +3423,13 @@ void EncodePages(device_span pages, gpuEncodeDeltaLengthByteArrayPages <<>>(pages, comp_in, comp_out, comp_results); } + if (BitAnd(kernel_mask, encode_kernel_mask::DELTA_BYTE_ARRAY) != 0) { + auto const strm = streams[s_idx++]; + gpuEncodePageLevels<<>>( + pages, write_v2_headers, encode_kernel_mask::DELTA_BYTE_ARRAY); + gpuEncodeDeltaByteArrayPages + <<>>(pages, comp_in, comp_out, comp_results); + } if (BitAnd(kernel_mask, encode_kernel_mask::DICTIONARY) != 0) { auto const strm = streams[s_idx++]; gpuEncodePageLevels<<>>( diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index b215cd7a20b..d32fd237a22 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -504,10 +504,11 @@ constexpr uint32_t encoding_to_mask(Encoding encoding) * Used to control which encode kernels to run. 
*/ enum class encode_kernel_mask { - PLAIN = (1 << 0), // Run plain encoding kernel - DICTIONARY = (1 << 1), // Run dictionary encoding kernel - DELTA_BINARY = (1 << 2), // Run DELTA_BINARY_PACKED encoding kernel - DELTA_LENGTH_BA = (1 << 3), // Run DELTA_LENGTH_BYTE_ARRAY encoding kernel + PLAIN = (1 << 0), // Run plain encoding kernel + DICTIONARY = (1 << 1), // Run dictionary encoding kernel + DELTA_BINARY = (1 << 2), // Run DELTA_BINARY_PACKED encoding kernel + DELTA_LENGTH_BA = (1 << 3), // Run DELTA_LENGTH_BYTE_ARRAY encoding kernel + DELTA_BYTE_ARRAY = (1 << 4), // Run DELTA_BYTE_ARRAY encoding kernel }; /** @@ -917,6 +918,8 @@ void get_dictionary_indices(cudf::detail::device_2dspan frag * @param[in] page_grstats Setup for page-level stats * @param[in] page_align Required alignment for uncompressed pages * @param[in] write_v2_headers True if V2 page headers should be written + * @param[in] prefer_delta_byte_array True if DELTA_BYTE_ARRAY encoding should be the fallback for + * string columns * @param[in] chunk_grstats Setup for chunk-level stats * @param[in] max_page_comp_data_size Calculated maximum compressed data size of pages * @param[in] stream CUDA stream to use @@ -931,6 +934,7 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks, size_type max_page_size_rows, uint32_t page_align, bool write_v2_headers, + bool prefer_delta_byte_array, statistics_merge_group* page_grstats, statistics_merge_group* chunk_grstats, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 0303439fb27..1da4e4221b5 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1046,6 +1046,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, size_t max_page_size_bytes, size_type max_page_size_rows, bool write_v2_headers, + bool prefer_delta_byte_array, Compression compression_codec, rmm::cuda_stream_view stream) { @@ -1063,6 +1064,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, max_page_size_rows, page_alignment(compression_codec), write_v2_headers, + prefer_delta_byte_array, nullptr, nullptr, stream); @@ -1088,6 +1090,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, max_page_size_rows, page_alignment(compression_codec), write_v2_headers, + prefer_delta_byte_array, nullptr, nullptr, stream); @@ -1114,6 +1117,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, max_page_size_rows, page_alignment(compression_codec), write_v2_headers, + prefer_delta_byte_array, nullptr, nullptr, stream); @@ -1266,6 +1270,7 @@ void init_encoder_pages(hostdevice_2dvector& chunks, size_t max_page_size_bytes, size_type max_page_size_rows, bool write_v2_headers, + bool prefer_delta_byte_array, rmm::cuda_stream_view stream) { rmm::device_uvector page_stats_mrg(num_stats_bfr, stream); @@ -1280,6 +1285,7 @@ void init_encoder_pages(hostdevice_2dvector& chunks, max_page_size_rows, page_alignment(compression), write_v2_headers, + prefer_delta_byte_array, (num_stats_bfr) ? page_stats_mrg.data() : nullptr, (num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr, stream); @@ -1533,6 +1539,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, bool int96_timestamps, bool utc_timestamps, bool write_v2_headers, + bool prefer_delta_byte_array, host_span const> out_sink, rmm::cuda_stream_view stream) { @@ -1840,14 +1847,16 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, } // Build chunk dictionaries and count pages. Sends chunks to device.
- cudf::detail::hostdevice_vector comp_page_sizes = init_page_sizes(chunks, - col_desc, - num_columns, - max_page_size_bytes, - max_page_size_rows, - write_v2_headers, - compression, - stream); + cudf::detail::hostdevice_vector comp_page_sizes = + init_page_sizes(chunks, + col_desc, + num_columns, + max_page_size_bytes, + max_page_size_rows, + write_v2_headers, + prefer_delta_byte_array, + compression, + stream); // Find which partition a rg belongs to std::vector rg_to_part; @@ -1995,6 +2004,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, max_page_size_bytes, max_page_size_rows, write_v2_headers, + prefer_delta_byte_array, stream); } @@ -2175,6 +2185,7 @@ writer::impl::impl(std::vector> sinks, _int96_timestamps(options.is_enabled_int96_timestamps()), _utc_timestamps(options.is_enabled_utc_timestamps()), _write_v2_headers(options.is_enabled_write_v2_headers()), + _prefer_delta_byte_array(options.is_enabled_prefer_dba()), _column_index_truncate_length(options.get_column_index_truncate_length()), _kv_meta(options.get_key_value_metadata()), _single_write_mode(mode), @@ -2204,6 +2215,7 @@ writer::impl::impl(std::vector> sinks, _int96_timestamps(options.is_enabled_int96_timestamps()), _utc_timestamps(options.is_enabled_utc_timestamps()), _write_v2_headers(options.is_enabled_write_v2_headers()), + _prefer_delta_byte_array(options.is_enabled_prefer_dba()), _column_index_truncate_length(options.get_column_index_truncate_length()), _kv_meta(options.get_key_value_metadata()), _single_write_mode(mode), @@ -2282,6 +2294,7 @@ void writer::impl::write(table_view const& input, std::vector co _int96_timestamps, _utc_timestamps, _write_v2_headers, + _prefer_delta_byte_array, _out_sink, _stream); } catch (...) { // catch any exception type diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index 3415205d179..33fee850976 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -159,6 +159,7 @@ class writer::impl { bool const _int96_timestamps; bool const _utc_timestamps; bool const _write_v2_headers; + bool const _prefer_delta_byte_array; int32_t const _column_index_truncate_length; std::vector> const _kv_meta; // Optional user metadata. 
cudf::io::detail::single_write_mode const diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp index abbd0c97f07..40d3e9e59c4 100644 --- a/cpp/tests/io/parquet_reader_test.cpp +++ b/cpp/tests/io/parquet_reader_test.cpp @@ -2001,17 +2001,6 @@ TEST_F(ParquetReaderTest, DeltaSkipRowsWithNulls) str_col, *str_col_nulls, *str_list, *str_list_nulls, big_str_col, *big_str_col_nulls, *big_str_list, *big_str_list_nulls}); - auto const filepath = temp_env->get_temp_filepath("DeltaSkipRowsWithNulls.parquet"); - auto const out_opts = - cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tbl) - .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) - .compression(cudf::io::compression_type::NONE) - .dictionary_policy(cudf::io::dictionary_policy::NEVER) - .max_page_size_rows(5'000) - .write_v2_headers(true) - .build(); - cudf::io::write_parquet(out_opts); - // skip_rows / num_rows // clang-format off std::vector> params{ @@ -2026,38 +2015,56 @@ TEST_F(ParquetReaderTest, DeltaSkipRowsWithNulls) }; // clang-format on - for (auto p : params) { - cudf::io::parquet_reader_options read_args = - cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); - if (p.first >= 0) { read_args.set_skip_rows(p.first); } - if (p.second >= 0) { read_args.set_num_rows(p.second); } - auto result = cudf::io::read_parquet(read_args); - - p.first = p.first < 0 ? 0 : p.first; - p.second = p.second < 0 ? num_rows - p.first : p.second; - std::vector slice_indices{p.first, p.first + p.second}; - std::vector expected = cudf::slice(tbl, slice_indices); - - CUDF_TEST_EXPECT_TABLES_EQUAL(result.tbl->view(), expected[0]); - - // test writing the result back out as a further check of the delta writer's correctness - std::vector out_buffer; - cudf::io::parquet_writer_options out_opts2 = - cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&out_buffer}, - result.tbl->view()) + auto const do_test = [&](bool delta_ba) { + auto const filepath = temp_env->get_temp_filepath("DeltaSkipRowsWithNulls.parquet"); + auto const out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tbl) .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) .compression(cudf::io::compression_type::NONE) .dictionary_policy(cudf::io::dictionary_policy::NEVER) .max_page_size_rows(5'000) - .write_v2_headers(true); - cudf::io::write_parquet(out_opts2); + .write_v2_headers(true) + .prefer_dba(delta_ba) + .build(); + cudf::io::write_parquet(out_opts); - cudf::io::parquet_reader_options default_in_opts = cudf::io::parquet_reader_options::builder( - cudf::io::source_info{out_buffer.data(), out_buffer.size()}); - auto const result2 = cudf::io::read_parquet(default_in_opts); + for (auto p : params) { + cudf::io::parquet_reader_options read_args = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + if (p.first >= 0) { read_args.set_skip_rows(p.first); } + if (p.second >= 0) { read_args.set_num_rows(p.second); } + auto result = cudf::io::read_parquet(read_args); + + p.first = p.first < 0 ? 0 : p.first; + p.second = p.second < 0 ? 
num_rows - p.first : p.second; + std::vector slice_indices{p.first, p.first + p.second}; + std::vector expected = cudf::slice(tbl, slice_indices); + + CUDF_TEST_EXPECT_TABLES_EQUAL(result.tbl->view(), expected[0]); + + // test writing the result back out as a further check of the delta writer's correctness + std::vector out_buffer; + cudf::io::parquet_writer_options out_opts2 = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&out_buffer}, + result.tbl->view()) + .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) + .compression(cudf::io::compression_type::NONE) + .dictionary_policy(cudf::io::dictionary_policy::NEVER) + .max_page_size_rows(5'000) + .prefer_dba(delta_ba) + .write_v2_headers(true); + cudf::io::write_parquet(out_opts2); + + cudf::io::parquet_reader_options default_in_opts = cudf::io::parquet_reader_options::builder( + cudf::io::source_info{out_buffer.data(), out_buffer.size()}); + auto const result2 = cudf::io::read_parquet(default_in_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(result.tbl->view(), result2.tbl->view()); + } + }; - CUDF_TEST_EXPECT_TABLES_EQUAL(result.tbl->view(), result2.tbl->view()); - } + do_test(false); + do_test(true); } /////////////////// diff --git a/python/cudf/cudf/_lib/cpp/io/parquet.pxd b/python/cudf/cudf/_lib/cpp/io/parquet.pxd index cdd1bde0274..4604fdcf66c 100644 --- a/python/cudf/cudf/_lib/cpp/io/parquet.pxd +++ b/python/cudf/cudf/_lib/cpp/io/parquet.pxd @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. from libc.stdint cimport uint8_t from libcpp cimport bool @@ -101,6 +101,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: void set_max_page_size_bytes(size_t val) except + void set_max_page_size_rows(size_type val) except + void enable_write_v2_headers(bool val) except + + void enable_prefer_dba(bool val) except + void set_dictionary_policy(cudf_io_types.dictionary_policy policy)except + @staticmethod @@ -155,6 +156,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: parquet_writer_options_builder& write_v2_headers( bool val ) except + + parquet_writer_options_builder& prefer_dba( + bool val + ) except + parquet_writer_options_builder& dictionary_policy( cudf_io_types.dictionary_policy val ) except + @@ -200,6 +204,7 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: void set_max_page_size_bytes(size_t val) except + void set_max_page_size_rows(size_type val) except + void enable_write_v2_headers(bool val) except + + void enable_prefer_dba(bool val) except + void set_dictionary_policy(cudf_io_types.dictionary_policy policy)except + @staticmethod @@ -245,6 +250,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: parquet_writer_options_builder& write_v2_headers( bool val ) except + + parquet_writer_options_builder& prefer_dba( + bool val + ) except + parquet_writer_options_builder& dictionary_policy( cudf_io_types.dictionary_policy val ) except + diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index fab7d76c3c2..b7d34409745 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -324,6 +324,7 @@ def write_parquet( object force_nullable_schema=False, header_version="1.0", use_dictionary=True, + prefer_delta_byte_array=False, ): """ Cython function to call into libcudf API, see `write_parquet`. 
@@ -421,6 +422,7 @@ def write_parquet( .stats_level(stat_freq) .int96_timestamps(_int96_timestamps) .write_v2_headers(header_version == "2.0") + .prefer_dba(prefer_delta_byte_array) .dictionary_policy(dict_policy) .utc_timestamps(False) .build() diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 1b0f83c5d70..650936a48cc 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -6611,6 +6611,7 @@ def to_parquet( return_metadata=False, use_dictionary=True, header_version="1.0", + prefer_delta_byte_array=False, *args, **kwargs, ): @@ -6637,6 +6638,7 @@ def to_parquet( return_metadata=return_metadata, use_dictionary=use_dictionary, header_version=header_version, + prefer_delta_byte_array=prefer_delta_byte_array, *args, **kwargs, ) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 6c70b08384f..0e175c35b3d 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -68,6 +68,7 @@ def _write_parquet( force_nullable_schema=False, header_version="1.0", use_dictionary=True, + prefer_delta_byte_array=False, ): if is_list_like(paths) and len(paths) > 1: if partitions_info is None: @@ -100,6 +101,7 @@ def _write_parquet( "force_nullable_schema": force_nullable_schema, "header_version": header_version, "use_dictionary": use_dictionary, + "prefer_delta_byte_array": prefer_delta_byte_array, } if all(ioutils.is_fsspec_open_file(buf) for buf in paths_or_bufs): with ExitStack() as stack: @@ -875,6 +877,7 @@ def to_parquet( force_nullable_schema=False, header_version="1.0", use_dictionary=True, + prefer_delta_byte_array=False, *args, **kwargs, ): @@ -951,6 +954,7 @@ def to_parquet( force_nullable_schema=force_nullable_schema, header_version=header_version, use_dictionary=use_dictionary, + prefer_delta_byte_array=prefer_delta_byte_array, ) else: diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index b4e24bd1617..ea7f2fea3af 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1299,8 +1299,19 @@ def test_parquet_delta_byte_array(datadir): assert_eq(cudf.read_parquet(fname), pd.read_parquet(fname)) +# Chosen to exercise different edge cases. 
+# 1 - single row, value in header +# 2 - one value in header, one in miniblock +# 23 - partial miniblock +# 32 - one less than full miniblock +# 33 - one full miniblock +# 34 - start of next miniblock +# 128 - one less than full block +# 129 - one full block +# 130 - start of next block +# 500 - multiple blocks def delta_num_rows(): - return [1, 2, 23, 32, 33, 34, 64, 65, 66, 128, 129, 130, 20000, 50000] + return [1, 2, 23, 32, 33, 34, 128, 129, 130, 500] @pytest.mark.parametrize("nrows", delta_num_rows()) @@ -1370,6 +1381,7 @@ def test_delta_byte_array_roundtrip( nrows, add_nulls, max_string_length, str_encoding, tmpdir ): null_frequency = 0.25 if add_nulls else 0 + prefer_dba = True if str_encoding == "DELTA_BYTE_ARRAY" else False # Create a pandas dataframe with random data of mixed lengths test_pdf = dg.rand_dataframe( @@ -1400,17 +1412,17 @@ def test_delta_byte_array_roundtrip( pcdf = cudf.from_pandas(test_pdf) assert_eq(cdf, pcdf) - # Test DELTA_LENGTH_BYTE_ARRAY writing as well - if str_encoding == "DELTA_LENGTH_BYTE_ARRAY": - cudf_fname = tmpdir.join("cdfdeltaba.parquet") - pcdf.to_parquet( - cudf_fname, - compression="snappy", - header_version="2.0", - use_dictionary=False, - ) - cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname)) - assert_eq(cdf2, cdf) + # Test writing as well + cudf_fname = tmpdir.join("cdfdeltaba.parquet") + pcdf.to_parquet( + cudf_fname, + compression="snappy", + header_version="2.0", + use_dictionary=False, + prefer_delta_byte_array=prefer_dba, + ) + cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname)) + assert_eq(cdf2, cdf) @pytest.mark.parametrize("nrows", delta_num_rows()) @@ -1424,6 +1436,7 @@ def test_delta_struct_list(tmpdir, nrows, add_nulls, str_encoding): list_size = 4 num_rows = nrows include_validity = add_nulls + prefer_dba = True if str_encoding == "DELTA_BYTE_ARRAY" else False def list_gen_wrapped(x, y): return list_row_gen( @@ -1467,17 +1480,17 @@ def string_list_gen_wrapped(x, y): pcdf = cudf.from_pandas(test_pdf) assert_eq(cdf, pcdf) - # Test DELTA_LENGTH_BYTE_ARRAY writing as well - if str_encoding == "DELTA_LENGTH_BYTE_ARRAY": - cudf_fname = tmpdir.join("cdfdeltaba.parquet") - pcdf.to_parquet( - cudf_fname, - compression="snappy", - header_version="2.0", - use_dictionary=False, - ) - cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname)) - assert_eq(cdf2, cdf) + # Test writing as well + cudf_fname = tmpdir.join("cdfdeltaba.parquet") + pcdf.to_parquet( + cudf_fname, + compression="snappy", + header_version="2.0", + use_dictionary=False, + prefer_delta_byte_array=prefer_dba, + ) + cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname)) + assert_eq(cdf2, cdf) @pytest.mark.parametrize( diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index feb02bac60d..16ba96e1029 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -296,6 +296,9 @@ Controls whether to use version 1.0 or version 2.0 page headers when encoding. Version 1.0 is more portable, but version 2.0 enables the use of newer encoding schemes. +prefer_delta_byte_array : bool, default False + When ``True``, DELTA_BYTE_ARRAY encoding will be used where normally + DELTA_LENGTH_BYTE_ARRAY encoding would be used. force_nullable_schema : bool, default False. If True, writes all columns as `null` in schema. 
If False, columns are written as `null` if they contain null values, From 6b1886bd60a2749bc400213fdf9586c7817384a3 Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 31 Jan 2024 11:16:13 -0800 Subject: [PATCH 02/17] fix decl of gpuEncodeDeltaByteArrayPages --- cpp/src/io/parquet/page_enc.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index e80ad9f92fd..8c4d4fd0072 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -2166,7 +2166,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) // DELTA_BYTE_ARRAY page data encoder // blockDim(128, 1, 1) template -__global__ void __launch_bounds__(block_size, 8) +CUDF_KERNEL void __launch_bounds__(block_size, 8) gpuEncodeDeltaByteArrayPages(device_span pages, device_span> comp_in, device_span> comp_out, From dc7b0e9b96de5003a6fe5220db06847fadec1017 Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 31 Jan 2024 21:33:03 -0800 Subject: [PATCH 03/17] fix bug in delta binary encoder. wasn't handling long runs of nulls at the beginning of a page correctly --- cpp/src/io/parquet/delta_enc.cuh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/delta_enc.cuh b/cpp/src/io/parquet/delta_enc.cuh index f90d364f5eb..3e5386c81c4 100644 --- a/cpp/src/io/parquet/delta_enc.cuh +++ b/cpp/src/io/parquet/delta_enc.cuh @@ -201,7 +201,7 @@ class delta_binary_packer { if (is_valid) { _buffer[delta::rolling_idx(pos + _current_idx + _values_in_buffer)] = value; } __syncthreads(); - if (threadIdx.x == 0) { + if (num_valid > 0 && threadIdx.x == 0) { _values_in_buffer += num_valid; // if first pass write header if (_current_idx == 0) { From 7579b643723b6047cf5fd8b4d76d3c56fc6011c6 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 31 Jan 2024 21:40:50 -0800 Subject: [PATCH 04/17] suggestion from review Co-authored-by: Vukasin Milovanovic --- cpp/include/cudf/io/parquet.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 3e870932c24..805e2723589 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -1164,7 +1164,7 @@ class parquet_writer_options_builder { * @brief Set to true if DELTA_BYTE_ARRAY encoding should be used. * * The default encoding for all columns is dictionary encoding. When dictionary encoding - * cannot be used (it was disabled, the dictionary is too large), the parquet writer + * cannot be used (it was disabled, or the dictionary is too large), the parquet writer * will usually fall back to PLAIN encoding. If V2 headers are enabled, however, the * choice for fall back is DELTA_LENGTH_BYTE_ARRAY. Setting this to `true` will use * DELTA_BYTE_ARRAY encoding instead. This will apply to all string columns. 
From 4812132794a1371b51b9d2f1bf0e1759c89fbab7 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 31 Jan 2024 21:44:21 -0800 Subject: [PATCH 05/17] make it pythonic Co-authored-by: Vukasin Milovanovic --- python/cudf/cudf/tests/test_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index ea7f2fea3af..23eb4260a19 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1436,7 +1436,7 @@ def test_delta_struct_list(tmpdir, nrows, add_nulls, str_encoding): list_size = 4 num_rows = nrows include_validity = add_nulls - prefer_dba = True if str_encoding == "DELTA_BYTE_ARRAY" else False + prefer_dba = str_encoding == "DELTA_BYTE_ARRAY" def list_gen_wrapped(x, y): return list_row_gen( From 07d1af79d8857ae4f60bdafae8938bb0a8c1d1df Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 31 Jan 2024 21:48:59 -0800 Subject: [PATCH 06/17] change variable per suggestion --- cpp/src/io/parquet/delta_enc.cuh | 2 +- cpp/tests/io/parquet_reader_test.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/delta_enc.cuh b/cpp/src/io/parquet/delta_enc.cuh index 3e5386c81c4..49f4ccedbf0 100644 --- a/cpp/src/io/parquet/delta_enc.cuh +++ b/cpp/src/io/parquet/delta_enc.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp index 40d3e9e59c4..f86323ea646 100644 --- a/cpp/tests/io/parquet_reader_test.cpp +++ b/cpp/tests/io/parquet_reader_test.cpp @@ -2015,7 +2015,7 @@ TEST_F(ParquetReaderTest, DeltaSkipRowsWithNulls) }; // clang-format on - auto const do_test = [&](bool delta_ba) { + auto const do_test = [&](bool prefer_dba) { auto const filepath = temp_env->get_temp_filepath("DeltaSkipRowsWithNulls.parquet"); auto const out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tbl) @@ -2024,7 +2024,7 @@ TEST_F(ParquetReaderTest, DeltaSkipRowsWithNulls) .dictionary_policy(cudf::io::dictionary_policy::NEVER) .max_page_size_rows(5'000) .write_v2_headers(true) - .prefer_dba(delta_ba) + .prefer_dba(prefer_dba) .build(); cudf::io::write_parquet(out_opts); From 985d50aa12a26815512230d0733d5ae97517dae3 Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 31 Jan 2024 21:54:28 -0800 Subject: [PATCH 07/17] more review changes --- cpp/src/io/parquet/writer_impl.cu | 19 +++++++++---------- cpp/tests/io/parquet_reader_test.cpp | 2 +- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 1da4e4221b5..104598873e2 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1847,16 +1847,15 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, } // Build chunk dictionaries and count pages. Sends chunks to device. 
- cudf::detail::hostdevice_vector comp_page_sizes = - init_page_sizes(chunks, - col_desc, - num_columns, - max_page_size_bytes, - max_page_size_rows, - write_v2_headers, - prefer_delta_byte_array, - compression, - stream); + auto comp_page_sizes = init_page_sizes(chunks, + col_desc, + num_columns, + max_page_size_bytes, + max_page_size_rows, + write_v2_headers, + prefer_delta_byte_array, + compression, + stream); // Find which partition a rg belongs to std::vector rg_to_part; diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp index f86323ea646..d93c585e018 100644 --- a/cpp/tests/io/parquet_reader_test.cpp +++ b/cpp/tests/io/parquet_reader_test.cpp @@ -2051,7 +2051,7 @@ TEST_F(ParquetReaderTest, DeltaSkipRowsWithNulls) .compression(cudf::io::compression_type::NONE) .dictionary_policy(cudf::io::dictionary_policy::NEVER) .max_page_size_rows(5'000) - .prefer_dba(delta_ba) + .prefer_dba(prefer_dba) .write_v2_headers(true); cudf::io::write_parquet(out_opts2); From 77e378919ddd38c7db104a5081460d099888907a Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 31 Jan 2024 22:01:29 -0800 Subject: [PATCH 08/17] change another variable name --- cpp/src/io/parquet/page_enc.cu | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 8c4d4fd0072..9bdeff89c6d 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -522,7 +522,8 @@ __device__ size_t delta_data_len(Type physical_type, // divisible by 128 (via static assert on delta::block_size), but do safe division anyway. auto const bytes_per_block = cudf::util::div_rounding_up_unsafe(max_bits * vals_per_block, 8); auto const block_size = mini_block_header_size + bytes_per_block; - auto const block_mult = physical_type == BYTE_ARRAY and prefer_delta_byte_array ? 2 : 1; + // the number of DELTA_BINARY_PACKED blocks to encode + auto const num_dbp_blocks = physical_type == BYTE_ARRAY and prefer_delta_byte_array ? 2 : 1; // delta header is 2 bytes for the block_size, 1 byte for number of mini-blocks, // max 5 bytes for number of values, and max dtype_len + 1 for first value. @@ -534,11 +535,11 @@ __device__ size_t delta_data_len(Type physical_type, // data we also need to add size of the char data. `page_size` that is passed in is the // plain encoded size (i.e. num_values * sizeof(size_type) + char_data_len), so the char // data len is `page_size` minus the first term. - // `block_mult` takes into account the two delta binary blocks for DELTA_BYTE_ARRAY. + // `num_dbp_blocks` takes into account the two delta binary blocks for DELTA_BYTE_ARRAY. auto const char_data_len = physical_type == BYTE_ARRAY ? page_size - num_values * sizeof(size_type) : 0; - return header_size + num_blocks * block_mult * block_size + char_data_len; + return header_size + num_blocks * num_dbp_blocks * block_size + char_data_len; } // blockDim {128,1,1} From 388ae86d762ef6c0e8aecee2ee89636a575606aa Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 31 Jan 2024 22:07:02 -0800 Subject: [PATCH 09/17] add explanation of when to choose delta_byte_array --- python/cudf/cudf/utils/ioutils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 16ba96e1029..63c3cf53463 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -298,7 +298,9 @@ use of newer encoding schemes. 
prefer_delta_byte_array : bool, default False When ``True``, DELTA_BYTE_ARRAY encoding will be used where normally - DELTA_LENGTH_BYTE_ARRAY encoding would be used. + DELTA_LENGTH_BYTE_ARRAY encoding would be used. DELTA_BYTE_ARRAY may be + preferable when dealing with sorted data, or data with many runs of the + same value. force_nullable_schema : bool, default False. If True, writes all columns as `null` in schema. If False, columns are written as `null` if they contain null values, From 6b2ef14334b0400f4d127b13b0db5a04efbbde6a Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 1 Feb 2024 08:22:18 -0800 Subject: [PATCH 10/17] fix the other bool assignment --- python/cudf/cudf/tests/test_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 23eb4260a19..29bf8a2b1a8 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1381,7 +1381,7 @@ def test_delta_byte_array_roundtrip( nrows, add_nulls, max_string_length, str_encoding, tmpdir ): null_frequency = 0.25 if add_nulls else 0 - prefer_dba = True if str_encoding == "DELTA_BYTE_ARRAY" else False + prefer_dba = str_encoding == "DELTA_BYTE_ARRAY" # Create a pandas dataframe with random data of mixed lengths test_pdf = dg.rand_dataframe( From 46c5425a916568bee5fd67913b1ad1d8f49d6883 Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 1 Feb 2024 15:36:06 -0800 Subject: [PATCH 11/17] add delta binary test --- cpp/tests/io/parquet_writer_test.cpp | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index 34061cb7bf8..62a24bf0a73 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1426,6 +1426,32 @@ TEST_F(ParquetWriterTest, RowGroupMetadata) static_cast(num_rows * sizeof(column_type))); } +TEST_F(ParquetWriterTest, DeltaBinaryStartsWithNulls) +{ + // test that the DELTA_BINARY_PACKED writer can properly encode a column that begins with + // more than 129 nulls + constexpr int num_rows = 500; + constexpr int num_nulls = 150; + + auto const ones = thrust::make_constant_iterator(1); + auto valids = cudf::detail::make_counting_transform_iterator( + 0, [num_nulls](auto i) { return i >= num_nulls; }); + auto const col = cudf::test::fixed_width_column_wrapper{ones, ones + num_rows, valids}; + auto const expected = table_view({col}); + + auto const filepath = temp_env->get_temp_filepath("DeltaBinaryStartsWithNulls.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .write_v2_headers(true) + .dictionary_policy(cudf::io::dictionary_policy::NEVER); + cudf::io::write_parquet(out_opts); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(in_opts); + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); +} + ///////////////////////////////////////////////////////////// // custom mem mapped data sink that supports device writes template From b955c378e0e4de7c5acb24d3b659b6de10bd4363 Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 1 Feb 2024 16:02:44 -0800 Subject: [PATCH 12/17] use struct rather than tuple for byte arrays --- cpp/src/io/parquet/page_enc.cu | 59 ++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git 
a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 9bdeff89c6d..247d6d01d08 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -2164,6 +2164,22 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) s, output_ptr + string_data_len, pages, comp_in, comp_out, comp_results, true); } +struct byte_array { + uint8_t const* data; + size_type length; + + // calculate the amount of overlap with a preceding array + __device__ size_type overlap_with(byte_array const& preceding) const + { + auto const max_pref_len = min(length, preceding.length); + size_type idx = 0; + while (idx < max_pref_len and data[idx] == preceding.data[idx]) { + idx++; + } + return idx; + } +}; + // DELTA_BYTE_ARRAY page data encoder // blockDim(128, 1, 1) template @@ -2275,16 +2291,16 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) auto const type_id = s->col.leaf_column->type().id(); - auto const get_string_tuple = [type_id, s](int idx) -> thrust::pair { + auto const get_byte_array = [type_id, s](size_type idx) -> byte_array { if (type_id == type_id::STRING) { auto const str = s->col.leaf_column->element(idx); - return {str.size_bytes(), reinterpret_cast(str.data())}; + return {reinterpret_cast(str.data()), str.size_bytes()}; } else if (s->col.output_as_byte_array && type_id == type_id::LIST) { auto const str = get_element(*s->col.leaf_column, idx); - return {static_cast(str.size_bytes()), - reinterpret_cast(str.data())}; + return {reinterpret_cast(str.data()), + static_cast(str.size_bytes())}; } - return {0, nullptr}; + return {nullptr, 0}; }; /* @@ -2325,16 +2341,11 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) size_type const pleaf_idx = has_leaf_nulls ? offsets_map[idx - 1] : idx - 1; // get this string and the preceding string - auto const [len, ptr] = get_string_tuple(leaf_idx + s->page_start_val); - auto const [len_p, ptr_p] = get_string_tuple(pleaf_idx + s->page_start_val); + auto const current = get_byte_array(leaf_idx + s->page_start_val); + auto const preceding = get_byte_array(pleaf_idx + s->page_start_val); // calculate the amount of overlap - auto const max_pref_len = min(len, len_p); - size_type pref_len = 0; - while (pref_len < max_pref_len and ptr[pref_len] == ptr_p[pref_len]) { - pref_len++; - } - prefix_lengths[idx] = pref_len; + prefix_lengths[idx] = current.overlap_with(preceding); } // encode prefix lengths @@ -2370,8 +2381,8 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) int32_t val = 0; if (in_range) { size_type const leaf_idx = has_leaf_nulls ? offsets_map[t_idx] : t_idx; - auto const [len, ptr] = get_string_tuple(leaf_idx + s->page_start_val); - val = len - prefix_lengths[t_idx]; + auto const byte_arr = get_byte_array(leaf_idx + s->page_start_val); + val = byte_arr.length - prefix_lengths[t_idx]; if (val > 0) { non_zero++; suffix_bytes += val; @@ -2401,12 +2412,12 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) size_type s_len = 0, pref_len = 0, suff_len = 0; uint8_t const* s_ptr = nullptr; if (t_idx < s->page.num_valid) { - int const leaf_idx = has_leaf_nulls ? offsets_map[t_idx] : t_idx; - auto const [len, ptr] = get_string_tuple(leaf_idx + s->page_start_val); - s_len = len; - s_ptr = ptr; - pref_len = prefix_lengths[t_idx]; - suff_len = len - pref_len; + size_type const leaf_idx = has_leaf_nulls ? 
From 858b4c977b20764342e52b2f91de371eb1a1f115 Mon Sep 17 00:00:00 2001
From: seidl
Date: Fri, 2 Feb 2024 13:38:22 -0800
Subject: [PATCH 13/17] fix bug in delta_byte_array reader

---
 cpp/src/io/parquet/page_string_decode.cu |  3 +++
 cpp/tests/io/parquet_reader_test.cpp     | 30 ++++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu
index d652a43d097..5cd8205b4ba 100644
--- a/cpp/src/io/parquet/page_string_decode.cu
+++ b/cpp/src/io/parquet/page_string_decode.cu
@@ -535,6 +535,9 @@ __device__ thrust::pair<size_t, size_t> totalDeltaByteArraySize(uint8_t const* d
       uint32_t const idx = db->current_value_idx + i + lane_id;
       if (idx >= start_value && idx < end_value && idx < db->value_count) {
         lane_sum += db->value[rolling_index<delta_rolling_buf_size>(idx)];
+      }
+      // need lane_max over all values, not just in bounds
+      if (idx < db->value_count) {
         lane_max = max(lane_max, db->value[rolling_index<delta_rolling_buf_size>(idx)]);
       }
     }

diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp
index d93c585e018..ba0ce55be16 100644
--- a/cpp/tests/io/parquet_reader_test.cpp
+++ b/cpp/tests/io/parquet_reader_test.cpp
@@ -2067,6 +2067,36 @@ TEST_F(ParquetReaderTest, DeltaSkipRowsWithNulls)
   do_test(true);
 }
 
+TEST_F(ParquetReaderTest, DeltaByteArraySkipAllValid)
+{
+  // test that the DELTA_BYTE_ARRAY decoder can handle the case where skip rows skips all valid
+  // values in a page
+  constexpr int num_rows  = 500;
+  constexpr int num_valid = 150;
+
+  auto const ones = thrust::make_constant_iterator("one");
+
+  auto valids = cudf::detail::make_counting_transform_iterator(
+    0, [num_valid](auto i) { return i < num_valid; });
+  auto const col = cudf::test::strings_column_wrapper{ones, ones + num_rows, valids};
+  auto const expected = table_view({col});
+
+  auto const filepath = temp_env->get_temp_filepath("DeltaByteArraySkipAllValid.parquet");
+  cudf::io::parquet_writer_options out_opts =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
+      .write_v2_headers(true)
+      .prefer_dba(true)
+      .dictionary_policy(cudf::io::dictionary_policy::NEVER);
+  cudf::io::write_parquet(out_opts);
+
+  cudf::io::parquet_reader_options in_opts =
+    cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath})
+      .skip_rows(num_valid + 1);
+  auto result = cudf::io::read_parquet(in_opts);
+  CUDF_TEST_EXPECT_TABLES_EQUAL(cudf::slice(expected, {num_valid + 1, num_rows}),
+                                result.tbl->view());
+}
+
 ///////////////////
 // metadata tests
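The one-line reader fix above separates two reductions that were previously fused: the
suffix byte count is summed only over rows inside the requested range, but the maximum
value length has to be taken over every value in the page, because DELTA_BYTE_ARRAY
prefixes chain through every preceding value, so the decoder still touches skipped
strings, and scratch space sized from an in-range-only maximum could be too small. A
simplified serial sketch of the corrected logic (hypothetical names; the real code runs
as a warp-cooperative loop over a rolling buffer):

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <utility>
    #include <vector>

    // Returns {total suffix bytes for rows in [start, end), max length in the page}.
    std::pair<std::size_t, std::size_t> total_delta_byte_array_size(
      std::vector<int32_t> const& suffix_lengths, std::size_t start, std::size_t end)
    {
      std::size_t sum     = 0;
      std::size_t max_len = 0;
      for (std::size_t idx = 0; idx < suffix_lengths.size(); ++idx) {
        // only in-range rows contribute to the output size...
        if (idx >= start && idx < end) { sum += suffix_lengths[idx]; }
        // ...but the max must cover ALL values; restricting it to the
        // in-range rows was the bug being fixed here
        max_len = std::max<std::size_t>(max_len, suffix_lengths[idx]);
      }
      return {sum, max_len};
    }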
From cf30cae3bb21bbe12e0e2fb284a8c89ce1feeab6 Mon Sep 17 00:00:00 2001
From: Ed Seidl
Date: Fri, 2 Feb 2024 14:41:27 -0800
Subject: [PATCH 14/17] Apply suggestions from code review

Co-authored-by: Vukasin Milovanovic
---
 cpp/src/io/parquet/page_enc.cu | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu
index 247d6d01d08..d6eb65266e5 100644
--- a/cpp/src/io/parquet/page_enc.cu
+++ b/cpp/src/io/parquet/page_enc.cu
@@ -2436,8 +2436,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
     int t0 = 0;  // thread 0 for each string
     for (int idx = 0; idx < s->page.num_valid; idx++) {
       // calculate ids for this string
-      int mytid = t - t0;
-      if (mytid < 0) { mytid += block_size; }
+      int const mytid = (t - t0 + block_size) % block_size;
 
       // fetch string for this iter
       size_type const leaf_idx = has_leaf_nulls ? offsets_map[idx] : idx;
@@ -2447,9 +2446,8 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
 
       // now copy the data
       auto const dst = strings_ptr + str_data_len;
-      for (int i = 0; i < suff_len; i += block_size) {
-        size_type const src_idx = i + mytid;
-        if (src_idx < suff_len) { dst[src_idx] = byte_arr.data[pref_len + src_idx]; }
+      for (int src_idx = mytid; src_idx < suff_len; src_idx += block_size) {
+        dst[src_idx] = byte_arr.data[pref_len + src_idx];
       }
 
       str_data_len += suff_len;

From ad873c660add198588b950a68bfa92a7c79516e4 Mon Sep 17 00:00:00 2001
From: seidl
Date: Fri, 2 Feb 2024 14:42:08 -0800
Subject: [PATCH 15/17] more suggestions

---
 cpp/src/io/parquet/page_enc.cu | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu
index d6eb65266e5..5a163298106 100644
--- a/cpp/src/io/parquet/page_enc.cu
+++ b/cpp/src/io/parquet/page_enc.cu
@@ -2291,7 +2291,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
 
   auto const type_id = s->col.leaf_column->type().id();
 
-  auto const get_byte_array = [type_id, s](size_type idx) -> byte_array {
+  auto const byte_array_at = [type_id, s](size_type idx) -> byte_array {
     if (type_id == type_id::STRING) {
       auto const str = s->col.leaf_column->element<string_view>(idx);
       return {reinterpret_cast<uint8_t const*>(str.data()), str.size_bytes()};
@@ -2341,8 +2341,8 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
       size_type const pleaf_idx = has_leaf_nulls ? offsets_map[idx - 1] : idx - 1;
 
       // get this string and the preceding string
-      auto const current   = get_byte_array(leaf_idx + s->page_start_val);
-      auto const preceding = get_byte_array(pleaf_idx + s->page_start_val);
+      auto const current   = byte_array_at(leaf_idx + s->page_start_val);
+      auto const preceding = byte_array_at(pleaf_idx + s->page_start_val);
 
       // calculate the amount of overlap
       prefix_lengths[idx] = current.overlap_with(preceding);
@@ -2381,7 +2381,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
     int32_t val = 0;
     if (in_range) {
       size_type const leaf_idx = has_leaf_nulls ? offsets_map[t_idx] : t_idx;
-      auto const byte_arr = get_byte_array(leaf_idx + s->page_start_val);
+      auto const byte_arr = byte_array_at(leaf_idx + s->page_start_val);
       val = byte_arr.length - prefix_lengths[t_idx];
       if (val > 0) {
         non_zero++;
@@ -2413,7 +2413,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
     uint8_t const* s_ptr = nullptr;
     if (t_idx < s->page.num_valid) {
       size_type const leaf_idx = has_leaf_nulls ? offsets_map[t_idx] : t_idx;
-      auto const byte_arr = get_byte_array(leaf_idx + s->page_start_val);
+      auto const byte_arr = byte_array_at(leaf_idx + s->page_start_val);
       s_len = byte_arr.length;
       s_ptr = byte_arr.data;
       pref_len = prefix_lengths[t_idx];
@@ -2440,7 +2440,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
 
       // fetch string for this iter
       size_type const leaf_idx = has_leaf_nulls ? offsets_map[idx] : idx;
-      auto const byte_arr = get_byte_array(leaf_idx + s->page_start_val);
+      auto const byte_arr = byte_array_at(leaf_idx + s->page_start_val);
       size_type const pref_len = prefix_lengths[idx];
       size_type const suff_len = byte_arr.length - pref_len;
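For reference on the copy loop these cleanup patches touch: the suffixes are packed back
to back in the output buffer, and the thread that writes byte 0 of each suffix rotates
(tracked by t0), presumably so that each thread keeps landing on output offsets congruent
to its own id modulo block_size across string boundaries, which keeps the global writes
coalesced. A serial sketch of the indexing, with the block's threads simulated by a loop
(hypothetical standalone code, not the kernel itself):

    #include <cstdint>
    #include <string>
    #include <vector>

    constexpr int block_size = 128;

    // Simulate the block copying each suffix with a rotating starting thread.
    std::vector<uint8_t> pack_suffixes(std::vector<std::string> const& suffixes)
    {
      std::size_t total = 0;
      for (auto const& s : suffixes) { total += s.size(); }
      std::vector<uint8_t> out(total);

      std::size_t offset = 0;
      int t0 = 0;  // "thread 0" for the current string
      for (auto const& s : suffixes) {
        int const suff_len = static_cast<int>(s.size());
        for (int t = 0; t < block_size; ++t) {  // each simulated thread
          int const tid = (t - t0 + block_size) % block_size;
          for (int src_idx = tid; src_idx < suff_len; src_idx += block_size) {
            // thread t writes global offset (offset + src_idx), which is
            // congruent to t modulo block_size since offset ≡ t0 (mod block_size)
            out[offset + src_idx] = static_cast<uint8_t>(s[src_idx]);
          }
        }
        offset += suff_len;
        t0 = (t0 + suff_len) % block_size;
      }
      return out;
    }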
From 9883af74932eb7dbbb21b6917eb0d1e2331d6261 Mon Sep 17 00:00:00 2001
From: seidl
Date: Fri, 2 Feb 2024 14:54:11 -0800
Subject: [PATCH 16/17] a few more cleanups

---
 cpp/src/io/parquet/page_enc.cu | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu
index 5a163298106..bd170ef14fc 100644
--- a/cpp/src/io/parquet/page_enc.cu
+++ b/cpp/src/io/parquet/page_enc.cu
@@ -2436,7 +2436,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
     int t0 = 0;  // thread 0 for each string
     for (int idx = 0; idx < s->page.num_valid; idx++) {
       // calculate ids for this string
-      int const mytid = (t - t0 + block_size) % block_size;
+      int const tid = (t - t0 + block_size) % block_size;
 
       // fetch string for this iter
       size_type const leaf_idx = has_leaf_nulls ? offsets_map[idx] : idx;
@@ -2446,13 +2446,12 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
 
       // now copy the data
       auto const dst = strings_ptr + str_data_len;
-      for (int src_idx = mytid; src_idx < suff_len; src_idx += block_size) {
+      for (int src_idx = tid; src_idx < suff_len; src_idx += block_size) {
         dst[src_idx] = byte_arr.data[pref_len + src_idx];
       }
 
       str_data_len += suff_len;
-      t0 += suff_len;
-      t0 = t0 % block_size;
+      t0 = (t0 + suff_len) % block_size;
     }
   }

From 81f4ee2b10c081760319ad4a85399ea06eceb804 Mon Sep 17 00:00:00 2001
From: seidl
Date: Fri, 2 Feb 2024 15:12:48 -0800
Subject: [PATCH 17/17] lost a change somehow

---
 cpp/src/io/parquet/page_enc.cu | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu
index bd170ef14fc..64d898e432a 100644
--- a/cpp/src/io/parquet/page_enc.cu
+++ b/cpp/src/io/parquet/page_enc.cu
@@ -2169,7 +2169,7 @@ struct byte_array {
   size_type length;
 
   // calculate the amount of overlap with a preceding array
-  __device__ size_type overlap_with(byte_array const& preceding) const
+  __device__ size_type common_prefix_length(byte_array const& preceding) const
   {
     auto const max_pref_len = min(length, preceding.length);
     size_type idx = 0;
@@ -2345,7 +2345,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8)
       auto const preceding = byte_array_at(pleaf_idx + s->page_start_val);
 
       // calculate the amount of overlap
-      prefix_lengths[idx] = current.overlap_with(preceding);
+      prefix_lengths[idx] = current.common_prefix_length(preceding);
     }
 
     // encode prefix lengths
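With the full series applied, a caller opts into DELTA_BYTE_ARRAY the same way the tests
above do: enable V2 page headers, set the new preference flag, and make dictionary
encoding unavailable (or let it fall back naturally when a dictionary cannot be built). A
usage sketch mirroring the test setup (assumes an existing cudf::table_view named tbl;
illustrative only):

    #include <cudf/io/parquet.hpp>
    #include <cudf/table/table_view.hpp>

    #include <string>

    // Write tbl with string columns encoded as DELTA_BYTE_ARRAY: dictionary
    // encoding is disabled, V2 page headers select the delta family, and
    // prefer_dba(true) picks DELTA_BYTE_ARRAY over DELTA_LENGTH_BYTE_ARRAY.
    void write_with_dba(cudf::table_view const& tbl, std::string const& path)
    {
      cudf::io::parquet_writer_options opts =
        cudf::io::parquet_writer_options::builder(cudf::io::sink_info{path}, tbl)
          .write_v2_headers(true)
          .prefer_dba(true)
          .dictionary_policy(cudf::io::dictionary_policy::NEVER);
      cudf::io::write_parquet(opts);
    }

Note that the flag only changes the fallback choice: when dictionary encoding is
available it still wins, so prefer_dba has no effect unless the dictionary path is
disabled or overflows.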