From 62c93120bfa8c36bd0b421a2dd18628e1cfcd06d Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Mon, 16 Aug 2021 21:56:24 -0700 Subject: [PATCH] Fix ORC reading of files with struct columns that have null values (#9005) Fixes #8910 Number of values in the null stream of a child column depends on the number of valid elements in the parent column. This PR changes the reading logic to account for the number of parent null values when parsing child null streams. Namely, the output row is offset by the number of null values in the parent column, in all previous stripes. To allow efficient parsing, null counts are inclusive_scan'd before the columns in the level are parsed. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Mike Wilson (https://github.com/hyperbolic2346) - GALI PREM SAGAR (https://github.com/galipremsagar) - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu) URL: https://github.com/rapidsai/cudf/pull/9005 --- cpp/src/io/orc/orc_gpu.h | 1 + cpp/src/io/orc/reader_impl.cu | 69 +++++++++++++++++++++++++++--- cpp/src/io/orc/reader_impl.hpp | 1 + cpp/src/io/orc/stripe_data.cu | 17 ++++++-- python/cudf/cudf/tests/test_orc.py | 4 +- 5 files changed, 79 insertions(+), 13 deletions(-) diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index efc7b78cdb2..c866afd5324 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -112,6 +112,7 @@ struct ColumnDesc { int32_t decimal_scale; // number of fractional decimal digits for decimal type int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns) column_validity_info parent_validity_info; // consists of parent column valid_map and null count + uint32_t* parent_null_count_prefix_sums; // per-stripe prefix sums of parent column's null count }; /** diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index 033a2d9aff5..f7bd5ae86b8 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -759,6 +759,49 @@ void update_null_mask(cudf::detail::hostdevice_2dvector& chunks } } +/** + * @brief Compute the per-stripe prefix sum of null count, for each struct column in the current + * layer. + */ +void scan_null_counts(cudf::detail::hostdevice_2dvector const& chunks, + cudf::host_span> prefix_sums, + rmm::cuda_stream_view stream) +{ + auto const num_stripes = chunks.size().first; + if (num_stripes == 0) return; + + auto const num_columns = chunks.size().second; + std::vector>> prefix_sums_to_update; + for (auto col_idx = 0ul; col_idx < num_columns; ++col_idx) { + // Null counts sums are only needed for children of struct columns + if (chunks[0][col_idx].type_kind == STRUCT) { + prefix_sums_to_update.emplace_back(col_idx, prefix_sums[col_idx]); + } + } + auto const d_prefix_sums_to_update = + cudf::detail::make_device_uvector_async(prefix_sums_to_update, stream); + + thrust::for_each(rmm::exec_policy(stream), + d_prefix_sums_to_update.begin(), + d_prefix_sums_to_update.end(), + [chunks = cudf::detail::device_2dspan{chunks}] __device__( + auto const& idx_psums) { + auto const col_idx = idx_psums.first; + auto const psums = idx_psums.second; + + thrust::transform( + thrust::seq, + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(0) + psums.size(), + psums.begin(), + [&](auto stripe_idx) { return chunks[stripe_idx][col_idx].null_count; }); + + thrust::inclusive_scan(thrust::seq, psums.begin(), psums.end(), psums.begin()); + }); + // `prefix_sums_to_update` goes out of scope, copy has to be done before we return + stream.synchronize(); +} + void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector& chunks, size_t num_dicts, size_t skip_rows, @@ -817,8 +860,6 @@ void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector> out_buffers(_selected_columns.size()); std::vector schema_info; std::vector> lvl_stripe_data(_selected_columns.size()); + std::vector>> null_count_prefix_sums; table_metadata out_metadata; // There are no columns in the table @@ -1124,6 +1168,14 @@ table_with_metadata reader::impl::read(size_type skip_rows, // Logically view streams as columns std::vector stream_info; + null_count_prefix_sums.emplace_back(); + null_count_prefix_sums.back().reserve(_selected_columns[level].size()); + std::generate_n( + std::back_inserter(null_count_prefix_sums.back()), _selected_columns[level].size(), [&]() { + return cudf::detail::make_zeroed_device_uvector_async(total_num_stripes, + stream); + }); + // Tracker for eventually deallocating compressed and uncompressed data auto& stripe_data = lvl_stripe_data[level]; @@ -1207,10 +1259,12 @@ table_with_metadata reader::impl::read(size_type skip_rows, ? stripe_info->numberOfRows : _col_meta.num_child_rows_per_stripe[stripe_idx * num_columns + col_idx]; chunk.column_num_rows = (level == 0) ? num_rows : _col_meta.num_child_rows[col_idx]; - chunk.parent_validity_info.valid_map_base = - (level == 0) ? nullptr : _col_meta.parent_column_data[col_idx].valid_map_base; - chunk.parent_validity_info.null_count = - (level == 0) ? 0 : _col_meta.parent_column_data[col_idx].null_count; + chunk.parent_validity_info = + (level == 0) ? column_validity_info{} : _col_meta.parent_column_data[col_idx]; + chunk.parent_null_count_prefix_sums = + (level == 0) + ? nullptr + : null_count_prefix_sums[level - 1][_col_meta.parent_column_index[col_idx]].data(); chunk.encoding_kind = stripe_footer->columns[selected_columns[col_idx].id].kind; chunk.type_kind = _metadata->per_file_metadata[stripe_source_mapping.source_idx] .ff.types[selected_columns[col_idx].id] @@ -1336,6 +1390,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, // Extract information to process nested child columns if (nested_col.size()) { + scan_null_counts(chunks, null_count_prefix_sums[level], stream); row_groups.device_to_host(stream, true); aggregate_child_meta(chunks, row_groups, out_buffers[level], nested_col, level); } diff --git a/cpp/src/io/orc/reader_impl.hpp b/cpp/src/io/orc/reader_impl.hpp index 49c0c983992..7171b13d422 100644 --- a/cpp/src/io/orc/reader_impl.hpp +++ b/cpp/src/io/orc/reader_impl.hpp @@ -58,6 +58,7 @@ struct reader_column_meta { std::vector parent_column_data; // consists of parent column valid_map and null count + std::vector parent_column_index; std::vector child_start_row; // start row of child columns [stripe][column] std::vector diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 75ccd19d77b..41ee285ac25 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -1167,8 +1167,17 @@ __global__ void __launch_bounds__(block_size) // No present stream: all rows are valid s->vals.u32[t] = ~0; } - while (s->top.nulls_desc_row < s->chunk.num_rows) { - uint32_t nrows_max = min(s->chunk.num_rows - s->top.nulls_desc_row, blockDim.x * 32); + auto const prev_parent_null_count = + (s->chunk.parent_null_count_prefix_sums != nullptr && stripe > 0) + ? s->chunk.parent_null_count_prefix_sums[stripe - 1] + : 0; + auto const parent_null_count = + (s->chunk.parent_null_count_prefix_sums != nullptr) + ? s->chunk.parent_null_count_prefix_sums[stripe] - prev_parent_null_count + : 0; + auto const num_elems = s->chunk.num_rows - parent_null_count; + while (s->top.nulls_desc_row < num_elems) { + uint32_t nrows_max = min(num_elems - s->top.nulls_desc_row, blockDim.x * 32); uint32_t nrows; size_t row_in; @@ -1187,7 +1196,7 @@ __global__ void __launch_bounds__(block_size) } __syncthreads(); - row_in = s->chunk.start_row + s->top.nulls_desc_row; + row_in = s->chunk.start_row + s->top.nulls_desc_row - prev_parent_null_count; if (row_in + nrows > first_row && row_in < first_row + max_num_rows && s->chunk.valid_map_base != NULL) { int64_t dst_row = row_in - first_row; @@ -1251,7 +1260,7 @@ __global__ void __launch_bounds__(block_size) // Sum up the valid counts and infer null_count null_count = block_reduce(temp_storage.bk_storage).Sum(null_count); if (t == 0) { - chunks[chunk_id].null_count = null_count; + chunks[chunk_id].null_count = parent_null_count + null_count; chunks[chunk_id].skip_count = s->chunk.skip_count; } } else { diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 635332d5c24..33ce8427a71 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -844,7 +844,7 @@ def test_orc_string_stream_offset_issue(): # Data is generated using pyorc module -def generate_list_struct_buff(size=28000): +def generate_list_struct_buff(size=100_000): rd = random.Random(1) np.random.seed(seed=1) @@ -963,7 +963,7 @@ def generate_list_struct_buff(size=28000): ["lvl2_struct", "lvl1_struct"], ], ) -@pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 28000]) +@pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 100_000]) @pytest.mark.parametrize("use_index", [True, False]) def test_lists_struct_nests( columns, num_rows, use_index,