From b345be8f4b1a9096c77e6c1a58fd893f315e0b50 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Tue, 5 Apr 2022 19:28:34 +0000 Subject: [PATCH 01/17] first pass at splitting up stats --- cpp/src/io/orc/writer_impl.cu | 7 ++++++- cpp/src/io/orc/writer_impl.hpp | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 779d0390751..7a468c951a8 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -19,8 +19,12 @@ * @brief cuDF-IO ORC writer class implementation */ +#include "io/statistics/statistics.cuh" +#include "thrust/iterator/counting_iterator.h" #include "writer_impl.hpp" +#include +#include #include #include @@ -54,8 +58,9 @@ #include #include +#include +#include #include - namespace cudf { namespace io { namespace detail { diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 5f981793762..e640a245eca 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -447,6 +447,8 @@ class writer::impl { // statistics data saved between calls to write before a close writes out the statistics persisted_statistics persisted_stripe_statistics; + persisted_statistics persisted_stripe_statistics; + std::vector buffer_; std::unique_ptr out_sink_; }; From dabd18edd3a1527f69090e2dd0cc05f9bd0afb8f Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Wed, 6 Apr 2022 03:15:02 +0000 Subject: [PATCH 02/17] cleanup --- cpp/src/io/orc/writer_impl.cu | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 7a468c951a8..430b4350fb1 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -19,12 +19,8 @@ * @brief cuDF-IO ORC writer class implementation */ -#include "io/statistics/statistics.cuh" -#include "thrust/iterator/counting_iterator.h" #include "writer_impl.hpp" -#include -#include #include #include @@ -58,8 +54,6 @@ #include #include -#include -#include #include namespace cudf { namespace io { From 2ebf073c054026919e26a639185b91e6cc64a1fc Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Thu, 14 Apr 2022 00:01:21 +0000 Subject: [PATCH 03/17] Fixing some dangling pointer issues --- cpp/src/io/orc/writer_impl.cu | 22 ++++++++++++---------- cpp/src/io/orc/writer_impl.hpp | 4 +++- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 430b4350fb1..0a784099f6d 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1215,7 +1215,18 @@ writer::impl::intermediate_statistics writer::impl::gather_statistic_blobs( return rowgroup_blobs; }(); + hostdevice_vector stripe_data = + allocate_and_encode_blobs(stripe_merge, stripe_chunks, num_stripe_blobs, stream); + + std::vector stripe_blobs(num_stripe_blobs); + for (size_t i = 0; i < num_stripe_blobs; i++) { + auto const stat_begin = stripe_data.host_ptr(stripe_stat_merge[i].start_chunk); + auto const stat_end = stat_begin + stripe_stat_merge[i].num_chunks; + stripe_blobs[i].assign(stat_begin, stat_end); + } + return {std::move(rowgroup_blobs), + std::move(stripe_blobs), std::move(stripe_chunks), std::move(stripe_merge), std::move(col_stats_dtypes), @@ -1285,15 +1296,6 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs( hostdevice_vector blobs = allocate_and_encode_blobs(stats_merge, stat_chunks, num_blobs, stream); - auto stripe_stat_merge = stats_merge.host_ptr(); - - std::vector stripe_blobs(num_stripe_blobs); - for (size_t i = 0; i < num_stripe_blobs; i++) { - auto const stat_begin = blobs.host_ptr(stripe_stat_merge[i].start_chunk); - auto const stat_end = stat_begin + stripe_stat_merge[i].num_chunks; - stripe_blobs[i].assign(stat_begin, stat_end); - } - std::vector file_blobs(single_write_mode ? num_file_blobs : 0); if (single_write_mode) { auto file_stat_merge = stats_merge.host_ptr(num_stripe_blobs); @@ -1304,7 +1306,7 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs( } } - return {std::move(stripe_blobs), std::move(file_blobs)}; + return {std::move(file_blobs)}; } void writer::impl::write_index_stream(int32_t stripe_id, diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index e640a245eca..6fb05d78b96 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -294,11 +294,13 @@ class writer::impl { explicit intermediate_statistics(rmm::cuda_stream_view stream) : stripe_stat_chunks(0, stream){}; intermediate_statistics(std::vector rb, + std::vector sb, rmm::device_uvector sc, hostdevice_vector smg, std::vector sdt, std::vector sct) : rowgroup_blobs(std::move(rb)), + stripe_blobs(std::move(sb)), stripe_stat_chunks(std::move(sc)), stripe_stat_merge(std::move(smg)), stats_dtypes(std::move(sdt)), @@ -306,6 +308,7 @@ class writer::impl { // blobs for the rowgroups and stripes. Not persisted std::vector rowgroup_blobs; + std::vector stripe_blobs; rmm::device_uvector stripe_stat_chunks; hostdevice_vector stripe_stat_merge; @@ -339,7 +342,6 @@ class writer::impl { * */ struct encoded_footer_statistics { - std::vector stripe_level; std::vector file_level; }; From 20771236d91e858cfecc8cdd81bdd18a12f27f77 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Wed, 20 Apr 2022 14:53:06 +0000 Subject: [PATCH 04/17] first pass at chunked writing stastistics --- cpp/src/io/orc/writer_impl.cu | 205 +++++++++++++++++++++-------- cpp/src/io/orc/writer_impl.hpp | 7 + python/cudf/cudf/tests/test_orc.py | 86 ++++++++++++ 3 files changed, 243 insertions(+), 55 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 0a784099f6d..b77db2a8340 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1243,8 +1243,7 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs( auto const num_stripe_blobs = thrust::reduce(stripe_size_iter, stripe_size_iter + per_chunk_stats.stripe_stat_merge.size()); auto const num_file_blobs = num_columns; - auto const num_blobs = single_write_mode ? static_cast(num_stripe_blobs + num_file_blobs) - : static_cast(num_stripe_blobs); + auto const num_blobs = static_cast(num_stripe_blobs + num_file_blobs); if (num_stripe_blobs == 0) { return {}; } @@ -1252,58 +1251,73 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs( rmm::device_uvector stat_chunks(num_blobs, stream); hostdevice_vector stats_merge(num_blobs, stream); - size_t chunk_offset = 0; - size_t merge_offset = 0; + // we need to merge the stat arrays from the persisted data. + // this needs to be done carefully because each array can contain + // a different number of stripes and stripes from each column must be + // located next to each other. We know the total number of stripes and + // we know the size of each array. The number of stripes per column in a chunk array can + // be calculated by dividing the number of chunks by the number of columns. + // That many chunks need to be copied at a time to the proper destination. + size_t offset = 0; for (size_t i = 0; i < per_chunk_stats.stripe_stat_chunks.size(); ++i) { - auto chunk_bytes = per_chunk_stats.stripe_stat_chunks[i].size() * sizeof(statistics_chunk); - auto merge_bytes = per_chunk_stats.stripe_stat_merge[i].size() * sizeof(statistics_merge_group); - cudaMemcpyAsync(stat_chunks.data() + chunk_offset, - per_chunk_stats.stripe_stat_chunks[i].data(), - chunk_bytes, - cudaMemcpyDeviceToDevice, - stream); - cudaMemcpyAsync(stats_merge.device_ptr() + merge_offset, - per_chunk_stats.stripe_stat_merge[i].device_ptr(), - merge_bytes, - cudaMemcpyDeviceToDevice, - stream); - chunk_offset += per_chunk_stats.stripe_stat_chunks[i].size(); - merge_offset += per_chunk_stats.stripe_stat_merge[i].size(); + auto const stripes_per_col = per_chunk_stats.stripe_stat_chunks[i].size() / num_columns; + + auto const chunk_bytes = stripes_per_col * sizeof(statistics_chunk); + auto const merge_bytes = stripes_per_col * sizeof(statistics_merge_group); + for (size_t col = 0; col < num_columns; ++col) { + cudaMemcpyAsync(stat_chunks.data() + (num_stripes * col) + offset, + per_chunk_stats.stripe_stat_chunks[i].data() + col * stripes_per_col, + chunk_bytes, + cudaMemcpyDeviceToDevice, + stream); + cudaMemcpyAsync(stats_merge.device_ptr() + (num_stripes * col) + offset, + per_chunk_stats.stripe_stat_merge[i].device_ptr() + col * stripes_per_col, + merge_bytes, + cudaMemcpyDeviceToDevice, + stream); + } + offset += stripes_per_col; } - if (single_write_mode) { - std::vector file_stats_merge(num_file_blobs); - for (auto i = 0u; i < num_file_blobs; ++i) { - auto col_stats = &file_stats_merge[i]; - col_stats->col_dtype = per_chunk_stats.col_types[i]; - col_stats->stats_dtype = per_chunk_stats.stats_dtypes[i]; - col_stats->start_chunk = static_cast(i * num_stripes); - col_stats->num_chunks = static_cast(num_stripes); - } + std::vector file_stats_merge(num_file_blobs); + for (auto i = 0u; i < num_file_blobs; ++i) { + auto col_stats = &file_stats_merge[i]; + col_stats->col_type = per_chunk_stats.col_types[i]; + col_stats->stats_dtype = per_chunk_stats.stats_dtypes[i]; + col_stats->start_chunk = static_cast(i * num_stripes); + col_stats->num_chunks = static_cast(num_stripes); + } - auto d_file_stats_merge = stats_merge.device_ptr(num_stripe_blobs); - cudaMemcpyAsync(d_file_stats_merge, - file_stats_merge.data(), - num_file_blobs * sizeof(statistics_merge_group), - cudaMemcpyHostToDevice, - stream); + auto d_file_stats_merge = stats_merge.device_ptr(num_stripe_blobs); + cudaMemcpyAsync(d_file_stats_merge, + file_stats_merge.data(), + num_file_blobs * sizeof(statistics_merge_group), + cudaMemcpyHostToDevice, + stream); - auto file_stat_chunks = stat_chunks.data() + num_stripe_blobs; - detail::merge_group_statistics( - file_stat_chunks, stat_chunks.data(), d_file_stats_merge, num_file_blobs, stream); - } + auto file_stat_chunks = stat_chunks.data() + num_stripe_blobs; + detail::merge_group_statistics( + file_stat_chunks, stat_chunks.data(), d_file_stats_merge, num_file_blobs, stream); hostdevice_vector blobs = allocate_and_encode_blobs(stats_merge, stat_chunks, num_blobs, stream); - std::vector file_blobs(single_write_mode ? num_file_blobs : 0); - if (single_write_mode) { - auto file_stat_merge = stats_merge.host_ptr(num_stripe_blobs); - for (auto i = 0u; i < num_file_blobs; i++) { - auto const stat_begin = blobs.host_ptr(file_stat_merge[i].start_chunk); - auto const stat_end = stat_begin + file_stat_merge[i].num_chunks; - file_blobs[i].assign(stat_begin, stat_end); - } + auto stripe_stat_merge = stats_merge.host_ptr(); + + std::vector stripe_blobs(num_stripe_blobs); + for (size_t i = 0; i < num_stripe_blobs; i++) { + auto const stat_begin = blobs.host_ptr(stripe_stat_merge[i].start_chunk); + auto const stat_end = stat_begin + stripe_stat_merge[i].num_chunks; + stripe_blobs[i].assign(stat_begin, stat_end); + } + + std::vector file_blobs(num_file_blobs); + auto file_stat_merge = stats_merge.host_ptr(num_stripe_blobs); + for (auto i = 0u; i < num_file_blobs; i++) { + auto const stat_begin = blobs.host_ptr(file_stat_merge[i].start_chunk); + auto const stat_end = stat_begin + file_stat_merge[i].num_chunks; + file_blobs[i].assign(stat_begin, stat_end); +>>>>>>> first pass at chunked writing stastistics } return {std::move(file_blobs)}; @@ -1938,6 +1952,93 @@ string_dictionaries allocate_dictionaries(orc_table_view const& orc_table, std::move(is_dict_enabled)}; } +struct string_length_functor { + __device__ inline size_type operator()(int const i) const + { + if (i >= num_chunks * 2) return 0; + + auto const min = i % 2 == 0; + auto const idx = i / 2; + auto& str_val = + min ? stripe_stat_chunks[idx].min_value.str_val : stripe_stat_chunks[idx].max_value.str_val; + auto const str = stripe_stat_merge[idx].stats_dtype == dtype_string; + return str ? str_val.length : 0; + } + + int num_chunks; + statistics_chunk* stripe_stat_chunks; + statistics_merge_group* stripe_stat_merge; +}; + +struct string_copy_functor { + __device__ inline void operator()(int const i) const + { + auto const min = i % 2 == 0; + auto const idx = i / 2; + if (stripe_stat_merge[idx].stats_dtype == dtype_string) { + auto& str_val = + min ? stripe_stat_chunks[idx].min_value.str_val : stripe_stat_chunks[idx].max_value.str_val; + + auto dst = &string_pool[offsets[i]]; + memcpy(dst, str_val.ptr, str_val.length /*, cudaMemcpyDeviceToDevice, stream*/); + str_val.ptr = dst; + } + } + + char* string_pool; + size_type* offsets; + rmm::cuda_stream_view stream; + statistics_chunk* stripe_stat_chunks; + statistics_merge_group* stripe_stat_merge; +}; + +void writer::impl::persisted_statistics::persist(int num_table_rows, + bool single_write_mode, + intermediate_statistics& intermediate_stats, + rmm::cuda_stream_view stream) +{ + if (not single_write_mode) { + // persist the strings in the chunks into a string pool and update pointers + auto const num_chunks = static_cast(intermediate_stats.stripe_stat_chunks.size()); + // min offset and max offset + 1 for total size + rmm::device_uvector offsets((num_chunks * 2) + 1, stream); + + auto iter = cudf::detail::make_counting_transform_iterator( + 0, + string_length_functor{num_chunks, + intermediate_stats.stripe_stat_chunks.data(), + intermediate_stats.stripe_stat_merge.device_ptr()}); + thrust::exclusive_scan( + rmm::exec_policy(stream), iter, iter + (num_chunks * 2) + 1, offsets.begin()); + + // pull size back to host + auto const total_string_pool_size = offsets.element(num_chunks * 2, stream); + if (total_string_pool_size > 0) { + rmm::device_uvector string_pool(total_string_pool_size, stream); + + // offsets describes where in the string pool each string goes. Going with the simple + // approach for now, but it is possible something fancier with breaking up each thread into + // copying x bytes instead of a single string is the better method since we are dealing in + // min/max strings they almost certainly will not be uniform length. + thrust::for_each(rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_chunks * 2), + string_copy_functor{string_pool.data(), + offsets.data(), + stream, + intermediate_stats.stripe_stat_chunks.data(), + intermediate_stats.stripe_stat_merge.device_ptr()}); + string_pools.emplace_back(std::move(string_pool)); + } + } + + stripe_stat_chunks.emplace_back(std::move(intermediate_stats.stripe_stat_chunks)); + stripe_stat_merge.emplace_back(std::move(intermediate_stats.stripe_stat_merge)); + stats_dtypes = std::move(intermediate_stats.stats_dtypes); + col_types = std::move(intermediate_stats.col_types); + num_rows = num_table_rows; +} + void writer::impl::write(table_view const& table) { CUDF_EXPECTS(not closed, "Data has already been flushed to out and closed"); @@ -2073,15 +2174,9 @@ void writer::impl::write(table_view const& table) auto intermediate_stats = gather_statistic_blobs(stats_freq_, orc_table, segmentation); - if (intermediate_stats.stripe_stat_chunks.size() > 0) { - persisted_stripe_statistics.stripe_stat_chunks.emplace_back( - std::move(intermediate_stats.stripe_stat_chunks)); - persisted_stripe_statistics.stripe_stat_merge.emplace_back( - std::move(intermediate_stats.stripe_stat_merge)); - persisted_stripe_statistics.stats_dtypes = std::move(intermediate_stats.stats_dtypes); - persisted_stripe_statistics.col_types = std::move(intermediate_stats.col_types); - persisted_stripe_statistics.num_rows = orc_table.num_rows(); - } + if (intermediate_stats.stripe_stat_chunks.size() > 0) + persisted_stripe_statistics.persist( + orc_table.num_rows(), single_write_mode, intermediate_stats, stream); // Write stripes std::vector> write_tasks; @@ -2203,7 +2298,7 @@ void writer::impl::close() auto const statistics = finish_statistic_blobs(ff.stripes.size(), persisted_stripe_statistics); // File-level statistics - if (single_write_mode and not statistics.file_level.empty()) { + if (not statistics.file_level.empty()) { buffer_.resize(0); pbw_.put_uint(encode_field_number(1)); pbw_.put_uint(persisted_stripe_statistics.num_rows); diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 6fb05d78b96..bb958f0ea94 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -325,13 +325,20 @@ class writer::impl { { stripe_stat_chunks.clear(); stripe_stat_merge.clear(); + string_pools.clear(); stats_dtypes.clear(); col_types.clear(); num_rows = 0; } + void persist(int num_table_rows, + bool single_write_mode, + intermediate_statistics& intermediate_stats, + rmm::cuda_stream_view stream); + std::vector> stripe_stat_chunks; std::vector> stripe_stat_merge; + std::vector> string_pools; std::vector stats_dtypes; std::vector col_types; int num_rows = 0; diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index c3969bf6c14..e56e332a749 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -718,6 +718,92 @@ def test_orc_write_statistics(tmpdir, datadir, nrows, stats_freq): assert stats_num_vals == actual_num_vals +@pytest.mark.parametrize("stats_freq", ["STRIPE", "ROWGROUP"]) +@pytest.mark.parametrize("nrows", [2, 100, 6000000]) +def test_orc_chunked_write_statistics(tmpdir, datadir, nrows, stats_freq): + supported_stat_types = supported_numpy_dtypes + ["str"] + # Can't write random bool columns until issue #6763 is fixed + if nrows == 6000000: + supported_stat_types.remove("bool") + + gdf_fname = tmpdir.join("chunked_stats.orc") + writer = ORCWriter(gdf_fname) + + # Make a dataframe + gdf = cudf.DataFrame( + { + "col_" + + str(dtype): gen_rand_series( + dtype, int(nrows / 2), has_nulls=True + ) + for dtype in supported_stat_types + } + ) + + pdf1 = gdf.to_pandas() + writer.write_table(gdf) + gdf = cudf.DataFrame( + { + "col_" + + str(dtype): gen_rand_series( + dtype, int(nrows / 2), has_nulls=True + ) + for dtype in supported_stat_types + } + ) + pdf2 = gdf.to_pandas() + writer.write_table(gdf) + writer.close() + + # pandas is unable to handle min/max of string col with nulls + expect = cudf.DataFrame(pd.concat([pdf1, pdf2]).reset_index(drop=True)) + + # Read back written ORC's statistics + orc_file = pa.orc.ORCFile(gdf_fname) + ( + file_stats, + stripes_stats, + ) = cudf.io.orc.read_orc_statistics([gdf_fname]) + + # check file stats + for col in expect: + if "minimum" in file_stats[0][col]: + stats_min = file_stats[0][col]["minimum"] + actual_min = expect[col].min() + assert normalized_equals(actual_min, stats_min) + if "maximum" in file_stats[0][col]: + stats_max = file_stats[0][col]["maximum"] + actual_max = expect[col].max() + assert normalized_equals(actual_max, stats_max) + if "number_of_values" in file_stats[0][col]: + stats_num_vals = file_stats[0][col]["number_of_values"] + actual_num_vals = expect[col].count() + assert stats_num_vals == actual_num_vals + + # compare stripe statistics with actual min/max + for stripe_idx in range(0, orc_file.nstripes): + stripe = orc_file.read_stripe(stripe_idx) + # pandas is unable to handle min/max of string col with nulls + stripe_df = cudf.DataFrame(stripe.to_pandas()) + for col in stripe_df: + if "minimum" in stripes_stats[stripe_idx][col]: + actual_min = stripe_df[col].min() + stats_min = stripes_stats[stripe_idx][col]["minimum"] + assert normalized_equals(actual_min, stats_min) + + if "maximum" in stripes_stats[stripe_idx][col]: + actual_max = stripe_df[col].max() + stats_max = stripes_stats[stripe_idx][col]["maximum"] + assert normalized_equals(actual_max, stats_max) + + if "number_of_values" in stripes_stats[stripe_idx][col]: + stats_num_vals = stripes_stats[stripe_idx][col][ + "number_of_values" + ] + actual_num_vals = stripe_df[col].count() + assert stats_num_vals == actual_num_vals + + @pytest.mark.parametrize("nrows", [1, 100, 6000000]) def test_orc_write_bool_statistics(tmpdir, datadir, nrows): # Make a dataframe From b53432b1f5ecc06bfc9f65db96549052b7b843a6 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Wed, 20 Apr 2022 15:47:54 +0000 Subject: [PATCH 05/17] updates from review comments --- cpp/src/io/orc/writer_impl.cu | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index b77db2a8340..16d4d5c9238 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1279,14 +1279,15 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs( offset += stripes_per_col; } - std::vector file_stats_merge(num_file_blobs); - for (auto i = 0u; i < num_file_blobs; ++i) { - auto col_stats = &file_stats_merge[i]; - col_stats->col_type = per_chunk_stats.col_types[i]; - col_stats->stats_dtype = per_chunk_stats.stats_dtypes[i]; - col_stats->start_chunk = static_cast(i * num_stripes); - col_stats->num_chunks = static_cast(num_stripes); - } + if (single_write_mode) { + std::vector file_stats_merge(num_file_blobs); + for (auto i = 0u; i < num_file_blobs; ++i) { + auto col_stats = &file_stats_merge[i]; + col_stats->col_dtype = per_chunk_stats.col_types[i]; + col_stats->stats_dtype = per_chunk_stats.stats_dtypes[i]; + col_stats->start_chunk = static_cast(i * num_stripes); + col_stats->num_chunks = static_cast(num_stripes); + } auto d_file_stats_merge = stats_merge.device_ptr(num_stripe_blobs); cudaMemcpyAsync(d_file_stats_merge, From 4cc292e7729870cef70957eb55081e8b0d36cfa6 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Thu, 21 Apr 2022 02:16:12 +0000 Subject: [PATCH 06/17] updating test and fixing merge issue --- cpp/src/io/orc/writer_impl.cu | 17 ++++++++--------- python/cudf/cudf/testing/_utils.py | 6 +++++- python/cudf/cudf/tests/test_orc.py | 6 ++++-- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 16d4d5c9238..97fade4db8b 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1279,15 +1279,14 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs( offset += stripes_per_col; } - if (single_write_mode) { - std::vector file_stats_merge(num_file_blobs); - for (auto i = 0u; i < num_file_blobs; ++i) { - auto col_stats = &file_stats_merge[i]; - col_stats->col_dtype = per_chunk_stats.col_types[i]; - col_stats->stats_dtype = per_chunk_stats.stats_dtypes[i]; - col_stats->start_chunk = static_cast(i * num_stripes); - col_stats->num_chunks = static_cast(num_stripes); - } + std::vector file_stats_merge(num_file_blobs); + for (auto i = 0u; i < num_file_blobs; ++i) { + auto col_stats = &file_stats_merge[i]; + col_stats->col_dtype = per_chunk_stats.col_types[i]; + col_stats->stats_dtype = per_chunk_stats.stats_dtypes[i]; + col_stats->start_chunk = static_cast(i * num_stripes); + col_stats->num_chunks = static_cast(num_stripes); + } auto d_file_stats_merge = stats_merge.device_ptr(num_stripe_blobs); cudaMemcpyAsync(d_file_stats_merge, diff --git a/python/cudf/cudf/testing/_utils.py b/python/cudf/cudf/testing/_utils.py index 607d9121630..a293f9f3b02 100644 --- a/python/cudf/cudf/testing/_utils.py +++ b/python/cudf/cudf/testing/_utils.py @@ -306,7 +306,11 @@ def gen_rand(dtype, size, **kwargs): np.random.randint(low=low, high=high, size=size), unit=time_unit ) elif dtype.kind in ("O", "U"): - return pd.util.testing.rands_array(10, size) + low = kwargs.get("low", 0) + high = kwargs.get("high", 10) + return pd.util.testing.rands_array( + np.random.randint(low=low, high=high, size=1)[0], size + ) raise NotImplementedError(f"dtype.kind={dtype.kind}") diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index e56e332a749..186a01a9287 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -729,12 +729,14 @@ def test_orc_chunked_write_statistics(tmpdir, datadir, nrows, stats_freq): gdf_fname = tmpdir.join("chunked_stats.orc") writer = ORCWriter(gdf_fname) + max_char_length = 1000 if nrows < 10000 else 100 + # Make a dataframe gdf = cudf.DataFrame( { "col_" + str(dtype): gen_rand_series( - dtype, int(nrows / 2), has_nulls=True + dtype, int(nrows / 2), has_nulls=True, low=0, high=max_char_length ) for dtype in supported_stat_types } @@ -746,7 +748,7 @@ def test_orc_chunked_write_statistics(tmpdir, datadir, nrows, stats_freq): { "col_" + str(dtype): gen_rand_series( - dtype, int(nrows / 2), has_nulls=True + dtype, int(nrows / 2), has_nulls=True, low=0, high=max_char_length ) for dtype in supported_stat_types } From 08647606ca05f0375db9da4828068e2bbdd38859 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Thu, 21 Apr 2022 16:30:26 +0000 Subject: [PATCH 07/17] linting --- python/cudf/cudf/tests/test_orc.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 186a01a9287..b88fce058b5 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -736,7 +736,11 @@ def test_orc_chunked_write_statistics(tmpdir, datadir, nrows, stats_freq): { "col_" + str(dtype): gen_rand_series( - dtype, int(nrows / 2), has_nulls=True, low=0, high=max_char_length + dtype, + int(nrows / 2), + has_nulls=True, + low=0, + high=max_char_length, ) for dtype in supported_stat_types } @@ -748,7 +752,11 @@ def test_orc_chunked_write_statistics(tmpdir, datadir, nrows, stats_freq): { "col_" + str(dtype): gen_rand_series( - dtype, int(nrows / 2), has_nulls=True, low=0, high=max_char_length + dtype, + int(nrows / 2), + has_nulls=True, + low=0, + high=max_char_length, ) for dtype in supported_stat_types } From 68230fe2abe4c7af6356d95e88abdb42fee76b85 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Fri, 22 Apr 2022 01:07:21 +0000 Subject: [PATCH 08/17] fixing other tests that were unhappy with variable-length arrays --- python/cudf/cudf/testing/_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/testing/_utils.py b/python/cudf/cudf/testing/_utils.py index a293f9f3b02..f4fc685ef4d 100644 --- a/python/cudf/cudf/testing/_utils.py +++ b/python/cudf/cudf/testing/_utils.py @@ -306,8 +306,8 @@ def gen_rand(dtype, size, **kwargs): np.random.randint(low=low, high=high, size=size), unit=time_unit ) elif dtype.kind in ("O", "U"): - low = kwargs.get("low", 0) - high = kwargs.get("high", 10) + low = kwargs.get("low", 10) + high = kwargs.get("high", 11) return pd.util.testing.rands_array( np.random.randint(low=low, high=high, size=1)[0], size ) From cf53ed40fba2a22bf52b422277131ec949f5b5c8 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Wed, 27 Apr 2022 02:20:15 +0000 Subject: [PATCH 09/17] rebasing on branch-22.06 --- cpp/src/io/orc/writer_impl.cu | 19 ++++--------------- cpp/src/io/orc/writer_impl.hpp | 8 ++------ 2 files changed, 6 insertions(+), 21 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 97fade4db8b..8a8a9ca2986 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -55,6 +55,7 @@ #include #include + namespace cudf { namespace io { namespace detail { @@ -1215,18 +1216,7 @@ writer::impl::intermediate_statistics writer::impl::gather_statistic_blobs( return rowgroup_blobs; }(); - hostdevice_vector stripe_data = - allocate_and_encode_blobs(stripe_merge, stripe_chunks, num_stripe_blobs, stream); - - std::vector stripe_blobs(num_stripe_blobs); - for (size_t i = 0; i < num_stripe_blobs; i++) { - auto const stat_begin = stripe_data.host_ptr(stripe_stat_merge[i].start_chunk); - auto const stat_end = stat_begin + stripe_stat_merge[i].num_chunks; - stripe_blobs[i].assign(stat_begin, stat_end); - } - return {std::move(rowgroup_blobs), - std::move(stripe_blobs), std::move(stripe_chunks), std::move(stripe_merge), std::move(col_stats_dtypes), @@ -1317,10 +1307,9 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs( auto const stat_begin = blobs.host_ptr(file_stat_merge[i].start_chunk); auto const stat_end = stat_begin + file_stat_merge[i].num_chunks; file_blobs[i].assign(stat_begin, stat_end); ->>>>>>> first pass at chunked writing stastistics } - return {std::move(file_blobs)}; + return {std::move(stripe_blobs), std::move(file_blobs)}; } void writer::impl::write_index_stream(int32_t stripe_id, @@ -2174,9 +2163,10 @@ void writer::impl::write(table_view const& table) auto intermediate_stats = gather_statistic_blobs(stats_freq_, orc_table, segmentation); - if (intermediate_stats.stripe_stat_chunks.size() > 0) + if (intermediate_stats.stripe_stat_chunks.size() > 0) { persisted_stripe_statistics.persist( orc_table.num_rows(), single_write_mode, intermediate_stats, stream); + } // Write stripes std::vector> write_tasks; @@ -2235,7 +2225,6 @@ void writer::impl::write(table_view const& table) } out_sink_->host_write(buffer_.data(), buffer_.size()); } - for (auto const& task : write_tasks) { task.wait(); } diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index bb958f0ea94..87f43cecf6c 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -294,21 +294,18 @@ class writer::impl { explicit intermediate_statistics(rmm::cuda_stream_view stream) : stripe_stat_chunks(0, stream){}; intermediate_statistics(std::vector rb, - std::vector sb, rmm::device_uvector sc, hostdevice_vector smg, std::vector sdt, std::vector sct) : rowgroup_blobs(std::move(rb)), - stripe_blobs(std::move(sb)), stripe_stat_chunks(std::move(sc)), stripe_stat_merge(std::move(smg)), stats_dtypes(std::move(sdt)), col_types(std::move(sct)){}; - // blobs for the rowgroups and stripes. Not persisted + // blobs for the rowgroups. Not persisted std::vector rowgroup_blobs; - std::vector stripe_blobs; rmm::device_uvector stripe_stat_chunks; hostdevice_vector stripe_stat_merge; @@ -349,6 +346,7 @@ class writer::impl { * */ struct encoded_footer_statistics { + std::vector stripe_level; std::vector file_level; }; @@ -456,8 +454,6 @@ class writer::impl { // statistics data saved between calls to write before a close writes out the statistics persisted_statistics persisted_stripe_statistics; - persisted_statistics persisted_stripe_statistics; - std::vector buffer_; std::unique_ptr out_sink_; }; From 63116e5bd301dc1d2949a13424c69f24f1b5c0a6 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Wed, 27 Apr 2022 02:44:33 +0000 Subject: [PATCH 10/17] updating to use cooperative group memcpy_async --- cpp/src/io/orc/writer_impl.cu | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 8a8a9ca2986..08c5a7f6173 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -54,6 +54,9 @@ #include #include +#include +#include + #include namespace cudf { @@ -1248,25 +1251,25 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs( // we know the size of each array. The number of stripes per column in a chunk array can // be calculated by dividing the number of chunks by the number of columns. // That many chunks need to be copied at a time to the proper destination. - size_t offset = 0; + size_t num_entries_seen = 0; for (size_t i = 0; i < per_chunk_stats.stripe_stat_chunks.size(); ++i) { auto const stripes_per_col = per_chunk_stats.stripe_stat_chunks[i].size() / num_columns; auto const chunk_bytes = stripes_per_col * sizeof(statistics_chunk); auto const merge_bytes = stripes_per_col * sizeof(statistics_merge_group); for (size_t col = 0; col < num_columns; ++col) { - cudaMemcpyAsync(stat_chunks.data() + (num_stripes * col) + offset, + cudaMemcpyAsync(stat_chunks.data() + (num_stripes * col) + num_entries_seen, per_chunk_stats.stripe_stat_chunks[i].data() + col * stripes_per_col, chunk_bytes, cudaMemcpyDeviceToDevice, stream); - cudaMemcpyAsync(stats_merge.device_ptr() + (num_stripes * col) + offset, + cudaMemcpyAsync(stats_merge.device_ptr() + (num_stripes * col) + num_entries_seen, per_chunk_stats.stripe_stat_merge[i].device_ptr() + col * stripes_per_col, merge_bytes, cudaMemcpyDeviceToDevice, stream); } - offset += stripes_per_col; + num_entries_seen += stripes_per_col; } std::vector file_stats_merge(num_file_blobs); @@ -1969,7 +1972,7 @@ struct string_copy_functor { min ? stripe_stat_chunks[idx].min_value.str_val : stripe_stat_chunks[idx].max_value.str_val; auto dst = &string_pool[offsets[i]]; - memcpy(dst, str_val.ptr, str_val.length /*, cudaMemcpyDeviceToDevice, stream*/); + cooperative_groups::memcpy_async(cooperative_groups::this_thread_block(), dst, str_val.ptr, str_val.length); str_val.ptr = dst; } } From 8daf493c9e601e5d26ad673be8d10fadfbbe2457 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Wed, 27 Apr 2022 16:06:38 +0000 Subject: [PATCH 11/17] linting --- cpp/src/io/orc/writer_impl.cu | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 08c5a7f6173..f33470025e1 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1972,7 +1972,8 @@ struct string_copy_functor { min ? stripe_stat_chunks[idx].min_value.str_val : stripe_stat_chunks[idx].max_value.str_val; auto dst = &string_pool[offsets[i]]; - cooperative_groups::memcpy_async(cooperative_groups::this_thread_block(), dst, str_val.ptr, str_val.length); + cooperative_groups::memcpy_async( + cooperative_groups::this_thread_block(), dst, str_val.ptr, str_val.length); str_val.ptr = dst; } } From c068d2d1da1714ff8bc4ddc6cfd066f2d543ec07 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Wed, 27 Apr 2022 19:22:35 +0000 Subject: [PATCH 12/17] Changing to a kernel for string copy --- cpp/src/io/orc/writer_impl.cu | 48 +++++++++++++++-------------------- 1 file changed, 20 insertions(+), 28 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index f33470025e1..a032608c4cf 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1962,28 +1962,23 @@ struct string_length_functor { statistics_merge_group* stripe_stat_merge; }; -struct string_copy_functor { - __device__ inline void operator()(int const i) const - { - auto const min = i % 2 == 0; - auto const idx = i / 2; - if (stripe_stat_merge[idx].stats_dtype == dtype_string) { - auto& str_val = - min ? stripe_stat_chunks[idx].min_value.str_val : stripe_stat_chunks[idx].max_value.str_val; - - auto dst = &string_pool[offsets[i]]; - cooperative_groups::memcpy_async( - cooperative_groups::this_thread_block(), dst, str_val.ptr, str_val.length); - str_val.ptr = dst; +__global__ void copy_string_data(char* string_pool, + size_type* offsets, + statistics_chunk const* chunks, + statistics_merge_group const* groups) +{ + auto const idx = blockIdx.x / 2; + if (groups[idx].stats_dtype == dtype_string) { + auto const min = blockIdx.x % 2 == 0; + auto& str_val = min ? chunks[idx].min_value.str_val : chunks[idx].max_value.str_val; + auto dst = &string_pool[offsets[blockIdx.x]]; + auto src = str_val.ptr; + + for (int i = threadIdx.x; i < str_val.length; i += blockDim.x) { + dst[i] = src[i]; } } - - char* string_pool; - size_type* offsets; - rmm::cuda_stream_view stream; - statistics_chunk* stripe_stat_chunks; - statistics_merge_group* stripe_stat_merge; -}; +} void writer::impl::persisted_statistics::persist(int num_table_rows, bool single_write_mode, @@ -2013,14 +2008,11 @@ void writer::impl::persisted_statistics::persist(int num_table_rows, // approach for now, but it is possible something fancier with breaking up each thread into // copying x bytes instead of a single string is the better method since we are dealing in // min/max strings they almost certainly will not be uniform length. - thrust::for_each(rmm::exec_policy(stream), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(num_chunks * 2), - string_copy_functor{string_pool.data(), - offsets.data(), - stream, - intermediate_stats.stripe_stat_chunks.data(), - intermediate_stats.stripe_stat_merge.device_ptr()}); + copy_string_data<<>>( + string_pool.data(), + offsets.data(), + intermediate_stats.stripe_stat_chunks.data(), + intermediate_stats.stripe_stat_merge.device_ptr()); string_pools.emplace_back(std::move(string_pool)); } } From 63233db8cadba8059e8ad0d6f91bcd30d00c1e23 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Thu, 28 Apr 2022 17:04:22 +0000 Subject: [PATCH 13/17] fixing string persisting --- cpp/src/io/orc/writer_impl.cu | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index a032608c4cf..56051b65bd4 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1964,7 +1964,7 @@ struct string_length_functor { __global__ void copy_string_data(char* string_pool, size_type* offsets, - statistics_chunk const* chunks, + statistics_chunk* chunks, statistics_merge_group const* groups) { auto const idx = blockIdx.x / 2; @@ -1977,6 +1977,7 @@ __global__ void copy_string_data(char* string_pool, for (int i = threadIdx.x; i < str_val.length; i += blockDim.x) { dst[i] = src[i]; } + if (threadIdx.x == 0) { str_val.ptr = dst; } } } From 7c67e5a98cf37535c27af122edf80fb586e6871b Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Tue, 3 May 2022 00:41:16 +0000 Subject: [PATCH 14/17] updating from review comments --- cpp/src/io/orc/writer_impl.cu | 10 +++++++--- python/cudf/cudf/testing/_utils.py | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 56051b65bd4..35e17fcf339 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1947,9 +1947,13 @@ string_dictionaries allocate_dictionaries(orc_table_view const& orc_table, struct string_length_functor { __device__ inline size_type operator()(int const i) const { + // we translate from 0 -> num_chunks * 2 because each statistic has a min and max + // string and we need to calculate lengths for both. if (i >= num_chunks * 2) return 0; + // min strings are even values, max strings are odd values of i auto const min = i % 2 == 0; + // index of the chunk auto const idx = i / 2; auto& str_val = min ? stripe_stat_chunks[idx].min_value.str_val : stripe_stat_chunks[idx].max_value.str_val; @@ -1957,9 +1961,9 @@ struct string_length_functor { return str ? str_val.length : 0; } - int num_chunks; - statistics_chunk* stripe_stat_chunks; - statistics_merge_group* stripe_stat_merge; + int const num_chunks; + statistics_chunk const* stripe_stat_chunks; + statistics_merge_group const* stripe_stat_merge; }; __global__ void copy_string_data(char* string_pool, diff --git a/python/cudf/cudf/testing/_utils.py b/python/cudf/cudf/testing/_utils.py index f4fc685ef4d..e24e941c5f7 100644 --- a/python/cudf/cudf/testing/_utils.py +++ b/python/cudf/cudf/testing/_utils.py @@ -308,7 +308,7 @@ def gen_rand(dtype, size, **kwargs): elif dtype.kind in ("O", "U"): low = kwargs.get("low", 10) high = kwargs.get("high", 11) - return pd.util.testing.rands_array( + return pd._testing.rands_array( np.random.randint(low=low, high=high, size=1)[0], size ) raise NotImplementedError(f"dtype.kind={dtype.kind}") From 585a1650d9d5fcc5ab66887f4cf1dc8e06ccc6f2 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Tue, 3 May 2022 04:30:18 +0000 Subject: [PATCH 15/17] adding orc chunked writer benchmarks and adding a test comment --- cpp/benchmarks/CMakeLists.txt | 4 + cpp/benchmarks/io/orc/orc_writer_chunks.cpp | 123 ++++++++++++++++++++ python/cudf/cudf/tests/test_orc.py | 3 + 3 files changed, 130 insertions(+) create mode 100644 cpp/benchmarks/io/orc/orc_writer_chunks.cpp diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index 26bb10da69f..9553bfceaf2 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -234,6 +234,10 @@ ConfigureBench(PARQUET_WRITER_BENCH io/parquet/parquet_writer.cpp) # * orc writer benchmark -------------------------------------------------------------------------- ConfigureBench(ORC_WRITER_BENCH io/orc/orc_writer.cpp) +# ################################################################################################## +# * orc writer chunks benchmark --------------------------------------------------------------- +ConfigureBench(ORC_WRITER_CHUNKS_BENCH io/orc/orc_writer_chunks.cpp) + # ################################################################################################## # * csv writer benchmark -------------------------------------------------------------------------- ConfigureBench(CSV_WRITER_BENCH io/csv/csv_writer.cpp) diff --git a/cpp/benchmarks/io/orc/orc_writer_chunks.cpp b/cpp/benchmarks/io/orc/orc_writer_chunks.cpp new file mode 100644 index 00000000000..fa5ceb3ceff --- /dev/null +++ b/cpp/benchmarks/io/orc/orc_writer_chunks.cpp @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include +#include +#include + +// to enable, run cmake with -DBUILD_BENCHMARKS=ON + +constexpr int64_t data_size = 512 << 20; + +namespace cudf_io = cudf::io; + +class OrcWrite : public cudf::benchmark { +}; +class OrcWriteChunked : public cudf::benchmark { +}; + +void ORC_write(benchmark::State& state) +{ + cudf::size_type num_cols = state.range(0); + + auto tbl = create_random_table(cycle_dtypes({cudf::type_id::INT32, + cudf::type_id::FLOAT64, + cudf::type_id::STRING, + cudf::type_id::DECIMAL32, + cudf::type_id::INT8}, + num_cols), + table_size_bytes{data_size}); + cudf::table_view view = tbl->view(); + + auto mem_stats_logger = cudf::memory_stats_logger(); + cuio_source_sink_pair source_sink(io_type::VOID); + for (auto _ : state) { + cuda_event_timer raii(state, true); // flush_l2_cache = true, stream = 0 + cudf_io::orc_writer_options opts = + cudf_io::orc_writer_options::builder(source_sink.make_sink_info(), view); + cudf_io::write_orc(opts); + } + + state.SetBytesProcessed(static_cast(state.iterations()) * data_size); + state.counters["peak_memory_usage"] = mem_stats_logger.peak_memory_usage(); + state.counters["encoded_file_size"] = source_sink.size(); +} + +void ORC_write_chunked(benchmark::State& state) +{ + cudf::size_type num_cols = state.range(0); + cudf::size_type num_tables = state.range(1); + + std::vector> tables; + for (cudf::size_type idx = 0; idx < num_tables; idx++) { + tables.push_back(create_random_table(cycle_dtypes({cudf::type_id::INT32, + cudf::type_id::FLOAT64, + cudf::type_id::STRING, + cudf::type_id::DECIMAL32, + cudf::type_id::INT8}, + num_cols), + table_size_bytes{size_t(data_size / num_tables)})); + } + + auto mem_stats_logger = cudf::memory_stats_logger(); + cuio_source_sink_pair source_sink(io_type::VOID); + for (auto _ : state) { + cuda_event_timer raii(state, true); // flush_l2_cache = true, stream = 0 + cudf_io::chunked_orc_writer_options opts = + cudf_io::chunked_orc_writer_options::builder(source_sink.make_sink_info()); + cudf_io::orc_chunked_writer writer(opts); + std::for_each(tables.begin(), tables.end(), [&writer](std::unique_ptr const& tbl) { + writer.write(*tbl); + }); + writer.close(); + } + + state.SetBytesProcessed(static_cast(state.iterations()) * data_size); + state.counters["peak_memory_usage"] = mem_stats_logger.peak_memory_usage(); + state.counters["encoded_file_size"] = source_sink.size(); +} + +#define OWBM_BENCHMARK_DEFINE(name, size, num_columns) \ + BENCHMARK_DEFINE_F(OrcWrite, name)(::benchmark::State & state) { ORC_write(state); } \ + BENCHMARK_REGISTER_F(OrcWrite, name) \ + ->Args({num_columns}) \ + ->Unit(benchmark::kMillisecond) \ + ->UseManualTime() + +OWBM_BENCHMARK_DEFINE(3Gb8Cols, data_size, 8); +OWBM_BENCHMARK_DEFINE(3Gb1024Cols, data_size, 1024); + +#define OWCBM_BENCHMARK_DEFINE(name, num_columns, num_chunks) \ + BENCHMARK_DEFINE_F(OrcWriteChunked, name)(::benchmark::State & state) \ + { \ + ORC_write_chunked(state); \ + } \ + BENCHMARK_REGISTER_F(OrcWriteChunked, name) \ + ->Args({num_columns, num_chunks}) \ + ->Unit(benchmark::kMillisecond) \ + ->UseManualTime() \ + ->Iterations(4) + +OWCBM_BENCHMARK_DEFINE(3Gb8Cols64Chunks, 8, 8); +OWCBM_BENCHMARK_DEFINE(3Gb1024Cols64Chunks, 1024, 8); + +OWCBM_BENCHMARK_DEFINE(3Gb8Cols128Chunks, 8, 64); +OWCBM_BENCHMARK_DEFINE(3Gb1024Cols128Chunks, 1024, 64); diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index b88fce058b5..c5e81aa8773 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -748,6 +748,9 @@ def test_orc_chunked_write_statistics(tmpdir, datadir, nrows, stats_freq): pdf1 = gdf.to_pandas() writer.write_table(gdf) + # gdf is specifically being reused here to ensure the data is destroyed + # before the next write_table call to ensure the data is persisted inside + # write and no pointers are saved into the original table gdf = cudf.DataFrame( { "col_" From 0cba0a60b394f998a495b8c7aa94775bcdcea52b Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Thu, 5 May 2022 04:52:34 +0000 Subject: [PATCH 16/17] switching to nvbench for orc benchmarks --- cpp/benchmarks/CMakeLists.txt | 2 +- cpp/benchmarks/io/orc/orc_writer_chunks.cpp | 164 +++++++++++--------- 2 files changed, 91 insertions(+), 75 deletions(-) diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index 9553bfceaf2..7095a7a3fd6 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -236,7 +236,7 @@ ConfigureBench(ORC_WRITER_BENCH io/orc/orc_writer.cpp) # ################################################################################################## # * orc writer chunks benchmark --------------------------------------------------------------- -ConfigureBench(ORC_WRITER_CHUNKS_BENCH io/orc/orc_writer_chunks.cpp) +ConfigureNVBench(ORC_WRITER_CHUNKS_NVBENCH io/orc/orc_writer_chunks.cpp) # ################################################################################################## # * csv writer benchmark -------------------------------------------------------------------------- diff --git a/cpp/benchmarks/io/orc/orc_writer_chunks.cpp b/cpp/benchmarks/io/orc/orc_writer_chunks.cpp index fa5ceb3ceff..dc82772fa83 100644 --- a/cpp/benchmarks/io/orc/orc_writer_chunks.cpp +++ b/cpp/benchmarks/io/orc/orc_writer_chunks.cpp @@ -19,6 +19,8 @@ #include #include +#include + #include #include #include @@ -29,95 +31,109 @@ constexpr int64_t data_size = 512 << 20; namespace cudf_io = cudf::io; -class OrcWrite : public cudf::benchmark { -}; -class OrcWriteChunked : public cudf::benchmark { -}; - -void ORC_write(benchmark::State& state) +void nvbench_orc_write(nvbench::state& state) { - cudf::size_type num_cols = state.range(0); - - auto tbl = create_random_table(cycle_dtypes({cudf::type_id::INT32, - cudf::type_id::FLOAT64, - cudf::type_id::STRING, - cudf::type_id::DECIMAL32, - cudf::type_id::INT8}, - num_cols), - table_size_bytes{data_size}); + cudf::size_type num_cols = state.get_int64("num_columns"); + + auto tbl = + create_random_table(cycle_dtypes(get_type_or_group({int32_t(type_group_id::INTEGRAL_SIGNED), + int32_t(type_group_id::FLOATING_POINT), + int32_t(type_group_id::FIXED_POINT), + int32_t(type_group_id::TIMESTAMP), + int32_t(cudf::type_id::STRING), + int32_t(cudf::type_id::STRUCT), + int32_t(cudf::type_id::LIST)}), + num_cols), + table_size_bytes{data_size}); cudf::table_view view = tbl->view(); auto mem_stats_logger = cudf::memory_stats_logger(); - cuio_source_sink_pair source_sink(io_type::VOID); - for (auto _ : state) { - cuda_event_timer raii(state, true); // flush_l2_cache = true, stream = 0 - cudf_io::orc_writer_options opts = - cudf_io::orc_writer_options::builder(source_sink.make_sink_info(), view); - cudf_io::write_orc(opts); - } - state.SetBytesProcessed(static_cast(state.iterations()) * data_size); - state.counters["peak_memory_usage"] = mem_stats_logger.peak_memory_usage(); - state.counters["encoded_file_size"] = source_sink.size(); + state.add_global_memory_reads(data_size); + state.add_element_count(view.num_columns() * view.num_rows()); + + size_t encoded_file_size = 0; + + state.exec(nvbench::exec_tag::timer | nvbench::exec_tag::sync, + [&](nvbench::launch& launch, auto& timer) { + cuio_source_sink_pair source_sink(io_type::VOID); + timer.start(); + + cudf_io::orc_writer_options opts = + cudf_io::orc_writer_options::builder(source_sink.make_sink_info(), view); + cudf_io::write_orc(opts); + + timer.stop(); + encoded_file_size = source_sink.size(); + }); + + state.add_buffer_size(mem_stats_logger.peak_memory_usage(), "pmu", "Peak Memory Usage"); + state.add_buffer_size(encoded_file_size, "efs", "Encoded File Size"); + state.add_buffer_size(view.num_rows(), "trc", "Total Rows"); } -void ORC_write_chunked(benchmark::State& state) +void nvbench_orc_chunked_write(nvbench::state& state) { - cudf::size_type num_cols = state.range(0); - cudf::size_type num_tables = state.range(1); + cudf::size_type num_cols = state.get_int64("num_columns"); + cudf::size_type num_tables = state.get_int64("num_chunks"); std::vector> tables; for (cudf::size_type idx = 0; idx < num_tables; idx++) { - tables.push_back(create_random_table(cycle_dtypes({cudf::type_id::INT32, - cudf::type_id::FLOAT64, - cudf::type_id::STRING, - cudf::type_id::DECIMAL32, - cudf::type_id::INT8}, - num_cols), - table_size_bytes{size_t(data_size / num_tables)})); + tables.push_back( + create_random_table(cycle_dtypes(get_type_or_group({int32_t(type_group_id::INTEGRAL_SIGNED), + int32_t(type_group_id::FLOATING_POINT), + int32_t(type_group_id::FIXED_POINT), + int32_t(type_group_id::TIMESTAMP), + int32_t(cudf::type_id::STRING), + int32_t(cudf::type_id::STRUCT), + int32_t(cudf::type_id::LIST)}), + num_cols), + table_size_bytes{size_t(data_size / num_tables)})); } auto mem_stats_logger = cudf::memory_stats_logger(); - cuio_source_sink_pair source_sink(io_type::VOID); - for (auto _ : state) { - cuda_event_timer raii(state, true); // flush_l2_cache = true, stream = 0 - cudf_io::chunked_orc_writer_options opts = - cudf_io::chunked_orc_writer_options::builder(source_sink.make_sink_info()); - cudf_io::orc_chunked_writer writer(opts); - std::for_each(tables.begin(), tables.end(), [&writer](std::unique_ptr const& tbl) { - writer.write(*tbl); + + auto size_iter = thrust::make_transform_iterator( + tables.begin(), [](auto const& i) { return i->num_columns() * i->num_rows(); }); + auto row_count_iter = + thrust::make_transform_iterator(tables.begin(), [](auto const& i) { return i->num_rows(); }); + auto total_elements = std::accumulate(size_iter, size_iter + num_tables, 0); + auto total_rows = std::accumulate(row_count_iter, row_count_iter + num_tables, 0); + + state.add_global_memory_reads(data_size); + state.add_element_count(total_elements); + + size_t encoded_file_size = 0; + + state.exec( + nvbench::exec_tag::timer | nvbench::exec_tag::sync, [&](nvbench::launch& launch, auto& timer) { + cuio_source_sink_pair source_sink(io_type::VOID); + timer.start(); + + cudf_io::chunked_orc_writer_options opts = + cudf_io::chunked_orc_writer_options::builder(source_sink.make_sink_info()); + cudf_io::orc_chunked_writer writer(opts); + std::for_each(tables.begin(), + tables.end(), + [&writer](std::unique_ptr const& tbl) { writer.write(*tbl); }); + writer.close(); + + timer.stop(); + encoded_file_size = source_sink.size(); }); - writer.close(); - } - state.SetBytesProcessed(static_cast(state.iterations()) * data_size); - state.counters["peak_memory_usage"] = mem_stats_logger.peak_memory_usage(); - state.counters["encoded_file_size"] = source_sink.size(); + state.add_buffer_size(mem_stats_logger.peak_memory_usage(), "pmu", "Peak Memory Usage"); + state.add_buffer_size(encoded_file_size, "efs", "Encoded File Size"); + state.add_buffer_size(total_rows, "trc", "Total Rows"); } -#define OWBM_BENCHMARK_DEFINE(name, size, num_columns) \ - BENCHMARK_DEFINE_F(OrcWrite, name)(::benchmark::State & state) { ORC_write(state); } \ - BENCHMARK_REGISTER_F(OrcWrite, name) \ - ->Args({num_columns}) \ - ->Unit(benchmark::kMillisecond) \ - ->UseManualTime() - -OWBM_BENCHMARK_DEFINE(3Gb8Cols, data_size, 8); -OWBM_BENCHMARK_DEFINE(3Gb1024Cols, data_size, 1024); - -#define OWCBM_BENCHMARK_DEFINE(name, num_columns, num_chunks) \ - BENCHMARK_DEFINE_F(OrcWriteChunked, name)(::benchmark::State & state) \ - { \ - ORC_write_chunked(state); \ - } \ - BENCHMARK_REGISTER_F(OrcWriteChunked, name) \ - ->Args({num_columns, num_chunks}) \ - ->Unit(benchmark::kMillisecond) \ - ->UseManualTime() \ - ->Iterations(4) - -OWCBM_BENCHMARK_DEFINE(3Gb8Cols64Chunks, 8, 8); -OWCBM_BENCHMARK_DEFINE(3Gb1024Cols64Chunks, 1024, 8); - -OWCBM_BENCHMARK_DEFINE(3Gb8Cols128Chunks, 8, 64); -OWCBM_BENCHMARK_DEFINE(3Gb1024Cols128Chunks, 1024, 64); +NVBENCH_BENCH(nvbench_orc_write) + .set_name("orc_write") + .set_min_samples(4) + .add_int64_axis("num_columns", {8, 64}); + +NVBENCH_BENCH(nvbench_orc_chunked_write) + .set_name("orc_chunked_write") + .set_min_samples(4) + .add_int64_axis("num_columns", {8, 64}) + .add_int64_axis("num_chunks", {8, 64}); From 253f0a14ec0150ce93f63e83c68e93b55fab7932 Mon Sep 17 00:00:00 2001 From: Mike Wilson Date: Fri, 6 May 2022 02:20:25 +0000 Subject: [PATCH 17/17] updating from review comments --- cpp/src/io/orc/writer_impl.cu | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index c60f28bbef1..0ad33821dd7 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1952,11 +1952,11 @@ struct string_length_functor { if (i >= num_chunks * 2) return 0; // min strings are even values, max strings are odd values of i - auto const min = i % 2 == 0; + auto const should_copy_min = i % 2 == 0; // index of the chunk auto const idx = i / 2; - auto& str_val = - min ? stripe_stat_chunks[idx].min_value.str_val : stripe_stat_chunks[idx].max_value.str_val; + auto& str_val = should_copy_min ? stripe_stat_chunks[idx].min_value.str_val + : stripe_stat_chunks[idx].max_value.str_val; auto const str = stripe_stat_merge[idx].stats_dtype == dtype_string; return str ? str_val.length : 0; } @@ -1973,10 +1973,11 @@ __global__ void copy_string_data(char* string_pool, { auto const idx = blockIdx.x / 2; if (groups[idx].stats_dtype == dtype_string) { - auto const min = blockIdx.x % 2 == 0; - auto& str_val = min ? chunks[idx].min_value.str_val : chunks[idx].max_value.str_val; - auto dst = &string_pool[offsets[blockIdx.x]]; - auto src = str_val.ptr; + // min strings are even values, max strings are odd values of i + auto const should_copy_min = blockIdx.x % 2 == 0; + auto& str_val = should_copy_min ? chunks[idx].min_value.str_val : chunks[idx].max_value.str_val; + auto dst = &string_pool[offsets[blockIdx.x]]; + auto src = str_val.ptr; for (int i = threadIdx.x; i < str_val.length; i += blockDim.x) { dst[i] = src[i]; @@ -2001,8 +2002,7 @@ void writer::impl::persisted_statistics::persist(int num_table_rows, string_length_functor{num_chunks, intermediate_stats.stripe_stat_chunks.data(), intermediate_stats.stripe_stat_merge.device_ptr()}); - thrust::exclusive_scan( - rmm::exec_policy(stream), iter, iter + (num_chunks * 2) + 1, offsets.begin()); + thrust::exclusive_scan(rmm::exec_policy(stream), iter, iter + offsets.size(), offsets.begin()); // pull size back to host auto const total_string_pool_size = offsets.element(num_chunks * 2, stream);