diff --git a/cpp/src/io/parquet/decode_preprocess.cu b/cpp/src/io/parquet/decode_preprocess.cu index d9f91ed564c..afe9a76a6d0 100644 --- a/cpp/src/io/parquet/decode_preprocess.cu +++ b/cpp/src/io/parquet/decode_preprocess.cu @@ -232,7 +232,10 @@ __global__ void __launch_bounds__(preprocess_block_size) {rep_runs}}; // setup page info - if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, all_types_filter{}, false)) { return; } + if (!setupLocalPageInfo( + s, pp, chunks, min_row, num_rows, all_types_filter{}, page_processing_stage::PREPROCESS)) { + return; + } // initialize the stream decoders (requires values computed in setupLocalPageInfo) // the size of the rolling batch buffer diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index f7c07aafb70..d39edd70fcd 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -446,7 +446,7 @@ __global__ void __launch_bounds__(decode_block_size) min_row, num_rows, mask_filter{decode_kernel_mask::GENERAL}, - true)) { + page_processing_stage::DECODE)) { return; } diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 5b41ce8fa1f..f6f2f9e9f18 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -1014,6 +1014,12 @@ struct mask_filter { } }; +enum class page_processing_stage { + PREPROCESS, + STRING_BOUNDS, + DECODE, +}; + /** * @brief Sets up block-local page state information from the global pages. * @@ -1023,7 +1029,7 @@ struct mask_filter { * @param[in] min_row Crop all rows below min_row * @param[in] num_rows Maximum number of rows to read * @param[in] filter Filtering function used to decide which pages to operate on - * @param[in] is_decode_step If we are setting up for the decode step (instead of the preprocess) + * @param[in] stage What stage of the decoding process is this being called from * @tparam Filter Function that takes a PageInfo reference and returns true if the given page should * be operated on Currently only used by gpuComputePageSizes step) * @return True if this page should be processed further @@ -1035,7 +1041,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, size_t min_row, size_t num_rows, Filter filter, - bool is_decode_step) + page_processing_stage stage) { int t = threadIdx.x; @@ -1126,7 +1132,8 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, // // NOTE: this check needs to be done after the null counts have been zeroed out bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0; - if (is_decode_step && s->num_rows == 0 && + if ((stage == page_processing_stage::STRING_BOUNDS || stage == page_processing_stage::DECODE) && + s->num_rows == 0 && !(has_repetition && (is_bounds_page(s, min_row, num_rows, has_repetition) || is_page_contained(s, min_row, num_rows)))) { return false; @@ -1237,7 +1244,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, // NOTE: in a chunked read situation, s->col.column_data_base and s->col.valid_map_base // will be aliased to memory that has been freed when we get here in the non-decode step, so // we cannot check against nullptr. we'll just check a flag directly. - if (is_decode_step) { + if (stage == page_processing_stage::DECODE) { int max_depth = s->col.max_nesting_depth; for (int idx = 0; idx < max_depth; idx++) { PageNestingDecodeInfo* nesting_info = &s->nesting_info[idx]; @@ -1387,13 +1394,13 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, // if we're in the decoding step, jump directly to the first // value we care about - if (is_decode_step) { + if (stage == page_processing_stage::DECODE) { s->input_value_count = s->page.skipped_values > -1 ? s->page.skipped_values : 0; - } else { + } else if (stage == page_processing_stage::PREPROCESS) { s->input_value_count = 0; s->input_leaf_count = 0; - s->page.skipped_values = - -1; // magic number to indicate it hasn't been set for use inside UpdatePageSizes + // magic number to indicate it hasn't been set for use inside UpdatePageSizes + s->page.skipped_values = -1; s->page.skipped_leaf_values = 0; } } diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu index 528048d2fe6..98f8fbb09a2 100644 --- a/cpp/src/io/parquet/page_delta_decode.cu +++ b/cpp/src/io/parquet/page_delta_decode.cu @@ -323,9 +323,13 @@ __global__ void __launch_bounds__(96) auto* const db = &db_state; [[maybe_unused]] null_count_back_copier _{s, t}; - auto const mask = decode_kernel_mask::DELTA_BINARY; - if (!setupLocalPageInfo( - s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) { + if (!setupLocalPageInfo(s, + &pages[page_idx], + chunks, + min_row, + num_rows, + mask_filter{decode_kernel_mask::DELTA_BINARY}, + page_processing_stage::DECODE)) { return; } @@ -446,9 +450,13 @@ __global__ void __launch_bounds__(decode_block_size) auto* const dba = &db_state; [[maybe_unused]] null_count_back_copier _{s, t}; - auto const mask = decode_kernel_mask::DELTA_BYTE_ARRAY; - if (!setupLocalPageInfo( - s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) { + if (!setupLocalPageInfo(s, + &pages[page_idx], + chunks, + min_row, + num_rows, + mask_filter{decode_kernel_mask::DELTA_BYTE_ARRAY}, + page_processing_stage::DECODE)) { return; } diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 33781848426..ef2e7ef42ef 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -616,7 +616,15 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputeStringPageBou // setup page info auto const mask = BitOr(decode_kernel_mask::STRING, decode_kernel_mask::DELTA_BYTE_ARRAY); - if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, mask_filter{mask}, true)) { return; } + if (!setupLocalPageInfo(s, + pp, + chunks, + min_row, + num_rows, + mask_filter{mask}, + page_processing_stage::STRING_BOUNDS)) { + return; + } bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition); @@ -659,8 +667,15 @@ __global__ void __launch_bounds__(delta_preproc_block_size) gpuComputeDeltaPageS bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0; // setup page info - auto const mask = decode_kernel_mask::DELTA_BYTE_ARRAY; - if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, mask_filter{mask}, true)) { return; } + if (!setupLocalPageInfo(s, + pp, + chunks, + min_row, + num_rows, + mask_filter{decode_kernel_mask::DELTA_BYTE_ARRAY}, + page_processing_stage::STRING_BOUNDS)) { + return; + } auto const start_value = pp->start_val; @@ -722,8 +737,13 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0; // setup page info - if (!setupLocalPageInfo( - s, pp, chunks, min_row, num_rows, mask_filter{decode_kernel_mask::STRING}, true)) { + if (!setupLocalPageInfo(s, + pp, + chunks, + min_row, + num_rows, + mask_filter{decode_kernel_mask::STRING}, + page_processing_stage::STRING_BOUNDS)) { return; } @@ -816,9 +836,13 @@ __global__ void __launch_bounds__(decode_block_size) int const lane_id = t % warp_size; [[maybe_unused]] null_count_back_copier _{s, t}; - auto const mask = decode_kernel_mask::STRING; - if (!setupLocalPageInfo( - s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) { + if (!setupLocalPageInfo(s, + &pages[page_idx], + chunks, + min_row, + num_rows, + mask_filter{decode_kernel_mask::STRING}, + page_processing_stage::DECODE)) { return; }