From 283f18ba71c8cafca61422ac706c751615de5bad Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 6 Mar 2024 08:37:50 -0800 Subject: [PATCH 1/3] add DELTA_BYTE_ARRAY encoder --- cpp/src/io/parquet/page_delta_decode.cu | 16 + cpp/src/io/parquet/page_enc.cu | 403 +++++++++++++++++++++--- cpp/src/io/parquet/parquet_gpu.hpp | 9 +- cpp/src/io/parquet/writer_impl.cu | 26 +- cpp/tests/io/parquet_reader_test.cpp | 54 ++++ cpp/tests/io/parquet_writer_test.cpp | 47 ++- 6 files changed, 482 insertions(+), 73 deletions(-) diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu index c68b6a32c8b..7c0092c6185 100644 --- a/cpp/src/io/parquet/page_delta_decode.cu +++ b/cpp/src/io/parquet/page_delta_decode.cu @@ -462,6 +462,14 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) return; } + if (s->col.logical_type.has_value() && s->col.logical_type->type == LogicalType::DECIMAL) { + // we cannot read decimal encoded with DELTA_BYTE_ARRAY yet + if (t == 0) { + set_error(static_cast(decode_error::INVALID_DATA_TYPE), error_code); + } + return; + } + bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0; // choose a character parallel string copy when the average string is longer than a warp @@ -620,6 +628,14 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) return; } + if (s->col.logical_type.has_value() && s->col.logical_type->type == LogicalType::DECIMAL) { + // we cannot read decimal encoded with DELTA_LENGTH_BYTE_ARRAY yet + if (t == 0) { + set_error(static_cast(decode_error::INVALID_DATA_TYPE), error_code); + } + return; + } + bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0; // copying logic from gpuDecodePageData. diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 617cb1d0992..fb17545875a 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -16,6 +16,7 @@ #include "delta_enc.cuh" #include "io/utilities/block_utils.cuh" +#include "page_string_utils.cuh" #include "parquet_gpu.cuh" #include @@ -30,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -493,10 +495,47 @@ CUDF_KERNEL void __launch_bounds__(128) } } +// given a column chunk, determine which data encoding to use +__device__ encode_kernel_mask data_encoding_for_col(EncColumnChunk const* chunk, + parquet_column_device_view const* col_desc, + bool write_v2_headers) +{ + // first check for dictionary (boolean always uses dict encoder) + if (chunk->use_dictionary or col_desc->physical_type == BOOLEAN) { + return encode_kernel_mask::DICTIONARY; + } + + // next check for user requested encoding, but skip if user requested dictionary encoding + // (if we could use the requested dict encoding, we'd have returned above) + if (col_desc->requested_encoding != column_encoding::USE_DEFAULT and + col_desc->requested_encoding != column_encoding::DICTIONARY) { + switch (col_desc->requested_encoding) { + case column_encoding::PLAIN: return encode_kernel_mask::PLAIN; + case column_encoding::DELTA_BINARY_PACKED: return encode_kernel_mask::DELTA_BINARY; + case column_encoding::DELTA_LENGTH_BYTE_ARRAY: return encode_kernel_mask::DELTA_LENGTH_BA; + case column_encoding::DELTA_BYTE_ARRAY: return encode_kernel_mask::DELTA_BYTE_ARRAY; + } + } + + // Select a fallback encoding. For V1, we always choose PLAIN. For V2 we'll use + // DELTA_BINARY_PACKED for INT32 and INT64, and DELTA_LENGTH_BYTE_ARRAY for + // BYTE_ARRAY. Everything else will still fall back to PLAIN. + if (write_v2_headers) { + switch (col_desc->physical_type) { + case INT32: + case INT64: return encode_kernel_mask::DELTA_BINARY; + case BYTE_ARRAY: return encode_kernel_mask::DELTA_LENGTH_BA; + } + } + + return encode_kernel_mask::PLAIN; +} + __device__ size_t delta_data_len(Type physical_type, cudf::type_id type_id, uint32_t num_values, - size_t page_size) + size_t page_size, + encode_kernel_mask encoding) { auto const dtype_len_out = physical_type_len(physical_type, type_id); auto const dtype_len = [&]() -> uint32_t { @@ -516,6 +555,8 @@ __device__ size_t delta_data_len(Type physical_type, // divisible by 128 (via static assert on delta::block_size), but do safe division anyway. auto const bytes_per_block = cudf::util::div_rounding_up_unsafe(max_bits * vals_per_block, 8); auto const block_size = mini_block_header_size + bytes_per_block; + // the number of DELTA_BINARY_PACKED blocks to encode + auto const num_dbp_blocks = encoding == encode_kernel_mask::DELTA_BYTE_ARRAY ? 2 : 1; // delta header is 2 bytes for the block_size, 1 byte for number of mini-blocks, // max 5 bytes for number of values, and max dtype_len + 1 for first value. @@ -526,12 +567,17 @@ __device__ size_t delta_data_len(Type physical_type, // The above is just a size estimate for a DELTA_BINARY_PACKED data page. For BYTE_ARRAY // data we also need to add size of the char data. `page_size` that is passed in is the // plain encoded size (i.e. num_values * sizeof(size_type) + char_data_len), so the char - // data len is `page_size` minus the first term. - // TODO: this will need to change for DELTA_BYTE_ARRAY encoding - auto const char_data_len = - physical_type == BYTE_ARRAY ? page_size - num_values * sizeof(size_type) : 0; + // data len is `page_size` minus the first term. For FIXED_LEN_BYTE_ARRAY there are no + // lengths, so just use `page_size`. + // `num_dbp_blocks` takes into account the two delta binary blocks for DELTA_BYTE_ARRAY. + size_t char_data_len = 0; + if (physical_type == BYTE_ARRAY) { + char_data_len = page_size - num_values * sizeof(size_type); + } else if (physical_type == FIXED_LEN_BYTE_ARRAY) { + char_data_len = page_size; + } - return header_size + num_blocks * block_size + char_data_len; + return header_size + num_blocks * num_dbp_blocks * block_size + char_data_len; } // blockDim {128,1,1} @@ -573,13 +619,12 @@ CUDF_KERNEL void __launch_bounds__(128) // at the worst case number of bytes needed to encode. auto const physical_type = col_g.physical_type; auto const type_id = col_g.leaf_column->type().id(); - auto const is_requested_delta = - col_g.requested_encoding == column_encoding::DELTA_BINARY_PACKED || - col_g.requested_encoding == column_encoding::DELTA_LENGTH_BYTE_ARRAY; - auto const is_fallback_to_delta = - !ck_g.use_dictionary && write_v2_headers && - (physical_type == INT32 || physical_type == INT64 || physical_type == BYTE_ARRAY); - auto const is_use_delta = is_requested_delta || is_fallback_to_delta; + + // figure out kernel encoding to use for data pages + auto const column_data_encoding = data_encoding_for_col(&ck_g, &col_g, write_v2_headers); + auto const is_use_delta = column_data_encoding == encode_kernel_mask::DELTA_BINARY or + column_data_encoding == encode_kernel_mask::DELTA_LENGTH_BA or + column_data_encoding == encode_kernel_mask::DELTA_BYTE_ARRAY; if (t < 32) { uint32_t fragments_in_chunk = 0; @@ -754,8 +799,8 @@ CUDF_KERNEL void __launch_bounds__(128) } // get a different bound if using delta encoding if (is_use_delta) { - auto const delta_len = - delta_data_len(physical_type, type_id, page_g.num_leaf_values, page_size); + auto const delta_len = delta_data_len( + physical_type, type_id, page_g.num_leaf_values, page_size, column_data_encoding); page_size = max(page_size, delta_len); } auto const max_data_size = @@ -771,11 +816,28 @@ CUDF_KERNEL void __launch_bounds__(128) // 4-byte length indicator, so subtract that. page_g.var_bytes_size = var_bytes_size; } + + page_g.kernel_mask = column_data_encoding; page_g.max_data_size = static_cast(max_data_size); pagestats_g.start_chunk = ck_g.first_fragment + page_start; pagestats_g.num_chunks = page_g.num_fragments; page_offset += util::round_up_unsafe(page_g.max_hdr_size + page_g.max_data_size, page_align); + // if encoding delta_byte_array, need to allocate some space for scratch data. + // if there are leaf nulls, we need space for a mapping array: + // sizeof(size_type) * num_leaf_values + // we always need prefix lengths: sizeof(size_type) * num_valid + if (page_g.kernel_mask == encode_kernel_mask::DELTA_BYTE_ARRAY) { + // scratch needs to be aligned to a size_type boundary + auto const pg_end = reinterpret_cast(ck_g.uncompressed_bfr + page_offset); + auto scratch = util::round_up_unsafe(pg_end, sizeof(size_type)); + if (page_g.num_valid != page_g.num_leaf_values) { + scratch += sizeof(size_type) * page_g.num_leaf_values; + } + scratch += sizeof(size_type) * page_g.num_valid; + page_offset = + thrust::distance(ck_g.uncompressed_bfr, reinterpret_cast(scratch)); + } if (not comp_page_sizes.empty()) { // V2 does not include level data in compressed size estimate comp_page_offset += page_g.max_hdr_size + page_g.max_lvl_size + @@ -789,43 +851,6 @@ CUDF_KERNEL void __launch_bounds__(128) __syncwarp(); if (t == 0) { if (not pages.empty()) { - // set encoding - if (col_g.requested_encoding != column_encoding::USE_DEFAULT) { - switch (col_g.requested_encoding) { - case column_encoding::PLAIN: page_g.kernel_mask = encode_kernel_mask::PLAIN; break; - case column_encoding::DICTIONARY: - // user may have requested dict, but we may not be able to use it - // TODO: when DELTA_BYTE_ARRAY is added, rework the fallback logic so there - // isn't duplicated code here and below. - if (ck_g.use_dictionary) { - page_g.kernel_mask = encode_kernel_mask::DICTIONARY; - } else if (is_fallback_to_delta) { - page_g.kernel_mask = physical_type == BYTE_ARRAY - ? encode_kernel_mask::DELTA_LENGTH_BA - : encode_kernel_mask::DELTA_BINARY; - } else { - page_g.kernel_mask = encode_kernel_mask::PLAIN; - } - break; - case column_encoding::DELTA_BINARY_PACKED: - page_g.kernel_mask = encode_kernel_mask::DELTA_BINARY; - break; - case column_encoding::DELTA_LENGTH_BYTE_ARRAY: - page_g.kernel_mask = encode_kernel_mask::DELTA_LENGTH_BA; - break; - } - } else if (is_use_delta) { - // TODO(ets): at some point make a more intelligent decision on this. DELTA_LENGTH_BA - // should always be preferred over PLAIN, but DELTA_BINARY is a different matter. - // If the delta encoding size is going to be close to 32 bits anyway, then plain - // is a better choice. - page_g.kernel_mask = physical_type == BYTE_ARRAY ? encode_kernel_mask::DELTA_LENGTH_BA - : encode_kernel_mask::DELTA_BINARY; - } else if (ck_g.use_dictionary || physical_type == BOOLEAN) { - page_g.kernel_mask = encode_kernel_mask::DICTIONARY; - } else { - page_g.kernel_mask = encode_kernel_mask::PLAIN; - } // need space for the chunk histograms plus data page histograms auto const num_histograms = num_pages - ck_g.num_dict_pages(); if (ck_g.def_histogram_data != nullptr && col_g.max_def_level > 0) { @@ -2166,6 +2191,273 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) s, output_ptr + string_data_len, pages, comp_in, comp_out, comp_results, true); } +struct byte_array { + uint8_t const* data; + size_type length; + + // calculate the amount of overlap with a preceding array + __device__ size_type common_prefix_length(byte_array const& preceding) const + { + auto const max_pref_len = min(length, preceding.length); + size_type idx = 0; + while (idx < max_pref_len and data[idx] == preceding.data[idx]) { + idx++; + } + return idx; + } +}; + +// DELTA_BYTE_ARRAY page data encoder +// blockDim(128, 1, 1) +template +CUDF_KERNEL void __launch_bounds__(block_size, 8) + gpuEncodeDeltaByteArrayPages(device_span pages, + device_span> comp_in, + device_span> comp_out, + device_span comp_results) +{ + using cudf::detail::warp_size; + // block of shared memory for value storage and bit packing + __shared__ uleb128_t delta_shared[delta::buffer_size + delta::block_size]; + __shared__ __align__(8) page_enc_state_s<0> state_g; + __shared__ delta_binary_packer packer; + __shared__ uint8_t* scratch_data; + __shared__ size_t avg_suffix_len; + using block_scan = cub::BlockScan; + using block_reduce = cub::BlockReduce; + __shared__ union { + typename block_scan::TempStorage scan_storage; + typename block_reduce::TempStorage reduce_storage; + typename delta_binary_packer::index_scan::TempStorage delta_index_tmp; + typename delta_binary_packer::block_reduce::TempStorage delta_reduce_tmp; + typename delta_binary_packer::warp_reduce::TempStorage + delta_warp_red_tmp[delta::num_mini_blocks]; + } temp_storage; + + auto* const s = &state_g; + uint32_t t = threadIdx.x; + + if (t == 0) { + state_g = page_enc_state_s<0>{}; + s->page = pages[blockIdx.x]; + s->ck = *s->page.chunk; + s->col = *s->ck.col_desc; + s->rle_len_pos = nullptr; + // get s->cur back to where it was at the end of encoding the rep and def level data + set_page_data_start(s); + } + __syncthreads(); + + if (BitAnd(s->page.kernel_mask, encode_kernel_mask::DELTA_BYTE_ARRAY) == 0) { return; } + + // Encode data values + if (t == 0) { + uint8_t* dst = s->cur; + s->rle_run = 0; + s->rle_pos = 0; + s->rle_numvals = 0; + s->rle_out = dst; + s->page.encoding = Encoding::DELTA_BYTE_ARRAY; + s->page_start_val = row_to_value_idx(s->page.start_row, s->col); + s->chunk_start_val = row_to_value_idx(s->ck.start_row, s->col); + + // set pointer to beginning of scratch space (aligned to size_type boundary) + auto scratch_start = + reinterpret_cast(s->page.page_data + s->page.max_hdr_size + s->page.max_data_size); + scratch_start = util::round_up_unsafe(scratch_start, sizeof(size_type)); + scratch_data = reinterpret_cast(scratch_start); + } + __syncthreads(); + + // create offsets map (if needed) + // We only encode valid values, and we need to know adjacent valid strings. So first we'll + // create a mapping of leaf indexes to valid indexes: + // + // validity array is_valid: + // 1 1 0 1 0 1 1 0 + // + // exclusive scan on is_valid yields mapping of leaf index -> valid index: + // 0 1 2 2 3 3 4 5 + // + // Last value should equal page.num_valid. Now we need to transform that into a reverse + // lookup that maps valid index -> leaf index (of length num_valid): + // 0 1 3 5 6 + // + auto const has_leaf_nulls = s->page.num_valid != s->page.num_leaf_values; + + size_type* const offsets_map = + has_leaf_nulls ? reinterpret_cast(scratch_data) : nullptr; + + if (offsets_map != nullptr) { + size_type* const forward_map = offsets_map + s->page.num_valid; + + // create the validity array + for (int idx = t; idx < s->page.num_leaf_values; idx += block_size) { + size_type const idx_in_col = s->page_start_val + idx; + bool const is_valid = + idx_in_col < s->col.leaf_column->size() and s->col.leaf_column->is_valid(idx_in_col); + forward_map[idx] = is_valid ? 1 : 0; + } + __syncthreads(); + + // exclusive scan to get leaf_idx -> valid_idx + block_excl_sum(forward_map, s->page.num_leaf_values, 0); + + // now reverse map to get valid_idx -> leaf_idx mapping + for (int idx = t; idx < s->page.num_leaf_values; idx += block_size) { + size_type const idx_in_col = s->page_start_val + idx; + bool const is_valid = + idx_in_col < s->col.leaf_column->size() and s->col.leaf_column->is_valid(idx_in_col); + if (is_valid) { offsets_map[forward_map[idx]] = idx; } + } + __syncthreads(); + } + + size_type* const prefix_lengths = + has_leaf_nulls ? offsets_map + s->page.num_valid : reinterpret_cast(scratch_data); + + auto const type_id = s->col.leaf_column->type().id(); + + auto const byte_array_at = [type_id, s](size_type idx) -> byte_array { + if (type_id == type_id::STRING) { + auto const str = s->col.leaf_column->element(idx); + return {reinterpret_cast(str.data()), str.size_bytes()}; + } else if (s->col.output_as_byte_array && type_id == type_id::LIST) { + auto const str = get_element(*s->col.leaf_column, idx); + return {reinterpret_cast(str.data()), + static_cast(str.size_bytes())}; + } + return {nullptr, 0}; + }; + + // Calculate prefix lengths. The first prefix length is always 0. loop over num_valid since we + // only encode valid values. + // Note: calculating this on a string-per-thread basis seems bad for large strings with lots + // of overlap. But in testing, it was found that the string copy at the end had a much larger + // impact on performance, and doing this step on a string-per-warp basis was always slower. + if (t == 0) { prefix_lengths[0] = 0; } + for (int idx = t + 1; idx < s->page.num_valid; idx += block_size) { + size_type const leaf_idx = has_leaf_nulls ? offsets_map[idx] : idx; + size_type const pleaf_idx = has_leaf_nulls ? offsets_map[idx - 1] : idx - 1; + + // get this string and the preceding string + auto const current = byte_array_at(leaf_idx + s->page_start_val); + auto const preceding = byte_array_at(pleaf_idx + s->page_start_val); + + // calculate the amount of overlap + prefix_lengths[idx] = current.common_prefix_length(preceding); + } + + // encode prefix lengths + if (t == 0) { + packer.init(s->cur, s->page.num_valid, reinterpret_cast(delta_shared), &temp_storage); + } + __syncthreads(); + + // don't start at `t` because all threads must participate in each iteration + for (int idx = 0; idx < s->page.num_valid; idx += block_size) { + size_type const t_idx = idx + t; + auto const in_range = t_idx < s->page.num_valid; + auto const val = in_range ? prefix_lengths[t_idx] : 0; + packer.add_value(val, in_range); + } + + auto const suffix_ptr = packer.flush(); + __syncthreads(); + + // encode suffix lengths + if (t == 0) { + packer.init( + suffix_ptr, s->page.num_valid, reinterpret_cast(delta_shared), &temp_storage); + } + __syncthreads(); + + size_t non_zero = 0; + size_t suffix_bytes = 0; + + for (int idx = 0; idx < s->page.num_valid; idx += block_size) { + size_type const t_idx = idx + t; + auto const in_range = t_idx < s->page.num_valid; + int32_t val = 0; + if (in_range) { + size_type const leaf_idx = has_leaf_nulls ? offsets_map[t_idx] : t_idx; + auto const byte_arr = byte_array_at(leaf_idx + s->page_start_val); + val = byte_arr.length - prefix_lengths[t_idx]; + if (val > 0) { + non_zero++; + suffix_bytes += val; + } + } + packer.add_value(val, in_range); + } + + auto const strings_ptr = packer.flush(); + + non_zero = block_reduce(temp_storage.reduce_storage).Sum(non_zero); + __syncthreads(); + suffix_bytes = block_reduce(temp_storage.reduce_storage).Sum(suffix_bytes); + if (t == 0) { avg_suffix_len = util::div_rounding_up_unsafe(suffix_bytes, non_zero); } + __syncthreads(); + + // Now copy the byte array data. For shorter suffixes (<= 64 bytes), it is faster to use + // memcpy on a string-per-thread basis. For longer suffixes, it's better to use a parallel + // approach. 64 was a good cutoff in testing. + constexpr size_t suffix_cutoff = 64; + + size_t str_data_len = 0; + if (avg_suffix_len <= suffix_cutoff) { + for (int idx = 0; idx < s->page.num_valid; idx += block_size) { + size_type const t_idx = idx + t; + size_type s_len = 0, pref_len = 0, suff_len = 0; + uint8_t const* s_ptr = nullptr; + if (t_idx < s->page.num_valid) { + size_type const leaf_idx = has_leaf_nulls ? offsets_map[t_idx] : t_idx; + auto const byte_arr = byte_array_at(leaf_idx + s->page_start_val); + s_len = byte_arr.length; + s_ptr = byte_arr.data; + pref_len = prefix_lengths[t_idx]; + suff_len = byte_arr.length - pref_len; + } + + // calculate offsets into output + size_type s_off, total; + block_scan(temp_storage.scan_storage) + .ExclusiveScan(suff_len, s_off, str_data_len, cub::Sum(), total); + + if (t_idx < s->page.num_valid) { + auto const dst = strings_ptr + s_off; + memcpy(dst, s_ptr + pref_len, suff_len); + } + str_data_len += total; + __syncthreads(); + } + } else { + int t0 = 0; // thread 0 for each string + for (int idx = 0; idx < s->page.num_valid; idx++) { + // calculate ids for this string + int const tid = (t - t0 + block_size) % block_size; + + // fetch string for this iter + size_type const leaf_idx = has_leaf_nulls ? offsets_map[idx] : idx; + auto const byte_arr = byte_array_at(leaf_idx + s->page_start_val); + size_type const pref_len = prefix_lengths[idx]; + size_type const suff_len = byte_arr.length - pref_len; + + // now copy the data + auto const dst = strings_ptr + str_data_len; + for (int src_idx = tid; src_idx < suff_len; src_idx += block_size) { + dst[src_idx] = byte_arr.data[pref_len + src_idx]; + } + + str_data_len += suff_len; + t0 = (t0 + suff_len) % block_size; + } + } + + finish_page_encode( + s, strings_ptr + str_data_len, pages, comp_in, comp_out, comp_results, true); +} + constexpr int decide_compression_warps_in_block = 4; constexpr int decide_compression_block_size = decide_compression_warps_in_block * cudf::detail::warp_size; @@ -3137,6 +3429,13 @@ void EncodePages(device_span pages, gpuEncodeDeltaLengthByteArrayPages <<>>(pages, comp_in, comp_out, comp_results); } + if (BitAnd(kernel_mask, encode_kernel_mask::DELTA_BYTE_ARRAY) != 0) { + auto const strm = streams[s_idx++]; + gpuEncodePageLevels<<>>( + pages, write_v2_headers, encode_kernel_mask::DELTA_BYTE_ARRAY); + gpuEncodeDeltaByteArrayPages + <<>>(pages, comp_in, comp_out, comp_results); + } if (BitAnd(kernel_mask, encode_kernel_mask::DICTIONARY) != 0) { auto const strm = streams[s_idx++]; gpuEncodePageLevels<<>>( diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index af9f1f1267e..3e15cb7421a 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -501,10 +501,11 @@ constexpr uint32_t encoding_to_mask(Encoding encoding) * Used to control which encode kernels to run. */ enum class encode_kernel_mask { - PLAIN = (1 << 0), // Run plain encoding kernel - DICTIONARY = (1 << 1), // Run dictionary encoding kernel - DELTA_BINARY = (1 << 2), // Run DELTA_BINARY_PACKED encoding kernel - DELTA_LENGTH_BA = (1 << 3), // Run DELTA_LENGTH_BYTE_ARRAY encoding kernel + PLAIN = (1 << 0), // Run plain encoding kernel + DICTIONARY = (1 << 1), // Run dictionary encoding kernel + DELTA_BINARY = (1 << 2), // Run DELTA_BINARY_PACKED encoding kernel + DELTA_LENGTH_BA = (1 << 3), // Run DELTA_LENGTH_BYTE_ARRAY encoding kernel + DELTA_BYTE_ARRAY = (1 << 4), // Run DELTA_BYtE_ARRAY encoding kernel }; /** diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 87c8b2f1611..5a8d96975ce 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -631,14 +631,36 @@ std::vector construct_schema_tree( "requested encoding will be ignored"); return; } + // we don't yet allow encoding decimal128 with DELTA_LENGTH_BYTE_ARRAY (nor with + // the BYTE_ARRAY physical type, but check anyway) + if (s.converted_type.value_or(ConvertedType::UNKNOWN) == ConvertedType::DECIMAL) { + CUDF_LOG_WARN( + "Decimal types cannot yet be encoded as DELTA_LENGTH_BYTE_ARRAY; the " + "requested encoding will be ignored"); + return; + } + break; + + case column_encoding::DELTA_BYTE_ARRAY: + if (s.type != Type::BYTE_ARRAY && s.type != Type::FIXED_LEN_BYTE_ARRAY) { + CUDF_LOG_WARN( + "DELTA_BYTE_ARRAY encoding is only supported for BYTE_ARRAY and " + "FIXED_LEN_BYTE_ARRAY columns; the requested encoding will be ignored"); + return; + } + // we don't yet allow encoding decimal128 with DELTA_BYTE_ARRAY + if (s.converted_type.value_or(ConvertedType::UNKNOWN) == ConvertedType::DECIMAL) { + CUDF_LOG_WARN( + "Decimal types cannot yet be encoded as DELTA_BYTE_ARRAY; the " + "requested encoding will be ignored"); + return; + } break; // supported parquet encodings case column_encoding::PLAIN: case column_encoding::DICTIONARY: break; - // not yet supported for write (soon...) - case column_encoding::DELTA_BYTE_ARRAY: [[fallthrough]]; // all others default: CUDF_LOG_WARN( diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp index abbd0c97f07..fd33cca5761 100644 --- a/cpp/tests/io/parquet_reader_test.cpp +++ b/cpp/tests/io/parquet_reader_test.cpp @@ -1955,6 +1955,7 @@ TEST_F(ParquetReaderTest, RepeatedNoAnnotations) TEST_F(ParquetReaderTest, DeltaSkipRowsWithNulls) { + using cudf::io::column_encoding; constexpr int num_rows = 10'000; constexpr auto seed = 21337; @@ -1999,9 +2000,29 @@ TEST_F(ParquetReaderTest, DeltaSkipRowsWithNulls) int64_col, int64_nulls_col, *int64_list, *int64_list_nulls, *int16_list, *int16_list_nulls, *int8_list, *int8_list_nulls, str_col, *str_col_nulls, *str_list, *str_list_nulls, + big_str_col, *big_str_col_nulls, *big_str_list, *big_str_list_nulls, + str_col, *str_col_nulls, *str_list, *str_list_nulls, big_str_col, *big_str_col_nulls, *big_str_list, *big_str_list_nulls}); auto const filepath = temp_env->get_temp_filepath("DeltaSkipRowsWithNulls.parquet"); + auto input_metadata = cudf::io::table_input_metadata{tbl}; + input_metadata.column_metadata[12].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); + input_metadata.column_metadata[13].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); + input_metadata.column_metadata[14].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); + input_metadata.column_metadata[15].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); + input_metadata.column_metadata[16].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); + input_metadata.column_metadata[17].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); + input_metadata.column_metadata[18].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); + input_metadata.column_metadata[19].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); + input_metadata.column_metadata[20].set_encoding(column_encoding::DELTA_BYTE_ARRAY); + input_metadata.column_metadata[21].set_encoding(column_encoding::DELTA_BYTE_ARRAY); + input_metadata.column_metadata[22].set_encoding(column_encoding::DELTA_BYTE_ARRAY); + input_metadata.column_metadata[23].set_encoding(column_encoding::DELTA_BYTE_ARRAY); + input_metadata.column_metadata[24].set_encoding(column_encoding::DELTA_BYTE_ARRAY); + input_metadata.column_metadata[25].set_encoding(column_encoding::DELTA_BYTE_ARRAY); + input_metadata.column_metadata[26].set_encoding(column_encoding::DELTA_BYTE_ARRAY); + input_metadata.column_metadata[27].set_encoding(column_encoding::DELTA_BYTE_ARRAY); + auto const out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tbl) .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) @@ -2060,6 +2081,39 @@ TEST_F(ParquetReaderTest, DeltaSkipRowsWithNulls) } } +TEST_F(ParquetReaderTest, DeltaByteArraySkipAllValid) +{ + // test that the DELTA_BYTE_ARRAY decoder can handle the case where skip rows skips all valid + // values in a page. see #15075 + constexpr int num_rows = 500; + constexpr int num_valid = 150; + + auto const ones = thrust::make_constant_iterator("one"); + + auto valids = cudf::detail::make_counting_transform_iterator( + 0, [num_valid](auto i) { return i < num_valid; }); + auto const col = cudf::test::strings_column_wrapper{ones, ones + num_rows, valids}; + auto const expected = table_view({col}); + + auto input_metadata = cudf::io::table_input_metadata{expected}; + input_metadata.column_metadata[0].set_encoding(cudf::io::column_encoding::DELTA_BYTE_ARRAY); + + auto const filepath = temp_env->get_temp_filepath("DeltaByteArraySkipAllValid.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .write_v2_headers(true) + .metadata(input_metadata) + .dictionary_policy(cudf::io::dictionary_policy::NEVER); + cudf::io::write_parquet(out_opts); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}) + .skip_rows(num_valid + 1); + auto result = cudf::io::read_parquet(in_opts); + CUDF_TEST_EXPECT_TABLES_EQUAL(cudf::slice(expected, {num_valid + 1, num_rows}), + result.tbl->view()); +} + /////////////////// // metadata tests diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index f4da9f59b8c..6a40c96875e 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1482,8 +1482,18 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) auto const string_col = cudf::test::strings_column_wrapper(strings, strings + num_rows, no_nulls()); - auto const table = table_view( - {col, col, col, col, col, string_col, string_col, string_col, string_col, string_col}); + auto const table = table_view({col, + col, + col, + col, + col, + col, + string_col, + string_col, + string_col, + string_col, + string_col, + string_col}); cudf::io::table_input_metadata table_metadata(table); @@ -1495,13 +1505,15 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) set_meta(1, "int_dict", column_encoding::DICTIONARY); set_meta(2, "int_db", column_encoding::DELTA_BINARY_PACKED); set_meta(3, "int_dlba", column_encoding::DELTA_LENGTH_BYTE_ARRAY); - table_metadata.column_metadata[4].set_name("int_none"); + set_meta(4, "int_dba", column_encoding::DELTA_BYTE_ARRAY); + table_metadata.column_metadata[5].set_name("int_none"); - set_meta(5, "string_plain", column_encoding::PLAIN); - set_meta(6, "string_dict", column_encoding::DICTIONARY); - set_meta(7, "string_dlba", column_encoding::DELTA_LENGTH_BYTE_ARRAY); - set_meta(8, "string_db", column_encoding::DELTA_BINARY_PACKED); - table_metadata.column_metadata[9].set_name("string_none"); + set_meta(6, "string_plain", column_encoding::PLAIN); + set_meta(7, "string_dict", column_encoding::DICTIONARY); + set_meta(8, "string_dlba", column_encoding::DELTA_LENGTH_BYTE_ARRAY); + set_meta(9, "string_dba", column_encoding::DELTA_BYTE_ARRAY); + set_meta(10, "string_db", column_encoding::DELTA_BINARY_PACKED); + table_metadata.column_metadata[11].set_name("string_none"); for (auto& col_meta : table_metadata.column_metadata) { col_meta.set_nullability(false); @@ -1534,18 +1546,23 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) expect_enc(2, Encoding::DELTA_BINARY_PACKED); // requested delta_length_byte_array, but should fall back to dictionary expect_enc(3, Encoding::PLAIN_DICTIONARY); - // no request, should fall back to dictionary + // requested delta_byte_array, but should fall back to dictionary expect_enc(4, Encoding::PLAIN_DICTIONARY); + // no request, should use dictionary + expect_enc(5, Encoding::PLAIN_DICTIONARY); + // requested plain - expect_enc(5, Encoding::PLAIN); + expect_enc(6, Encoding::PLAIN); // requested dictionary - expect_enc(6, Encoding::PLAIN_DICTIONARY); + expect_enc(7, Encoding::PLAIN_DICTIONARY); // requested delta_length_byte_array - expect_enc(7, Encoding::DELTA_LENGTH_BYTE_ARRAY); + expect_enc(8, Encoding::DELTA_LENGTH_BYTE_ARRAY); + // requested delta_byte_array + expect_enc(9, Encoding::DELTA_BYTE_ARRAY); // requested delta_binary_packed, but should fall back to dictionary - expect_enc(8, Encoding::PLAIN_DICTIONARY); - // no request, should fall back to dictionary - expect_enc(9, Encoding::PLAIN_DICTIONARY); + expect_enc(10, Encoding::PLAIN_DICTIONARY); + // no request, should use dictionary + expect_enc(11, Encoding::PLAIN_DICTIONARY); } TEST_F(ParquetWriterTest, DeltaBinaryStartsWithNulls) From 3c3fb9fa391b5c0f97f7ac5340bd1cee37f14139 Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 6 Mar 2024 10:30:56 -0800 Subject: [PATCH 2/3] add test to make sure delta_byte_array cannot be selected for decimal data --- cpp/tests/io/parquet_writer_test.cpp | 32 ++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index 6a40c96875e..200c58bb9aa 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1565,6 +1565,38 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) expect_enc(11, Encoding::PLAIN_DICTIONARY); } +TEST_F(ParquetWriterTest, Decimal128DeltaByteArray) +{ + // decimal128 in cuDF maps to FIXED_LEN_BYTE_ARRAY, which is allowed by the spec to use + // DELTA_BYTE_ARRAY encoding. But this use is not implemented in cuDF. + __int128_t val0 = 0xa1b2'c3d4'e5f6ULL; + __int128_t val1 = val0 << 80; + column_wrapper col0{{numeric::decimal128(val0, numeric::scale_type{0}), + numeric::decimal128(val1, numeric::scale_type{0})}}; + + auto expected = table_view{{col0, col0}}; + cudf::io::table_input_metadata table_metadata(expected); + table_metadata.column_metadata[0] + .set_name("decimal128") + .set_encoding(cudf::io::column_encoding::DELTA_BYTE_ARRAY) + .set_nullability(false); + + auto const filepath = temp_env->get_temp_filepath("Decimal128DeltaByteArray.parquet"); + const cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .compression(cudf::io::compression_type::NONE) + .metadata(table_metadata); + cudf::io::write_parquet(out_opts); + + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::detail::FileMetaData fmd; + read_footer(source, &fmd); + + // make sure DELTA_BYTE_ARRAY was not used + EXPECT_NE(fmd.row_groups[0].columns[0].meta_data.encodings[0], + cudf::io::parquet::detail::Encoding::DELTA_BYTE_ARRAY); +} + TEST_F(ParquetWriterTest, DeltaBinaryStartsWithNulls) { // test that the DELTA_BINARY_PACKED writer can properly encode a column that begins with From 8977acf4c04a85789f5979bee10a4c3d1bbc983e Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 6 Mar 2024 12:22:34 -0800 Subject: [PATCH 3/3] change from review comment --- cpp/tests/io/parquet_reader_test.cpp | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp index fd33cca5761..52ad9cd3851 100644 --- a/cpp/tests/io/parquet_reader_test.cpp +++ b/cpp/tests/io/parquet_reader_test.cpp @@ -2006,22 +2006,10 @@ TEST_F(ParquetReaderTest, DeltaSkipRowsWithNulls) auto const filepath = temp_env->get_temp_filepath("DeltaSkipRowsWithNulls.parquet"); auto input_metadata = cudf::io::table_input_metadata{tbl}; - input_metadata.column_metadata[12].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); - input_metadata.column_metadata[13].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); - input_metadata.column_metadata[14].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); - input_metadata.column_metadata[15].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); - input_metadata.column_metadata[16].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); - input_metadata.column_metadata[17].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); - input_metadata.column_metadata[18].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); - input_metadata.column_metadata[19].set_encoding(column_encoding::DELTA_LENGTH_BYTE_ARRAY); - input_metadata.column_metadata[20].set_encoding(column_encoding::DELTA_BYTE_ARRAY); - input_metadata.column_metadata[21].set_encoding(column_encoding::DELTA_BYTE_ARRAY); - input_metadata.column_metadata[22].set_encoding(column_encoding::DELTA_BYTE_ARRAY); - input_metadata.column_metadata[23].set_encoding(column_encoding::DELTA_BYTE_ARRAY); - input_metadata.column_metadata[24].set_encoding(column_encoding::DELTA_BYTE_ARRAY); - input_metadata.column_metadata[25].set_encoding(column_encoding::DELTA_BYTE_ARRAY); - input_metadata.column_metadata[26].set_encoding(column_encoding::DELTA_BYTE_ARRAY); - input_metadata.column_metadata[27].set_encoding(column_encoding::DELTA_BYTE_ARRAY); + for (int i = 12; i <= 27; ++i) { + input_metadata.column_metadata[i].set_encoding( + i <= 19 ? column_encoding::DELTA_LENGTH_BYTE_ARRAY : column_encoding::DELTA_BYTE_ARRAY); + } auto const out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tbl)