From 5b7a2dc7f3705879b2113992afd8a507f7aec9d6 Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 9 Aug 2021 15:34:28 -0700 Subject: [PATCH 1/5] working solution --- cpp/src/io/orc/orc_gpu.h | 1 + cpp/src/io/orc/reader_impl.cu | 66 +++++++++++++++++++++++++++------- cpp/src/io/orc/reader_impl.hpp | 2 ++ cpp/src/io/orc/stripe_data.cu | 15 +++++--- 4 files changed, 68 insertions(+), 16 deletions(-) diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index efc7b78cdb2..88522d7b97c 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -112,6 +112,7 @@ struct ColumnDesc { int32_t decimal_scale; // number of fractional decimal digits for decimal type int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns) column_validity_info parent_validity_info; // consists of parent column valid_map and null count + uint32_t* parent_null_count_psums; }; /** diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index 033a2d9aff5..234f4c5cb76 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -26,6 +26,7 @@ #include #include "orc.h" +#include #include #include #include @@ -759,15 +760,43 @@ void update_null_mask(cudf::detail::hostdevice_2dvector& chunks } } -void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector& chunks, - size_t num_dicts, - size_t skip_rows, - timezone_table_view tz_table, - cudf::detail::hostdevice_2dvector& row_groups, - size_t row_index_stride, - std::vector& out_buffers, - size_t level, - rmm::cuda_stream_view stream) +void calc_prefix_null_counts(cudf::detail::device_2dspan chunks, + cudf::host_span> psums, + rmm::cuda_stream_view stream) +{ + const auto num_stripes = chunks.size().first; + const auto num_columns = chunks.size().second; + + if (num_stripes == 0) return; + + for (auto col_idx = 0ul; col_idx < num_columns; ++col_idx) { + cudf::detail::device_single_thread( + [chunks, psums = psums[col_idx].data(), col_idx, num_stripes] __device__ { + if (chunks[0][col_idx].type_kind == STRUCT) { + thrust::for_each(thrust::seq, + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(0) + num_stripes, + [=] __device__(auto stripe_idx) { + psums[stripe_idx] = chunks[stripe_idx][col_idx].null_count; + if (stripe_idx > 0) { psums[stripe_idx] += psums[stripe_idx - 1]; } + }); + } + }, + stream); + } +} + +void reader::impl::decode_stream_data( + cudf::detail::hostdevice_2dvector& chunks, + size_t num_dicts, + size_t skip_rows, + timezone_table_view tz_table, + cudf::detail::hostdevice_2dvector& row_groups, + size_t row_index_stride, + std::vector& out_buffers, + size_t level, + cudf::host_span> null_count_psums, + rmm::cuda_stream_view stream) { const auto num_stripes = chunks.size().first; const auto num_columns = chunks.size().second; @@ -790,6 +819,8 @@ void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector 0) { // Update nullmasks for children if parent was a struct and had null mask update_null_mask(chunks, out_buffers, stream, _mr); @@ -817,8 +848,6 @@ void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector> out_buffers(_selected_columns.size()); std::vector schema_info; std::vector> lvl_stripe_data(_selected_columns.size()); + std::vector>> null_count_psums; table_metadata out_metadata; // There are no columns in the table @@ -1124,6 +1156,11 @@ table_with_metadata reader::impl::read(size_type skip_rows, // Logically view streams as columns std::vector stream_info; + null_count_psums.emplace_back(); + for (auto i = 0u; i < _selected_columns[level].size(); ++i) + null_count_psums.back().emplace_back( + cudf::detail::make_zeroed_device_uvector_async(total_num_stripes, stream)); + // Tracker for eventually deallocating compressed and uncompressed data auto& stripe_data = lvl_stripe_data[level]; @@ -1211,6 +1248,10 @@ table_with_metadata reader::impl::read(size_type skip_rows, (level == 0) ? nullptr : _col_meta.parent_column_data[col_idx].valid_map_base; chunk.parent_validity_info.null_count = (level == 0) ? 0 : _col_meta.parent_column_data[col_idx].null_count; + chunk.parent_null_count_psums = + (level == 0) + ? nullptr + : null_count_psums[level - 1][_col_meta.parent_column_index[col_idx]].data(); chunk.encoding_kind = stripe_footer->columns[selected_columns[col_idx].id].kind; chunk.type_kind = _metadata->per_file_metadata[stripe_source_mapping.source_idx] .ff.types[selected_columns[col_idx].id] @@ -1332,6 +1373,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, _metadata->get_row_index_stride(), out_buffers[level], level, + null_count_psums[level], stream); // Extract information to process nested child columns diff --git a/cpp/src/io/orc/reader_impl.hpp b/cpp/src/io/orc/reader_impl.hpp index 49c0c983992..8a2cac7465c 100644 --- a/cpp/src/io/orc/reader_impl.hpp +++ b/cpp/src/io/orc/reader_impl.hpp @@ -58,6 +58,7 @@ struct reader_column_meta { std::vector parent_column_data; // consists of parent column valid_map and null count + std::vector parent_column_index; std::vector child_start_row; // start row of child columns [stripe][column] std::vector @@ -149,6 +150,7 @@ class reader::impl { size_t row_index_stride, std::vector& out_buffers, size_t level, + cudf::host_span> null_count_psums, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 75ccd19d77b..5b2bb62deda 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -1167,8 +1167,15 @@ __global__ void __launch_bounds__(block_size) // No present stream: all rows are valid s->vals.u32[t] = ~0; } - while (s->top.nulls_desc_row < s->chunk.num_rows) { - uint32_t nrows_max = min(s->chunk.num_rows - s->top.nulls_desc_row, blockDim.x * 32); + auto const prev_parent_nulls = (s->chunk.parent_null_count_psums != nullptr && stripe > 0) + ? s->chunk.parent_null_count_psums[stripe - 1] + : 0; + auto const parent_nulls = (s->chunk.parent_null_count_psums != nullptr) + ? s->chunk.parent_null_count_psums[stripe] - prev_parent_nulls + : 0; + auto const num_elems = s->chunk.num_rows - parent_nulls; + while (s->top.nulls_desc_row < num_elems) { + uint32_t nrows_max = min(num_elems - s->top.nulls_desc_row, blockDim.x * 32); uint32_t nrows; size_t row_in; @@ -1187,7 +1194,7 @@ __global__ void __launch_bounds__(block_size) } __syncthreads(); - row_in = s->chunk.start_row + s->top.nulls_desc_row; + row_in = s->chunk.start_row + s->top.nulls_desc_row - prev_parent_nulls; if (row_in + nrows > first_row && row_in < first_row + max_num_rows && s->chunk.valid_map_base != NULL) { int64_t dst_row = row_in - first_row; @@ -1251,7 +1258,7 @@ __global__ void __launch_bounds__(block_size) // Sum up the valid counts and infer null_count null_count = block_reduce(temp_storage.bk_storage).Sum(null_count); if (t == 0) { - chunks[chunk_id].null_count = null_count; + chunks[chunk_id].null_count = parent_nulls + null_count; chunks[chunk_id].skip_count = s->chunk.skip_count; } } else { From d43e0d4f7b7e910543ecfaed85c3433ac7ac91d8 Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 9 Aug 2021 16:39:27 -0700 Subject: [PATCH 2/5] thrustify prefix sum --- cpp/src/io/orc/reader_impl.cu | 57 +++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index 234f4c5cb76..f575e8c3d4e 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -760,30 +760,47 @@ void update_null_mask(cudf::detail::hostdevice_2dvector& chunks } } -void calc_prefix_null_counts(cudf::detail::device_2dspan chunks, - cudf::host_span> psums, - rmm::cuda_stream_view stream) +/** + * @brief Compute the per-stripe prefix sum of null count, for each struct column in the current + * layer. + */ +void prefix_null_counts(cudf::detail::hostdevice_2dvector const& chunks, + cudf::host_span> prefix_sums, + rmm::cuda_stream_view stream) { - const auto num_stripes = chunks.size().first; - const auto num_columns = chunks.size().second; - + auto const num_stripes = chunks.size().first; if (num_stripes == 0) return; + auto const num_columns = chunks.size().second; + std::vector>> prefix_sums_to_update; for (auto col_idx = 0ul; col_idx < num_columns; ++col_idx) { - cudf::detail::device_single_thread( - [chunks, psums = psums[col_idx].data(), col_idx, num_stripes] __device__ { - if (chunks[0][col_idx].type_kind == STRUCT) { - thrust::for_each(thrust::seq, - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(0) + num_stripes, - [=] __device__(auto stripe_idx) { - psums[stripe_idx] = chunks[stripe_idx][col_idx].null_count; - if (stripe_idx > 0) { psums[stripe_idx] += psums[stripe_idx - 1]; } - }); - } - }, - stream); + // Null counts sums are only needed for children of struct columns + if (chunks[0][col_idx].type_kind == STRUCT) { + prefix_sums_to_update.emplace_back(col_idx, prefix_sums[col_idx]); + } } + auto const d_prefix_sums_to_update = + cudf::detail::make_device_uvector_async(prefix_sums_to_update, stream); + + thrust::for_each(rmm::exec_policy(stream), + d_prefix_sums_to_update.begin(), + d_prefix_sums_to_update.end(), + [chunks = cudf::detail::device_2dspan{chunks}] __device__( + auto const& idx_psums) { + auto const col_idx = idx_psums.first; + auto const psums = idx_psums.second; + + thrust::transform( + thrust::seq, + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(0) + psums.size(), + psums.begin(), + [&](auto stripe_idx) { return chunks[stripe_idx][col_idx].null_count; }); + + thrust::inclusive_scan(thrust::seq, psums.begin(), psums.end(), psums.begin()); + }); + // `prefix_sums_to_update` goes out of scope, copy has to be done before we return + stream.synchronize(); } void reader::impl::decode_stream_data( @@ -819,7 +836,7 @@ void reader::impl::decode_stream_data( gpu::DecodeNullsAndStringDictionaries( chunks.base_device_ptr(), global_dict.data(), num_columns, num_stripes, skip_rows, stream); - calc_prefix_null_counts(chunks, null_count_psums, stream); + prefix_null_counts(chunks, null_count_psums, stream); if (level > 0) { // Update nullmasks for children if parent was a struct and had null mask From 01515bb70a04a97564caee4535bc89d5db615fa9 Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 9 Aug 2021 17:08:43 -0700 Subject: [PATCH 3/5] test; clean up --- cpp/src/io/orc/orc_gpu.h | 2 +- cpp/src/io/orc/reader_impl.cu | 9 +++------ cpp/src/io/orc/stripe_data.cu | 20 +++++++++++--------- python/cudf/cudf/tests/test_orc.py | 4 ++-- 4 files changed, 17 insertions(+), 18 deletions(-) diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index 88522d7b97c..c866afd5324 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -112,7 +112,7 @@ struct ColumnDesc { int32_t decimal_scale; // number of fractional decimal digits for decimal type int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns) column_validity_info parent_validity_info; // consists of parent column valid_map and null count - uint32_t* parent_null_count_psums; + uint32_t* parent_null_count_prefix_sums; // per-stripe prefix sums of parent column's null count }; /** diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index f575e8c3d4e..bcecb6eec12 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -26,7 +26,6 @@ #include #include "orc.h" -#include #include #include #include @@ -1261,11 +1260,9 @@ table_with_metadata reader::impl::read(size_type skip_rows, ? stripe_info->numberOfRows : _col_meta.num_child_rows_per_stripe[stripe_idx * num_columns + col_idx]; chunk.column_num_rows = (level == 0) ? num_rows : _col_meta.num_child_rows[col_idx]; - chunk.parent_validity_info.valid_map_base = - (level == 0) ? nullptr : _col_meta.parent_column_data[col_idx].valid_map_base; - chunk.parent_validity_info.null_count = - (level == 0) ? 0 : _col_meta.parent_column_data[col_idx].null_count; - chunk.parent_null_count_psums = + chunk.parent_validity_info = + (level == 0) ? column_validity_info{} : _col_meta.parent_column_data[col_idx]; + chunk.parent_null_count_prefix_sums = (level == 0) ? nullptr : null_count_psums[level - 1][_col_meta.parent_column_index[col_idx]].data(); diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 5b2bb62deda..41ee285ac25 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -1167,13 +1167,15 @@ __global__ void __launch_bounds__(block_size) // No present stream: all rows are valid s->vals.u32[t] = ~0; } - auto const prev_parent_nulls = (s->chunk.parent_null_count_psums != nullptr && stripe > 0) - ? s->chunk.parent_null_count_psums[stripe - 1] - : 0; - auto const parent_nulls = (s->chunk.parent_null_count_psums != nullptr) - ? s->chunk.parent_null_count_psums[stripe] - prev_parent_nulls - : 0; - auto const num_elems = s->chunk.num_rows - parent_nulls; + auto const prev_parent_null_count = + (s->chunk.parent_null_count_prefix_sums != nullptr && stripe > 0) + ? s->chunk.parent_null_count_prefix_sums[stripe - 1] + : 0; + auto const parent_null_count = + (s->chunk.parent_null_count_prefix_sums != nullptr) + ? s->chunk.parent_null_count_prefix_sums[stripe] - prev_parent_null_count + : 0; + auto const num_elems = s->chunk.num_rows - parent_null_count; while (s->top.nulls_desc_row < num_elems) { uint32_t nrows_max = min(num_elems - s->top.nulls_desc_row, blockDim.x * 32); uint32_t nrows; @@ -1194,7 +1196,7 @@ __global__ void __launch_bounds__(block_size) } __syncthreads(); - row_in = s->chunk.start_row + s->top.nulls_desc_row - prev_parent_nulls; + row_in = s->chunk.start_row + s->top.nulls_desc_row - prev_parent_null_count; if (row_in + nrows > first_row && row_in < first_row + max_num_rows && s->chunk.valid_map_base != NULL) { int64_t dst_row = row_in - first_row; @@ -1258,7 +1260,7 @@ __global__ void __launch_bounds__(block_size) // Sum up the valid counts and infer null_count null_count = block_reduce(temp_storage.bk_storage).Sum(null_count); if (t == 0) { - chunks[chunk_id].null_count = parent_nulls + null_count; + chunks[chunk_id].null_count = parent_null_count + null_count; chunks[chunk_id].skip_count = s->chunk.skip_count; } } else { diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 635332d5c24..c5501d9fcd8 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -844,7 +844,7 @@ def test_orc_string_stream_offset_issue(): # Data is generated using pyorc module -def generate_list_struct_buff(size=28000): +def generate_list_struct_buff(size=100_000): rd = random.Random(1) np.random.seed(seed=1) @@ -963,7 +963,7 @@ def generate_list_struct_buff(size=28000): ["lvl2_struct", "lvl1_struct"], ], ) -@pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 28000]) +@pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 28000, 100_000]) @pytest.mark.parametrize("use_index", [True, False]) def test_lists_struct_nests( columns, num_rows, use_index, From f5cbb806933392b22829cee8d064f60a5d18b140 Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 9 Aug 2021 17:40:34 -0700 Subject: [PATCH 4/5] move scan_null_counts out of decode --- cpp/src/io/orc/reader_impl.cu | 45 +++++++++++++++++----------------- cpp/src/io/orc/reader_impl.hpp | 1 - 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index bcecb6eec12..f7bd5ae86b8 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -763,9 +763,9 @@ void update_null_mask(cudf::detail::hostdevice_2dvector& chunks * @brief Compute the per-stripe prefix sum of null count, for each struct column in the current * layer. */ -void prefix_null_counts(cudf::detail::hostdevice_2dvector const& chunks, - cudf::host_span> prefix_sums, - rmm::cuda_stream_view stream) +void scan_null_counts(cudf::detail::hostdevice_2dvector const& chunks, + cudf::host_span> prefix_sums, + rmm::cuda_stream_view stream) { auto const num_stripes = chunks.size().first; if (num_stripes == 0) return; @@ -802,17 +802,15 @@ void prefix_null_counts(cudf::detail::hostdevice_2dvector const stream.synchronize(); } -void reader::impl::decode_stream_data( - cudf::detail::hostdevice_2dvector& chunks, - size_t num_dicts, - size_t skip_rows, - timezone_table_view tz_table, - cudf::detail::hostdevice_2dvector& row_groups, - size_t row_index_stride, - std::vector& out_buffers, - size_t level, - cudf::host_span> null_count_psums, - rmm::cuda_stream_view stream) +void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector& chunks, + size_t num_dicts, + size_t skip_rows, + timezone_table_view tz_table, + cudf::detail::hostdevice_2dvector& row_groups, + size_t row_index_stride, + std::vector& out_buffers, + size_t level, + rmm::cuda_stream_view stream) { const auto num_stripes = chunks.size().first; const auto num_columns = chunks.size().second; @@ -835,8 +833,6 @@ void reader::impl::decode_stream_data( gpu::DecodeNullsAndStringDictionaries( chunks.base_device_ptr(), global_dict.data(), num_columns, num_stripes, skip_rows, stream); - prefix_null_counts(chunks, null_count_psums, stream); - if (level > 0) { // Update nullmasks for children if parent was a struct and had null mask update_null_mask(chunks, out_buffers, stream, _mr); @@ -1089,7 +1085,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, std::vector> out_buffers(_selected_columns.size()); std::vector schema_info; std::vector> lvl_stripe_data(_selected_columns.size()); - std::vector>> null_count_psums; + std::vector>> null_count_prefix_sums; table_metadata out_metadata; // There are no columns in the table @@ -1172,10 +1168,13 @@ table_with_metadata reader::impl::read(size_type skip_rows, // Logically view streams as columns std::vector stream_info; - null_count_psums.emplace_back(); - for (auto i = 0u; i < _selected_columns[level].size(); ++i) - null_count_psums.back().emplace_back( - cudf::detail::make_zeroed_device_uvector_async(total_num_stripes, stream)); + null_count_prefix_sums.emplace_back(); + null_count_prefix_sums.back().reserve(_selected_columns[level].size()); + std::generate_n( + std::back_inserter(null_count_prefix_sums.back()), _selected_columns[level].size(), [&]() { + return cudf::detail::make_zeroed_device_uvector_async(total_num_stripes, + stream); + }); // Tracker for eventually deallocating compressed and uncompressed data auto& stripe_data = lvl_stripe_data[level]; @@ -1265,7 +1264,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, chunk.parent_null_count_prefix_sums = (level == 0) ? nullptr - : null_count_psums[level - 1][_col_meta.parent_column_index[col_idx]].data(); + : null_count_prefix_sums[level - 1][_col_meta.parent_column_index[col_idx]].data(); chunk.encoding_kind = stripe_footer->columns[selected_columns[col_idx].id].kind; chunk.type_kind = _metadata->per_file_metadata[stripe_source_mapping.source_idx] .ff.types[selected_columns[col_idx].id] @@ -1387,11 +1386,11 @@ table_with_metadata reader::impl::read(size_type skip_rows, _metadata->get_row_index_stride(), out_buffers[level], level, - null_count_psums[level], stream); // Extract information to process nested child columns if (nested_col.size()) { + scan_null_counts(chunks, null_count_prefix_sums[level], stream); row_groups.device_to_host(stream, true); aggregate_child_meta(chunks, row_groups, out_buffers[level], nested_col, level); } diff --git a/cpp/src/io/orc/reader_impl.hpp b/cpp/src/io/orc/reader_impl.hpp index 8a2cac7465c..7171b13d422 100644 --- a/cpp/src/io/orc/reader_impl.hpp +++ b/cpp/src/io/orc/reader_impl.hpp @@ -150,7 +150,6 @@ class reader::impl { size_t row_index_stride, std::vector& out_buffers, size_t level, - cudf::host_span> null_count_psums, rmm::cuda_stream_view stream); /** From 9a854be047c3da24a9fdfb372888dba4107981c9 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Mon, 16 Aug 2021 13:17:38 -0700 Subject: [PATCH 5/5] remove redundant test case --- python/cudf/cudf/tests/test_orc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index c5501d9fcd8..33ce8427a71 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -963,7 +963,7 @@ def generate_list_struct_buff(size=100_000): ["lvl2_struct", "lvl1_struct"], ], ) -@pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 28000, 100_000]) +@pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 100_000]) @pytest.mark.parametrize("use_index", [True, False]) def test_lists_struct_nests( columns, num_rows, use_index,