diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index efc7b78cdb2..c866afd5324 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -112,6 +112,7 @@ struct ColumnDesc { int32_t decimal_scale; // number of fractional decimal digits for decimal type int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns) column_validity_info parent_validity_info; // consists of parent column valid_map and null count + uint32_t* parent_null_count_prefix_sums; // per-stripe prefix sums of parent column's null count }; /** diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index 033a2d9aff5..f7bd5ae86b8 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -759,6 +759,49 @@ void update_null_mask(cudf::detail::hostdevice_2dvector& chunks } } +/** + * @brief Compute the per-stripe prefix sum of null count, for each struct column in the current + * layer. + * + * For every column whose chunk in the first stripe has `type_kind == STRUCT`, fills + * `prefix_sums[col_idx]` with that column's per-stripe `null_count` values and then replaces them + * with their inclusive running sum across stripes. Entries of `prefix_sums` for other columns are + * not written. Returns immediately when `chunks` has no stripes; otherwise synchronizes `stream` + * before returning. + * + * @param chunks Column chunks, indexed as [stripe][column] + * @param prefix_sums Output spans, indexed by column; each is written for as many elements as its + * own size + * @param stream CUDA stream used for the host-to-device copy and the `thrust::for_each` call + */ +void scan_null_counts(cudf::detail::hostdevice_2dvector const& chunks, + cudf::host_span> prefix_sums, + rmm::cuda_stream_view stream) +{ + auto const num_stripes = chunks.size().first; + if (num_stripes == 0) return; + + auto const num_columns = chunks.size().second; + std::vector>> prefix_sums_to_update; + for (auto col_idx = 0ul; col_idx < num_columns; ++col_idx) { + // Null count sums are only needed for children of struct columns + if (chunks[0][col_idx].type_kind == STRUCT) { + prefix_sums_to_update.emplace_back(col_idx, prefix_sums[col_idx]); + } + } + auto const d_prefix_sums_to_update = + cudf::detail::make_device_uvector_async(prefix_sums_to_update, stream); + + thrust::for_each(rmm::exec_policy(stream), + d_prefix_sums_to_update.begin(), + d_prefix_sums_to_update.end(), + [chunks = cudf::detail::device_2dspan{chunks}] __device__( + auto const& idx_psums) { + auto const col_idx = idx_psums.first; + auto const psums = idx_psums.second; + + thrust::transform( + thrust::seq, 
thrust::make_counting_iterator(0), + thrust::make_counting_iterator(0) + psums.size(), + psums.begin(), + [&](auto stripe_idx) { return chunks[stripe_idx][col_idx].null_count; }); + + thrust::inclusive_scan(thrust::seq, psums.begin(), psums.end(), psums.begin()); + }); + // `prefix_sums_to_update` goes out of scope, copy has to be done before we return (the async copy above reads from this host vector, so wait on the stream first) + stream.synchronize(); +} + void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector& chunks, size_t num_dicts, size_t skip_rows, @@ -817,8 +860,6 @@ void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector> out_buffers(_selected_columns.size()); std::vector schema_info; std::vector> lvl_stripe_data(_selected_columns.size()); + std::vector>> null_count_prefix_sums; table_metadata out_metadata; // There are no columns in the table @@ -1124,6 +1168,14 @@ table_with_metadata reader::impl::read(size_type skip_rows, // Logically view streams as columns std::vector stream_info; + null_count_prefix_sums.emplace_back(); + null_count_prefix_sums.back().reserve(_selected_columns[level].size()); + std::generate_n( + std::back_inserter(null_count_prefix_sums.back()), _selected_columns[level].size(), [&]() { + return cudf::detail::make_zeroed_device_uvector_async(total_num_stripes, + stream); + }); + // Tracker for eventually deallocating compressed and uncompressed data auto& stripe_data = lvl_stripe_data[level]; @@ -1207,10 +1259,12 @@ table_with_metadata reader::impl::read(size_type skip_rows, ? stripe_info->numberOfRows : _col_meta.num_child_rows_per_stripe[stripe_idx * num_columns + col_idx]; chunk.column_num_rows = (level == 0) ? num_rows : _col_meta.num_child_rows[col_idx]; - chunk.parent_validity_info.valid_map_base = - (level == 0) ? nullptr : _col_meta.parent_column_data[col_idx].valid_map_base; - chunk.parent_validity_info.null_count = - (level == 0) ? 0 : _col_meta.parent_column_data[col_idx].null_count; + chunk.parent_validity_info = + (level == 0) ? 
column_validity_info{} : _col_meta.parent_column_data[col_idx]; + chunk.parent_null_count_prefix_sums = + (level == 0) + ? nullptr + : null_count_prefix_sums[level - 1][_col_meta.parent_column_index[col_idx]].data(); chunk.encoding_kind = stripe_footer->columns[selected_columns[col_idx].id].kind; chunk.type_kind = _metadata->per_file_metadata[stripe_source_mapping.source_idx] .ff.types[selected_columns[col_idx].id] @@ -1336,6 +1390,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, // Extract information to process nested child columns if (nested_col.size()) { + scan_null_counts(chunks, null_count_prefix_sums[level], stream); row_groups.device_to_host(stream, true); aggregate_child_meta(chunks, row_groups, out_buffers[level], nested_col, level); } diff --git a/cpp/src/io/orc/reader_impl.hpp b/cpp/src/io/orc/reader_impl.hpp index 49c0c983992..7171b13d422 100644 --- a/cpp/src/io/orc/reader_impl.hpp +++ b/cpp/src/io/orc/reader_impl.hpp @@ -58,6 +58,7 @@ struct reader_column_meta { std::vector parent_column_data; // consists of parent column valid_map and null count + std::vector parent_column_index; std::vector child_start_row; // start row of child columns [stripe][column] std::vector diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 75ccd19d77b..41ee285ac25 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -1167,8 +1167,17 @@ __global__ void __launch_bounds__(block_size) // No present stream: all rows are valid s->vals.u32[t] = ~0; } - while (s->top.nulls_desc_row < s->chunk.num_rows) { - uint32_t nrows_max = min(s->chunk.num_rows - s->top.nulls_desc_row, blockDim.x * 32); + auto const prev_parent_null_count = + (s->chunk.parent_null_count_prefix_sums != nullptr && stripe > 0) + ? s->chunk.parent_null_count_prefix_sums[stripe - 1] + : 0; + auto const parent_null_count = + (s->chunk.parent_null_count_prefix_sums != nullptr) + ? 
s->chunk.parent_null_count_prefix_sums[stripe] - prev_parent_null_count + : 0; + auto const num_elems = s->chunk.num_rows - parent_null_count; + while (s->top.nulls_desc_row < num_elems) { + uint32_t nrows_max = min(num_elems - s->top.nulls_desc_row, blockDim.x * 32); uint32_t nrows; size_t row_in; @@ -1187,7 +1196,7 @@ __global__ void __launch_bounds__(block_size) } __syncthreads(); - row_in = s->chunk.start_row + s->top.nulls_desc_row; + row_in = s->chunk.start_row + s->top.nulls_desc_row - prev_parent_null_count; if (row_in + nrows > first_row && row_in < first_row + max_num_rows && s->chunk.valid_map_base != NULL) { int64_t dst_row = row_in - first_row; @@ -1251,7 +1260,7 @@ __global__ void __launch_bounds__(block_size) // Sum up the valid counts and infer null_count null_count = block_reduce(temp_storage.bk_storage).Sum(null_count); if (t == 0) { - chunks[chunk_id].null_count = null_count; + chunks[chunk_id].null_count = parent_null_count + null_count; chunks[chunk_id].skip_count = s->chunk.skip_count; } } else { diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 635332d5c24..33ce8427a71 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -844,7 +844,7 @@ def test_orc_string_stream_offset_issue(): # Data is generated using pyorc module -def generate_list_struct_buff(size=28000): +def generate_list_struct_buff(size=100_000): rd = random.Random(1) np.random.seed(seed=1) @@ -963,7 +963,7 @@ def generate_list_struct_buff(size=28000): ["lvl2_struct", "lvl1_struct"], ], ) -@pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 28000]) +@pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 100_000]) @pytest.mark.parametrize("use_index", [True, False]) def test_lists_struct_nests( columns, num_rows, use_index,