diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index 65d4a4417f0..b3dea0ab280 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -113,6 +113,7 @@ enum class column_encoding { ///< valid for BYTE_ARRAY columns) DELTA_BYTE_ARRAY, ///< Use DELTA_BYTE_ARRAY encoding (only valid for ///< BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY columns) + BYTE_STREAM_SPLIT, ///< Use BYTE_STREAM_SPLIT encoding (valid for all fixed width types) // ORC encodings: DIRECT, ///< Use DIRECT encoding DIRECT_V2, ///< Use DIRECT_V2 encoding diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index 945a7dcb4c6..f3332a23992 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -225,6 +225,96 @@ __device__ inline void gpuDecodeValues( } } +template +__device__ inline void gpuDecodeSplitValues(page_state_s* s, + state_buf* const sb, + int start, + int end) +{ + using cudf::detail::warp_size; + constexpr int num_warps = decode_block_size / warp_size; + constexpr int max_batch_size = num_warps * warp_size; + + auto const t = threadIdx.x; + + PageNestingDecodeInfo* nesting_info_base = s->nesting_info; + int const dtype = s->col.physical_type; + auto const data_len = thrust::distance(s->data_start, s->data_end); + auto const num_values = data_len / s->dtype_len_in; + + // decode values + int pos = start; + while (pos < end) { + int const batch_size = min(max_batch_size, end - pos); + + int const target_pos = pos + batch_size; + int const src_pos = pos + t; + + // the position in the output column/buffer + int dst_pos = sb->nz_idx[rolling_index(src_pos)] - s->first_row; + + // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values + // before first_row) in the flat hierarchy case. + if (src_pos < target_pos && dst_pos >= 0) { + // nesting level that is storing actual leaf values + int const leaf_level_index = s->col.max_nesting_depth - 1; + + uint32_t dtype_len = s->dtype_len; + uint8_t const* src = s->data_start + src_pos; + uint8_t* dst = + nesting_info_base[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; + auto const is_decimal = + s->col.logical_type.has_value() and s->col.logical_type->type == LogicalType::DECIMAL; + + // Note: non-decimal FIXED_LEN_BYTE_ARRAY will be handled in the string reader + if (is_decimal) { + switch (dtype) { + case INT32: gpuOutputByteStreamSplit(dst, src, num_values); break; + case INT64: gpuOutputByteStreamSplit(dst, src, num_values); break; + case FIXED_LEN_BYTE_ARRAY: + if (s->dtype_len_in <= sizeof(int32_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast(dst), src, num_values, s->dtype_len_in); + break; + } else if (s->dtype_len_in <= sizeof(int64_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast(dst), src, num_values, s->dtype_len_in); + break; + } else if (s->dtype_len_in <= sizeof(__int128_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast<__int128_t*>(dst), src, num_values, s->dtype_len_in); + break; + } + // unsupported decimal precision + [[fallthrough]]; + + default: s->set_error_code(decode_error::UNSUPPORTED_ENCODING); + } + } else if (dtype_len == 8) { + if (s->dtype_len_in == 4) { + // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS + // TIME_MILLIS is the only duration type stored as int32: + // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype + gpuOutputByteStreamSplit(dst, src, num_values); + // zero out most significant bytes + memset(dst + 4, 0, 4); + } else if (s->ts_scale) { + gpuOutputSplitInt64Timestamp( + reinterpret_cast(dst), src, num_values, s->ts_scale); + } else { + gpuOutputByteStreamSplit(dst, src, num_values); + } + } else if (dtype_len == 4) { + gpuOutputByteStreamSplit(dst, src, num_values); + } else { + s->set_error_code(decode_error::UNSUPPORTED_ENCODING); + } + } + + pos += batch_size; + } +} + // is the page marked nullable or not __device__ inline bool is_nullable(page_state_s* s) { @@ -495,6 +585,123 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) if (t == 0 and s->error != 0) { set_error(s->error, error_code); } } +/** + * @brief Kernel for computing fixed width non dictionary column data stored in the pages + * + * This function will write the page data and the page data's validity to the + * output specified in the page's column chunk. If necessary, additional + * conversion will be performed to translate from the Parquet datatype to + * desired output datatype. + * + * @param pages List of pages + * @param chunks List of column chunks + * @param min_row Row index to start reading at + * @param num_rows Maximum number of rows to read + * @param error_code Error code to set if an error is encountered + */ +template +CUDF_KERNEL void __launch_bounds__(decode_block_size) + gpuDecodeSplitPageDataFlat(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + kernel_error::pointer error_code) +{ + __shared__ __align__(16) page_state_s state_g; + __shared__ __align__(16) page_state_buffers_s // unused in this kernel + state_buffers; + + page_state_s* const s = &state_g; + auto* const sb = &state_buffers; + int const page_idx = blockIdx.x; + int const t = threadIdx.x; + PageInfo* pp = &pages[page_idx]; + + if (!(BitAnd(pages[page_idx].kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT))) { + return; + } + + // must come after the kernel mask check + [[maybe_unused]] null_count_back_copier _{s, t}; + + if (!setupLocalPageInfo(s, + pp, + chunks, + min_row, + num_rows, + mask_filter{decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT}, + page_processing_stage::DECODE)) { + return; + } + + // the level stream decoders + __shared__ rle_run def_runs[rle_run_buffer_size]; + rle_stream def_decoder{def_runs}; + + // if we have no work to do (eg, in a skip_rows/num_rows case) in this page. + if (s->num_rows == 0) { return; } + + bool const nullable = is_nullable(s); + bool const nullable_with_nulls = nullable && has_nulls(s); + + // initialize the stream decoders (requires values computed in setupLocalPageInfo) + level_t* const def = reinterpret_cast(pp->lvl_decode_buf[level_type::DEFINITION]); + if (nullable_with_nulls) { + def_decoder.init(s->col.level_bits[level_type::DEFINITION], + s->abs_lvl_start[level_type::DEFINITION], + s->abs_lvl_end[level_type::DEFINITION], + def, + s->page.num_input_values); + } + __syncthreads(); + + // We use two counters in the loop below: processed_count and valid_count. + // - processed_count: number of rows out of num_input_values that we have decoded so far. + // the definition stream returns the number of total rows it has processed in each call + // to decode_next and we accumulate in process_count. + // - valid_count: number of non-null rows we have decoded so far. In each iteration of the + // loop below, we look at the number of valid items (which could be all for non-nullable), + // and valid_count is that running count. + int processed_count = 0; + int valid_count = 0; + // the core loop. decode batches of level stream data using rle_stream objects + // and pass the results to gpuDecodeValues + while (s->error == 0 && processed_count < s->page.num_input_values) { + int next_valid_count; + + // only need to process definition levels if this is a nullable column + if (nullable) { + if (nullable_with_nulls) { + processed_count += def_decoder.decode_next(t); + __syncthreads(); + } else { + processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); + } + + next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( + processed_count, s, sb, def, t, nullable_with_nulls); + } + // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip + // this function call entirely since all it will ever generate is a mapping of (i -> i) for + // nz_idx. gpuDecodeValues would be the only work that happens. + else { + processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); + next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( + processed_count, s, sb, nullptr, t, false); + } + __syncthreads(); + + // decode the values themselves + gpuDecodeSplitValues(s, sb, valid_count, next_valid_count); + __syncthreads(); + + valid_count = next_valid_count; + } + if (t == 0 and s->error != 0) { set_error(s->error, error_code); } +} + } // anonymous namespace void __host__ DecodePageDataFixed(cudf::detail::hostdevice_span pages, @@ -528,7 +735,7 @@ void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span pa // dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block // 1 full warp, and 1 warp of 1 thread dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block - dim3 dim_grid(pages.size(), 1); // 1 thread block per pags => # blocks + dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks if (level_type_size == 1) { gpuDecodePageDataFixedDict<<>>( @@ -539,4 +746,24 @@ void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span pa } } +void __host__ DecodeSplitPageDataFlat(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + size_t num_rows, + size_t min_row, + int level_type_size, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream) +{ + dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block + dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks + + if (level_type_size == 1) { + gpuDecodeSplitPageDataFlat<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else { + gpuDecodeSplitPageDataFlat<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } +} + } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 62ce5b9f9a5..7207173b82f 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -28,6 +28,177 @@ namespace { constexpr int decode_block_size = 128; constexpr int rolling_buf_size = decode_block_size * 2; +/** + * @brief Kernel for computing the BYTE_STREAM_SPLIT column data stored in the pages + * + * This is basically the PLAIN decoder, but with a pared down set of supported data + * types, and using output functions that piece together the individual streams. + * Supported physical types include INT32, INT64, FLOAT, DOUBLE and FIXED_LEN_BYTE_ARRAY. + * The latter is currently only used for large decimals. The Parquet specification also + * has FLOAT16 and UUID types that are currently not supported. FIXED_LEN_BYTE_ARRAY data + * that lacks a `LogicalType` annotation will be handled by the string decoder. + * + * @param pages List of pages + * @param chunks List of column chunks + * @param min_row Row index to start reading at + * @param num_rows Maximum number of rows to read + * @param error_code Error code to set if an error is encountered + */ +template +CUDF_KERNEL void __launch_bounds__(decode_block_size) + gpuDecodeSplitPageData(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + kernel_error::pointer error_code) +{ + using cudf::detail::warp_size; + __shared__ __align__(16) page_state_s state_g; + __shared__ __align__(16) + page_state_buffers_s + state_buffers; + + page_state_s* const s = &state_g; + auto* const sb = &state_buffers; + int page_idx = blockIdx.x; + int t = threadIdx.x; + [[maybe_unused]] null_count_back_copier _{s, t}; + + if (!setupLocalPageInfo(s, + &pages[page_idx], + chunks, + min_row, + num_rows, + mask_filter{decode_kernel_mask::BYTE_STREAM_SPLIT}, + page_processing_stage::DECODE)) { + return; + } + + bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0; + + auto const data_len = thrust::distance(s->data_start, s->data_end); + auto const num_values = data_len / s->dtype_len_in; + auto const out_thread0 = warp_size; + + PageNestingDecodeInfo* nesting_info_base = s->nesting_info; + + __shared__ level_t rep[rolling_buf_size]; // circular buffer of repetition level values + __shared__ level_t def[rolling_buf_size]; // circular buffer of definition level values + + // skipped_leaf_values will always be 0 for flat hierarchies. + uint32_t skipped_leaf_values = s->page.skipped_leaf_values; + while (s->error == 0 && + (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { + int target_pos; + int src_pos = s->src_pos; + + if (t < out_thread0) { + target_pos = min(src_pos + 2 * (decode_block_size - out_thread0), + s->nz_count + (decode_block_size - out_thread0)); + } else { + target_pos = min(s->nz_count, src_pos + decode_block_size - out_thread0); + } + // this needs to be here to prevent warp 1 modifying src_pos before all threads have read it + __syncthreads(); + + if (t < warp_size) { + // decode repetition and definition levels. + // - update validity vectors + // - updates offsets (for nested columns) + // - produces non-NULL value indices in s->nz_idx for subsequent decoding + gpuDecodeLevels(s, sb, target_pos, rep, def, t); + } else { + // WARP1..WARP3: Decode values + int const dtype = s->col.physical_type; + src_pos += t - out_thread0; + + // the position in the output column/buffer + int dst_pos = sb->nz_idx[rolling_index(src_pos)]; + + // for the flat hierarchy case we will be reading from the beginning of the value stream, + // regardless of the value of first_row. so adjust our destination offset accordingly. + // example: + // - user has passed skip_rows = 2, so our first_row to output is 2 + // - the row values we get from nz_idx will be + // 0, 1, 2, 3, 4 .... + // - by shifting these values by first_row, the sequence becomes + // -2, -1, 0, 1, 2 ... + // - so we will end up ignoring the first two input rows, and input rows 2..n will + // get written to the output starting at position 0. + // + if (!has_repetition) { dst_pos -= s->first_row; } + + // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values + // before first_row) in the flat hierarchy case. + if (src_pos < target_pos && dst_pos >= 0) { + // src_pos represents the logical row position we want to read from. But in the case of + // nested hierarchies, there is no 1:1 mapping of rows to values. So our true read position + // has to take into account the # of values we have to skip in the page to get to the + // desired logical row. For flat hierarchies, skipped_leaf_values will always be 0. + uint32_t val_src_pos = src_pos + skipped_leaf_values; + + // nesting level that is storing actual leaf values + int leaf_level_index = s->col.max_nesting_depth - 1; + + uint32_t dtype_len = s->dtype_len; + uint8_t const* src = s->data_start + val_src_pos; + uint8_t* dst = + nesting_info_base[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; + auto const is_decimal = + s->col.logical_type.has_value() and s->col.logical_type->type == LogicalType::DECIMAL; + + // Note: non-decimal FIXED_LEN_BYTE_ARRAY will be handled in the string reader + if (is_decimal) { + switch (dtype) { + case INT32: gpuOutputByteStreamSplit(dst, src, num_values); break; + case INT64: gpuOutputByteStreamSplit(dst, src, num_values); break; + case FIXED_LEN_BYTE_ARRAY: + if (s->dtype_len_in <= sizeof(int32_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast(dst), src, num_values, s->dtype_len_in); + break; + } else if (s->dtype_len_in <= sizeof(int64_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast(dst), src, num_values, s->dtype_len_in); + break; + } else if (s->dtype_len_in <= sizeof(__int128_t)) { + gpuOutputSplitFixedLenByteArrayAsInt( + reinterpret_cast<__int128_t*>(dst), src, num_values, s->dtype_len_in); + break; + } + // unsupported decimal precision + [[fallthrough]]; + + default: s->set_error_code(decode_error::UNSUPPORTED_ENCODING); + } + } else if (dtype_len == 8) { + if (s->dtype_len_in == 4) { + // Reading INT32 TIME_MILLIS into 64-bit DURATION_MILLISECONDS + // TIME_MILLIS is the only duration type stored as int32: + // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#deprecated-time-convertedtype + gpuOutputByteStreamSplit(dst, src, num_values); + // zero out most significant bytes + memset(dst + 4, 0, 4); + } else if (s->ts_scale) { + gpuOutputSplitInt64Timestamp( + reinterpret_cast(dst), src, num_values, s->ts_scale); + } else { + gpuOutputByteStreamSplit(dst, src, num_values); + } + } else if (dtype_len == 4) { + gpuOutputByteStreamSplit(dst, src, num_values); + } else { + s->set_error_code(decode_error::UNSUPPORTED_ENCODING); + } + } + + if (t == out_thread0) { s->src_pos = target_pos; } + } + __syncthreads(); + } + if (t == 0 and s->error != 0) { set_error(s->error, error_code); } +} + /** * @brief Kernel for computing the column data stored in the pages * @@ -145,7 +316,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) // - the row values we get from nz_idx will be // 0, 1, 2, 3, 4 .... // - by shifting these values by first_row, the sequence becomes - // -1, -2, 0, 1, 2 ... + // -2, -1, 0, 1, 2 ... // - so we will end up ignoring the first two input rows, and input rows 2..n will // get written to the output starting at position 0. // @@ -267,4 +438,29 @@ void __host__ DecodePageData(cudf::detail::hostdevice_span pages, } } +/** + * @copydoc cudf::io::parquet::detail::DecodePageData + */ +void __host__ DecodeSplitPageData(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + size_t num_rows, + size_t min_row, + int level_type_size, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream) +{ + CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); + + dim3 dim_block(decode_block_size, 1); + dim3 dim_grid(pages.size(), 1); // 1 threadblock per page + + if (level_type_size == 1) { + gpuDecodeSplitPageData<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else { + gpuDecodeSplitPageData<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } +} + } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/page_data.cuh b/cpp/src/io/parquet/page_data.cuh index df8d801d66c..f182747650e 100644 --- a/cpp/src/io/parquet/page_data.cuh +++ b/cpp/src/io/parquet/page_data.cuh @@ -396,4 +396,80 @@ inline __device__ void gpuOutputGeneric( } } } + +/** + * Output a BYTE_STREAM_SPLIT value of type `T`. + * + * Data is encoded as N == sizeof(T) streams of length M, forming an NxM sized matrix. + * Rows are streams, columns are individual values. + * + * @param dst pointer to output data + * @param src pointer to first byte of input data in stream 0 + * @param stride number of bytes per input stream (M) + */ +template +__device__ inline void gpuOutputByteStreamSplit(uint8_t* dst, uint8_t const* src, size_type stride) +{ + for (int i = 0; i < sizeof(T); i++) { + dst[i] = src[i * stride]; + } +} + +/** + * Output a 64-bit BYTE_STREAM_SPLIT encoded timestamp. + * + * Data is encoded as N streams of length M, forming an NxM sized matrix. Rows are streams, + * columns are individual values. + * + * @param dst pointer to output data + * @param src pointer to first byte of input data in stream 0 + * @param stride number of bytes per input stream (M) + * @param ts_scale timestamp scale + */ +inline __device__ void gpuOutputSplitInt64Timestamp(int64_t* dst, + uint8_t const* src, + size_type stride, + int32_t ts_scale) +{ + gpuOutputByteStreamSplit(reinterpret_cast(dst), src, stride); + if (ts_scale < 0) { + // round towards negative infinity + int sign = (*dst < 0); + *dst = ((*dst + sign) / -ts_scale) + sign; + } else { + *dst = *dst * ts_scale; + } +} + +/** + * Output a BYTE_STREAM_SPLIT encoded decimal as an integer type. + * + * Data is encoded as N streams of length M, forming an NxM sized matrix. Rows are streams, + * columns are individual values. + * + * @param dst pointer to output data + * @param src pointer to first byte of input data in stream 0 + * @param stride number of bytes per input stream (M) + * @param dtype_len_in length of the `FIXED_LEN_BYTE_ARRAY` used to represent the decimal + */ +template +__device__ void gpuOutputSplitFixedLenByteArrayAsInt(T* dst, + uint8_t const* src, + size_type stride, + uint32_t dtype_len_in) +{ + T unscaled = 0; + // fixed_len_byte_array decimals are big endian + for (unsigned int i = 0; i < dtype_len_in; i++) { + unscaled = (unscaled << 8) | src[i * stride]; + } + // Shift the unscaled value up and back down when it isn't all 8 bytes, + // which sign extend the value for correctly representing negative numbers. + if (dtype_len_in < sizeof(T)) { + unscaled <<= (sizeof(T) - dtype_len_in) * 8; + unscaled >>= (sizeof(T) - dtype_len_in) * 8; + } + *dst = unscaled; +} + } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 83bf7fb0d73..0c139fced24 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -1316,6 +1316,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, } break; case Encoding::PLAIN: + case Encoding::BYTE_STREAM_SPLIT: s->dict_size = static_cast(end - cur); s->dict_val = 0; if (s->col.physical_type == BOOLEAN) { s->dict_run = s->dict_size * 2 + 1; } diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu index 7c0092c6185..da1bbaebd73 100644 --- a/cpp/src/io/parquet/page_delta_decode.cu +++ b/cpp/src/io/parquet/page_delta_decode.cu @@ -315,7 +315,7 @@ CUDF_KERNEL void __launch_bounds__(96) using cudf::detail::warp_size; __shared__ __align__(16) delta_binary_decoder db_state; __shared__ __align__(16) page_state_s state_g; - __shared__ __align__(16) page_state_buffers_s state_buffers; + __shared__ __align__(16) page_state_buffers_s state_buffers; page_state_s* const s = &state_g; auto* const sb = &state_buffers; @@ -440,7 +440,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) using cudf::detail::warp_size; __shared__ __align__(16) delta_byte_array_decoder db_state; __shared__ __align__(16) page_state_s state_g; - __shared__ __align__(16) page_state_buffers_s state_buffers; + __shared__ __align__(16) page_state_buffers_s state_buffers; page_state_s* const s = &state_g; auto* const sb = &state_buffers; @@ -605,7 +605,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) using cudf::detail::warp_size; __shared__ __align__(16) delta_binary_decoder db_state; __shared__ __align__(16) page_state_s state_g; - __shared__ __align__(16) page_state_buffers_s state_buffers; + __shared__ __align__(16) page_state_buffers_s state_buffers; __shared__ __align__(8) uint8_t const* page_string_data; __shared__ size_t string_offset; diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 2db6dc4270d..227f13db60e 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -15,6 +15,7 @@ */ #include "delta_enc.cuh" +#include "io/parquet/parquet_gpu.hpp" #include "io/utilities/block_utils.cuh" #include "page_string_utils.cuh" #include "parquet_gpu.cuh" @@ -238,8 +239,10 @@ void __device__ calculate_frag_size(frag_init_state_s* const s, int t) Encoding __device__ determine_encoding(PageType page_type, Type physical_type, bool use_dictionary, - bool write_v2_headers) + bool write_v2_headers, + bool is_split_stream) { + if (is_split_stream) { return Encoding::BYTE_STREAM_SPLIT; } // NOTE: For dictionary encoding, parquet v2 recommends using PLAIN in dictionary page and // RLE_DICTIONARY in data page, but parquet v1 uses PLAIN_DICTIONARY in both dictionary and // data pages (actual encoding is identical). @@ -514,6 +517,7 @@ __device__ encode_kernel_mask data_encoding_for_col(EncColumnChunk const* chunk, case column_encoding::DELTA_BINARY_PACKED: return encode_kernel_mask::DELTA_BINARY; case column_encoding::DELTA_LENGTH_BYTE_ARRAY: return encode_kernel_mask::DELTA_LENGTH_BA; case column_encoding::DELTA_BYTE_ARRAY: return encode_kernel_mask::DELTA_BYTE_ARRAY; + case column_encoding::BYTE_STREAM_SPLIT: return encode_kernel_mask::BYTE_STREAM_SPLIT; } } @@ -1608,6 +1612,19 @@ __device__ void finish_page_encode(state_buf* s, } } +// Encode a fixed-width data type int `dst`. `dst` points to the first byte +// of the result. `stride` is 1 for PLAIN encoding and num_values for +// BYTE_STREAM_SPLIT. +template +__device__ inline void encode_value(uint8_t* dst, T src, size_type stride) +{ + T v = src; + for (int i = 0; i < sizeof(T); i++) { + dst[i * stride] = v; + v >>= 8; + } +} + // PLAIN page data encoder // blockDim(128, 1, 1) template @@ -1616,7 +1633,8 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) device_span> comp_in, device_span> comp_out, device_span comp_results, - bool write_v2_headers) + bool write_v2_headers, + bool is_split_stream) { __shared__ __align__(8) page_enc_state_s<0> state_g; using block_scan = cub::BlockScan; @@ -1636,7 +1654,9 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) } __syncthreads(); - if (BitAnd(s->page.kernel_mask, encode_kernel_mask::PLAIN) == 0) { return; } + auto const allowed_mask = + is_split_stream ? encode_kernel_mask::BYTE_STREAM_SPLIT : encode_kernel_mask::PLAIN; + if (BitAnd(s->page.kernel_mask, allowed_mask) == 0) { return; } // Encode data values __syncthreads(); @@ -1650,18 +1670,20 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) }(); if (t == 0) { - uint8_t* dst = s->cur; - s->rle_run = 0; - s->rle_pos = 0; - s->rle_numvals = 0; - s->rle_out = dst; - s->page.encoding = - determine_encoding(s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers); + uint8_t* dst = s->cur; + s->rle_run = 0; + s->rle_pos = 0; + s->rle_numvals = 0; + s->rle_out = dst; + s->page.encoding = determine_encoding( + s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers, is_split_stream); s->page_start_val = row_to_value_idx(s->page.start_row, s->col); s->chunk_start_val = row_to_value_idx(s->ck.start_row, s->col); } __syncthreads(); + auto const stride = is_split_stream ? s->page.num_valid : 1; + for (uint32_t cur_val_idx = 0; cur_val_idx < s->page.num_leaf_values;) { uint32_t nvals = min(s->page.num_leaf_values - cur_val_idx, block_size); uint32_t len, pos; @@ -1708,6 +1730,13 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) uint32_t total_len = 0; block_scan(scan_storage).ExclusiveSum(len, pos, total_len); __syncthreads(); + + // if BYTE_STREAM_SPLIT, then translate byte positions to indexes + if (is_split_stream) { + pos /= dtype_len_out; + total_len /= dtype_len_out; + } + if (t == 0) { s->cur = dst + total_len; } if (is_valid) { switch (physical_type) { @@ -1725,13 +1754,11 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) } }(); - dst[pos + 0] = v; - dst[pos + 1] = v >> 8; - dst[pos + 2] = v >> 16; - dst[pos + 3] = v >> 24; + encode_value(dst + pos, v, stride); } break; + case DOUBLE: case INT64: { - int64_t v = s->col.leaf_column->element(val_idx); + auto v = s->col.leaf_column->element(val_idx); int32_t ts_scale = s->col.ts_scale; if (ts_scale != 0) { if (ts_scale < 0) { @@ -1740,16 +1767,10 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) v *= ts_scale; } } - dst[pos + 0] = v; - dst[pos + 1] = v >> 8; - dst[pos + 2] = v >> 16; - dst[pos + 3] = v >> 24; - dst[pos + 4] = v >> 32; - dst[pos + 5] = v >> 40; - dst[pos + 6] = v >> 48; - dst[pos + 7] = v >> 56; + encode_value(dst + pos, v, stride); } break; case INT96: { + // only PLAIN encoding is supported int64_t v = s->col.leaf_column->element(val_idx); int32_t ts_scale = s->col.ts_scale; if (ts_scale != 0) { @@ -1776,27 +1797,14 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) }(); // the 12 bytes of fixed length data. - v = last_day_nanos.count(); - dst[pos + 0] = v; - dst[pos + 1] = v >> 8; - dst[pos + 2] = v >> 16; - dst[pos + 3] = v >> 24; - dst[pos + 4] = v >> 32; - dst[pos + 5] = v >> 40; - dst[pos + 6] = v >> 48; - dst[pos + 7] = v >> 56; - uint32_t w = julian_days.count(); - dst[pos + 8] = w; - dst[pos + 9] = w >> 8; - dst[pos + 10] = w >> 16; - dst[pos + 11] = w >> 24; + v = last_day_nanos.count(); + encode_value(dst + pos, v, 1); + uint32_t w = julian_days.count(); + encode_value(dst + pos + 8, w, 1); } break; - case DOUBLE: { - auto v = s->col.leaf_column->element(val_idx); - memcpy(dst + pos, &v, 8); - } break; case BYTE_ARRAY: { + // only PLAIN encoding is supported auto const bytes = [](cudf::type_id const type_id, column_device_view const* leaf_column, uint32_t const val_idx) -> void const* { @@ -1810,11 +1818,8 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) default: CUDF_UNREACHABLE("invalid type id for byte array writing!"); } }(type_id, s->col.leaf_column, val_idx); - uint32_t v = len - 4; // string length - dst[pos + 0] = v; - dst[pos + 1] = v >> 8; - dst[pos + 2] = v >> 16; - dst[pos + 3] = v >> 24; + uint32_t v = len - 4; // string length + encode_value(dst + pos, v, 1); if (v != 0) memcpy(dst + pos + 4, bytes, v); } break; case FIXED_LEN_BYTE_ARRAY: { @@ -1822,10 +1827,16 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) // When using FIXED_LEN_BYTE_ARRAY for decimals, the rep is encoded in big-endian auto const v = s->col.leaf_column->element(val_idx).value(); auto const v_char_ptr = reinterpret_cast(&v); - thrust::copy(thrust::seq, - thrust::make_reverse_iterator(v_char_ptr + sizeof(v)), - thrust::make_reverse_iterator(v_char_ptr), - dst + pos); + if (is_split_stream) { + for (int i = dtype_len_out - 1; i >= 0; i--, pos += stride) { + dst[pos] = v_char_ptr[i]; + } + } else { + thrust::copy(thrust::seq, + thrust::make_reverse_iterator(v_char_ptr + sizeof(v)), + thrust::make_reverse_iterator(v_char_ptr), + dst + pos); + } } } break; } @@ -1833,6 +1844,9 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) __syncthreads(); } + // for BYTE_STREAM_SPLIT, s->cur now points to the end of the first stream. + // need it to point to the end of the Nth stream. + if (is_split_stream and t == 0) { s->cur += (dtype_len_out - 1) * s->page.num_valid; } finish_page_encode( s, s->cur, pages, comp_in, comp_out, comp_results, write_v2_headers); } @@ -1883,13 +1897,13 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) ? s->ck.dict_rle_bits : -1; if (t == 0) { - uint8_t* dst = s->cur; - s->rle_run = 0; - s->rle_pos = 0; - s->rle_numvals = 0; - s->rle_out = dst; - s->page.encoding = - determine_encoding(s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers); + uint8_t* dst = s->cur; + s->rle_run = 0; + s->rle_pos = 0; + s->rle_numvals = 0; + s->rle_out = dst; + s->page.encoding = determine_encoding( + s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers, false); if (dict_bits >= 0 && physical_type != BOOLEAN) { dst[0] = dict_bits; s->rle_out = dst + 1; @@ -3417,7 +3431,14 @@ void EncodePages(device_span pages, gpuEncodePageLevels<<>>( pages, write_v2_headers, encode_kernel_mask::PLAIN); gpuEncodePages<<>>( - pages, comp_in, comp_out, comp_results, write_v2_headers); + pages, comp_in, comp_out, comp_results, write_v2_headers, false); + } + if (BitAnd(kernel_mask, encode_kernel_mask::BYTE_STREAM_SPLIT) != 0) { + auto const strm = streams[s_idx++]; + gpuEncodePageLevels<<>>( + pages, write_v2_headers, encode_kernel_mask::BYTE_STREAM_SPLIT); + gpuEncodePages<<>>( + pages, comp_in, comp_out, comp_results, write_v2_headers, true); } if (BitAnd(kernel_mask, encode_kernel_mask::DELTA_BINARY) != 0) { auto const strm = streams[s_idx++]; diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index 07e03460ecb..6c6afde29e4 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -166,13 +166,7 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, ColumnChunkDesc const& chunk) { if (page.flags & PAGEINFO_FLAGS_DICTIONARY) { return decode_kernel_mask::NONE; } - if (!is_string_col(chunk) && !is_nested(chunk) && !is_byte_array(chunk) && !is_boolean(chunk)) { - if (page.encoding == Encoding::PLAIN) { - return decode_kernel_mask::FIXED_WIDTH_NO_DICT; - } else if (page.encoding == Encoding::PLAIN_DICTIONARY) { - return decode_kernel_mask::FIXED_WIDTH_DICT; - } - } + if (page.encoding == Encoding::DELTA_BINARY_PACKED) { return decode_kernel_mask::DELTA_BINARY; } else if (page.encoding == Encoding::DELTA_BYTE_ARRAY) { @@ -180,10 +174,26 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, } else if (page.encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) { return decode_kernel_mask::DELTA_LENGTH_BA; } else if (is_string_col(chunk)) { + // check for string before byte_stream_split so FLBA will go to the right kernel return decode_kernel_mask::STRING; } - // non-string, non-delta + if (!is_nested(chunk) && !is_byte_array(chunk) && !is_boolean(chunk)) { + if (page.encoding == Encoding::PLAIN) { + return decode_kernel_mask::FIXED_WIDTH_NO_DICT; + } else if (page.encoding == Encoding::PLAIN_DICTIONARY || + page.encoding == Encoding::RLE_DICTIONARY) { + return decode_kernel_mask::FIXED_WIDTH_DICT; + } else if (page.encoding == Encoding::BYTE_STREAM_SPLIT) { + return decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT; + } + } + + if (page.encoding == Encoding::BYTE_STREAM_SPLIT) { + return decode_kernel_mask::BYTE_STREAM_SPLIT; + } + + // non-string, non-delta, non-split_stream return decode_kernel_mask::GENERAL; } diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 6f96d4dd1cf..5ba813f518f 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -1039,7 +1039,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) // - the row values we get from nz_idx will be // 0, 1, 2, 3, 4 .... // - by shifting these values by first_row, the sequence becomes - // -1, -2, 0, 1, 2 ... + // -2, -1, 0, 1, 2 ... // - so we will end up ignoring the first two input rows, and input rows 2..n will // get written to the output starting at position 0. // @@ -1062,7 +1062,19 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) // choose a character parallel string copy when the average string is longer than a warp auto const use_char_ll = warp_total / warp_size >= warp_size; - if (use_char_ll) { + if (s->page.encoding == Encoding::BYTE_STREAM_SPLIT) { + if (src_pos + i < target_pos && dst_pos >= 0) { + auto const stride = s->page.str_bytes / s->dtype_len_in; + auto offptr = + reinterpret_cast(nesting_info_base[leaf_level_index].data_out) + dst_pos; + *offptr = len; + auto str_ptr = nesting_info_base[leaf_level_index].string_out + offset; + for (int ii = 0; ii < s->dtype_len_in; ii++) { + str_ptr[ii] = s->data_start[src_pos + i + ii * stride]; + } + } + __syncwarp(); + } else if (use_char_ll) { __shared__ __align__(8) uint8_t const* pointers[warp_size]; __shared__ __align__(4) size_type offsets[warp_size]; __shared__ __align__(4) int dsts[warp_size]; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index b165c60b2cf..c06fb63acda 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -54,7 +54,13 @@ constexpr int LEVEL_DECODE_BUF_SIZE = 2048; template constexpr int rolling_index(int index) { - return index % rolling_size; + // Cannot divide by 0. But `rolling_size` will be 0 for unused arrays, so this case will never + // actual be executed. + if constexpr (rolling_size == 0) { + return index; + } else { + return index % rolling_size; + } } // PARQUET-2261 allows for not writing the level histograms in certain cases. @@ -81,7 +87,8 @@ constexpr bool is_supported_encoding(Encoding enc) case Encoding::RLE_DICTIONARY: case Encoding::DELTA_BINARY_PACKED: case Encoding::DELTA_LENGTH_BYTE_ARRAY: - case Encoding::DELTA_BYTE_ARRAY: return true; + case Encoding::DELTA_BYTE_ARRAY: + case Encoding::BYTE_STREAM_SPLIT: return true; default: return false; } } @@ -199,14 +206,16 @@ enum level_type { * Used to control which decode kernels to run. */ enum class decode_kernel_mask { - NONE = 0, - GENERAL = (1 << 0), // Run catch-all decode kernel - STRING = (1 << 1), // Run decode kernel for string data - DELTA_BINARY = (1 << 2), // Run decode kernel for DELTA_BINARY_PACKED data - DELTA_BYTE_ARRAY = (1 << 3), // Run decode kernel for DELTA_BYTE_ARRAY encoded data - DELTA_LENGTH_BA = (1 << 4), // Run decode kernel for DELTA_LENGTH_BYTE_ARRAY encoded data - FIXED_WIDTH_NO_DICT = (1 << 5), // Run decode kernel for fixed width non-dictionary pages - FIXED_WIDTH_DICT = (1 << 6) // Run decode kernel for fixed width dictionary pages + NONE = 0, + GENERAL = (1 << 0), // Run catch-all decode kernel + STRING = (1 << 1), // Run decode kernel for string data + DELTA_BINARY = (1 << 2), // Run decode kernel for DELTA_BINARY_PACKED data + DELTA_BYTE_ARRAY = (1 << 3), // Run decode kernel for DELTA_BYTE_ARRAY encoded data + DELTA_LENGTH_BA = (1 << 4), // Run decode kernel for DELTA_LENGTH_BYTE_ARRAY encoded data + FIXED_WIDTH_NO_DICT = (1 << 5), // Run decode kernel for fixed width non-dictionary pages + FIXED_WIDTH_DICT = (1 << 6), // Run decode kernel for fixed width dictionary pages + BYTE_STREAM_SPLIT = (1 << 7), // Run decode kernel for BYTE_STREAM_SPLIT encoded data + BYTE_STREAM_SPLIT_FLAT = (1 << 8), // Same as above but with a flat schema }; // mask representing all the ways in which a string can be encoded @@ -517,11 +526,12 @@ constexpr uint32_t encoding_to_mask(Encoding encoding) * Used to control which encode kernels to run. */ enum class encode_kernel_mask { - PLAIN = (1 << 0), // Run plain encoding kernel - DICTIONARY = (1 << 1), // Run dictionary encoding kernel - DELTA_BINARY = (1 << 2), // Run DELTA_BINARY_PACKED encoding kernel - DELTA_LENGTH_BA = (1 << 3), // Run DELTA_LENGTH_BYTE_ARRAY encoding kernel - DELTA_BYTE_ARRAY = (1 << 4), // Run DELTA_BYtE_ARRAY encoding kernel + PLAIN = (1 << 0), // Run plain encoding kernel + DICTIONARY = (1 << 1), // Run dictionary encoding kernel + DELTA_BINARY = (1 << 2), // Run DELTA_BINARY_PACKED encoding kernel + DELTA_LENGTH_BA = (1 << 3), // Run DELTA_LENGTH_BYTE_ARRAY encoding kernel + DELTA_BYTE_ARRAY = (1 << 4), // Run DELTA_BYtE_ARRAY encoding kernel + BYTE_STREAM_SPLIT = (1 << 5), // Run plain encoding kernel, but split streams }; /** @@ -759,6 +769,28 @@ void DecodePageData(cudf::detail::hostdevice_span pages, kernel_error::pointer error_code, rmm::cuda_stream_view stream); +/** + * @brief Launches kernel for reading the BYTE_STREAM_SPLIT column data stored in the pages + * + * The page data will be written to the output pointed to in the page's + * associated column chunk. + * + * @param[in,out] pages All pages to be decoded + * @param[in] chunks All chunks to be decoded + * @param[in] num_rows Total number of rows to read + * @param[in] min_row Minimum number of rows to read + * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[out] error_code Error code for kernel failures + * @param[in] stream CUDA stream to use + */ +void DecodeSplitPageData(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + size_t num_rows, + size_t min_row, + int level_type_size, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream); + /** * @brief Launches kernel for reading the string column data stored in the pages * @@ -891,6 +923,28 @@ void DecodePageDataFixedDict(cudf::detail::hostdevice_span pages, kernel_error::pointer error_code, rmm::cuda_stream_view stream); +/** + * @brief Launches kernel for reading dictionary fixed width column data stored in the pages + * + * The page data will be written to the output pointed to in the page's + * associated column chunk. + * + * @param[in,out] pages All pages to be decoded + * @param[in] chunks All chunks to be decoded + * @param[in] num_rows Total number of rows to read + * @param[in] min_row Minimum number of rows to read + * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[out] error_code Error code for kernel failures + * @param[in] stream CUDA stream to use + */ +void DecodeSplitPageDataFlat(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + std::size_t num_rows, + size_t min_row, + int level_type_size, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream); + /** * @brief Launches kernel for initializing encoder row group fragments * diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index a524e7c6dcc..b7172f5ba67 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -253,6 +253,28 @@ void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_row streams[s_idx++]); } + // launch byte stream split decoder + if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT) != 0) { + DecodeSplitPageDataFlat(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + error_code.data(), + streams[s_idx++]); + } + + // launch byte stream split decoder + if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT) != 0) { + DecodeSplitPageData(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + error_code.data(), + streams[s_idx++]); + } + if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT) != 0) { DecodePageDataFixed(subpass.pages, pass.chunks, diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 6a8c31fb96b..5509a33f9f0 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -613,8 +613,7 @@ std::vector construct_schema_tree( column_in_metadata const& col_meta) { s.requested_encoding = column_encoding::USE_DEFAULT; - if (schema[parent_idx].name != "list" and - col_meta.get_encoding() != column_encoding::USE_DEFAULT) { + if (s.name != "list" and col_meta.get_encoding() != column_encoding::USE_DEFAULT) { // do some validation switch (col_meta.get_encoding()) { case column_encoding::DELTA_BINARY_PACKED: @@ -659,6 +658,21 @@ std::vector construct_schema_tree( } break; + case column_encoding::BYTE_STREAM_SPLIT: + if (s.type == Type::BYTE_ARRAY) { + CUDF_LOG_WARN( + "BYTE_STREAM_SPLIT encoding is only supported for fixed width columns; the " + "requested encoding will be ignored"); + return; + } + if (s.type == Type::INT96) { + CUDF_LOG_WARN( + "BYTE_STREAM_SPLIT encoding is not supported for INT96 columns; the " + "requested encoding will be ignored"); + return; + } + break; + // supported parquet encodings case column_encoding::PLAIN: case column_encoding::DICTIONARY: break; diff --git a/cpp/tests/io/parquet_common.cpp b/cpp/tests/io/parquet_common.cpp index b64cd230bc6..c1211869bcc 100644 --- a/cpp/tests/io/parquet_common.cpp +++ b/cpp/tests/io/parquet_common.cpp @@ -203,6 +203,7 @@ template std::vector random_values(size_t size); template std::vector random_values(size_t size); template std::vector random_values(size_t size); template std::vector random_values(size_t size); +template std::vector<__int128_t> random_values<__int128_t>(size_t size); template std::vector random_values(size_t size); template std::vector random_values(size_t size); template std::vector random_values(size_t size); diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index 3a3040f0957..a16b3d63177 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -35,7 +35,7 @@ using cudf::test::iterators::no_nulls; template -void test_durations(mask_op_t mask_op) +void test_durations(mask_op_t mask_op, bool use_byte_stream_split) { std::default_random_engine generator; std::uniform_int_distribution distribution_d(0, 30); @@ -67,6 +67,13 @@ void test_durations(mask_op_t mask_op) auto expected = table_view{{durations_d, durations_s, durations_ms, durations_us, durations_ns}}; + if (use_byte_stream_split) { + cudf::io::table_input_metadata expected_metadata(expected); + for (auto& col_meta : expected_metadata.column_metadata) { + col_meta.set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); + } + } + auto filepath = temp_env->get_temp_filepath("Durations.parquet"); cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected); @@ -91,10 +98,10 @@ void test_durations(mask_op_t mask_op) TEST_F(ParquetWriterTest, Durations) { - test_durations([](auto i) { return true; }); - test_durations([](auto i) { return (i % 2) != 0; }); - test_durations([](auto i) { return (i % 3) != 0; }); - test_durations([](auto i) { return false; }); + test_durations([](auto i) { return true; }, false); + test_durations([](auto i) { return (i % 2) != 0; }, false); + test_durations([](auto i) { return (i % 3) != 0; }, false); + test_durations([](auto i) { return false; }, false); } TEST_F(ParquetWriterTest, MultiIndex) @@ -1593,6 +1600,7 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) using cudf::io::column_encoding; using cudf::io::parquet::detail::Encoding; constexpr int num_rows = 500; + std::mt19937 engine{31337}; auto const ones = thrust::make_constant_iterator(1); auto const col = @@ -1602,6 +1610,9 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) auto const string_col = cudf::test::strings_column_wrapper(strings, strings + num_rows, no_nulls()); + // throw in a list to make sure encoding selection works there too + auto list_col = make_parquet_list_col(engine, num_rows, 5, true); + auto const table = table_view({col, col, col, @@ -1613,7 +1624,8 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) string_col, string_col, string_col, - string_col}); + string_col, + *list_col}); cudf::io::table_input_metadata table_metadata(table); @@ -1635,10 +1647,17 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) set_meta(10, "string_db", column_encoding::DELTA_BINARY_PACKED); table_metadata.column_metadata[11].set_name("string_none"); - for (auto& col_meta : table_metadata.column_metadata) { - col_meta.set_nullability(false); + for (int i = 0; i < 12; i++) { + table_metadata.column_metadata[i].set_nullability(false); } + // handle list column separately + table_metadata.column_metadata[12].set_name("int32_list").set_nullability(true); + table_metadata.column_metadata[12] + .child(1) + .set_encoding(column_encoding::DELTA_BINARY_PACKED) + .set_nullability(true); + auto const filepath = temp_env->get_temp_filepath("UserRequestedEncodings.parquet"); cudf::io::parquet_writer_options opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, table) @@ -1683,6 +1702,12 @@ TEST_F(ParquetWriterTest, UserRequestedEncodings) expect_enc(10, Encoding::PLAIN_DICTIONARY); // no request, should use dictionary expect_enc(11, Encoding::PLAIN_DICTIONARY); + // int list requested delta_binary_packed. it's has level data, so have to search for a match. + auto const encodings = fmd.row_groups[0].columns[12].meta_data.encodings; + auto const has_delta = std::any_of(encodings.begin(), encodings.end(), [](Encoding enc) { + return enc == Encoding::DELTA_BINARY_PACKED; + }); + EXPECT_TRUE(has_delta); } TEST_F(ParquetWriterTest, Decimal128DeltaByteArray) @@ -1743,6 +1768,95 @@ TEST_F(ParquetWriterTest, DeltaBinaryStartsWithNulls) CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); } +TEST_F(ParquetWriterTest, ByteStreamSplit) +{ + constexpr auto num_rows = 100; + std::mt19937 engine{31337}; + auto col0_data = random_values(num_rows); + auto col1_data = random_values(num_rows); + auto col2_data = random_values(num_rows); + auto col3_data = random_values(num_rows); + + column_wrapper col0{col0_data.begin(), col0_data.end(), no_nulls()}; + column_wrapper col1{col1_data.begin(), col1_data.end(), no_nulls()}; + column_wrapper col2{col2_data.begin(), col2_data.end(), no_nulls()}; + column_wrapper col3{col3_data.begin(), col3_data.end(), no_nulls()}; + + // throw in a list to make sure both decoders are working + auto col4 = make_parquet_list_col(engine, num_rows, 5, true); + + auto expected = table_view{{col0, col1, col2, col3, *col4}}; + + cudf::io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("int32s"); + expected_metadata.column_metadata[1].set_name("int64s"); + expected_metadata.column_metadata[2].set_name("floats"); + expected_metadata.column_metadata[3].set_name("doubles"); + expected_metadata.column_metadata[4].set_name("int32list"); + auto const encoding = cudf::io::column_encoding::BYTE_STREAM_SPLIT; + for (int i = 0; i <= 3; i++) { + expected_metadata.column_metadata[i].set_encoding(encoding); + } + + expected_metadata.column_metadata[4].child(1).set_encoding(encoding); + + auto const filepath = temp_env->get_temp_filepath("ByteStreamSplit.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .metadata(expected_metadata); + cudf::io::write_parquet(out_opts); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(in_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); +} + +TEST_F(ParquetWriterTest, DecimalByteStreamSplit) +{ + constexpr cudf::size_type num_rows = 100; + auto seq_col0 = random_values(num_rows); + auto seq_col1 = random_values(num_rows); + auto seq_col2 = random_values<__int128_t>(num_rows); + + auto col0 = cudf::test::fixed_point_column_wrapper{ + seq_col0.begin(), seq_col0.end(), no_nulls(), numeric::scale_type{-5}}; + auto col1 = cudf::test::fixed_point_column_wrapper{ + seq_col1.begin(), seq_col1.end(), no_nulls(), numeric::scale_type{-9}}; + auto col2 = cudf::test::fixed_point_column_wrapper<__int128_t>{ + seq_col1.begin(), seq_col1.end(), no_nulls(), numeric::scale_type{-11}}; + + auto expected = table_view({col0, col1, col2}); + cudf::io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("int32s").set_decimal_precision(7); + expected_metadata.column_metadata[1].set_name("int64s").set_decimal_precision(11); + expected_metadata.column_metadata[2].set_name("int128s").set_decimal_precision(22); + for (auto& col_meta : expected_metadata.column_metadata) { + col_meta.set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); + } + + auto const filepath = temp_env->get_temp_filepath("DecimalByteStreamSplit.parquet"); + cudf::io::parquet_writer_options args = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .metadata(expected_metadata); + cudf::io::write_parquet(args); + + cudf::io::parquet_reader_options read_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(read_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); +} + +TEST_F(ParquetWriterTest, DurationByteStreamSplit) +{ + test_durations([](auto i) { return true; }, true); + test_durations([](auto i) { return (i % 2) != 0; }, true); + test_durations([](auto i) { return (i % 3) != 0; }, true); + test_durations([](auto i) { return false; }, true); +} + ///////////////////////////////////////////////////////////// // custom mem mapped data sink that supports device writes template @@ -1926,6 +2040,35 @@ TYPED_TEST(ParquetWriterTimestampTypeTest, TimestampOverflow) CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); } +TYPED_TEST(ParquetWriterTimestampTypeTest, TimestampsByteStreamSplit) +{ + srand(42); + auto sequence = cudf::detail::make_counting_transform_iterator( + 0, [](auto i) { return ((std::rand() / 10000) * 1000); }); + + constexpr auto num_rows = 100; + column_wrapper col( + sequence, sequence + num_rows, no_nulls()); + + auto expected = table_view{{col}}; + + cudf::io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT); + + auto filepath = temp_env->get_temp_filepath("TimestampsByteStreamSplit.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .metadata(expected_metadata); + cudf::io::write_parquet(out_opts); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}) + .timestamp_type(this->type()); + auto result = cudf::io::read_parquet(in_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); +} + ////////////////////////////// // writer stress tests