diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt
index e93b2bf4f25..04dcf51dd40 100644
--- a/cpp/benchmarks/CMakeLists.txt
+++ b/cpp/benchmarks/CMakeLists.txt
@@ -242,6 +242,10 @@ ConfigureBench(PARQUET_WRITER_BENCH io/parquet/parquet_writer.cpp)
 # * orc writer benchmark --------------------------------------------------------------------------
 ConfigureBench(ORC_WRITER_BENCH io/orc/orc_writer.cpp)
 
+# ##################################################################################################
+# * orc writer chunks benchmark --------------------------------------------------------------------
+ConfigureNVBench(ORC_WRITER_CHUNKS_NVBENCH io/orc/orc_writer_chunks.cpp)
+
 # ##################################################################################################
 # * csv writer benchmark --------------------------------------------------------------------------
 ConfigureBench(CSV_WRITER_BENCH io/csv/csv_writer.cpp)
diff --git a/cpp/benchmarks/io/orc/orc_writer_chunks.cpp b/cpp/benchmarks/io/orc/orc_writer_chunks.cpp
new file mode 100644
index 00000000000..dc82772fa83
--- /dev/null
+++ b/cpp/benchmarks/io/orc/orc_writer_chunks.cpp
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <benchmarks/common/generate_input.hpp>
+#include <benchmarks/fixture/benchmark_fixture.hpp>
+#include <benchmarks/io/cuio_common.hpp>
+#include <benchmarks/synchronization/synchronization.hpp>
+
+#include <nvbench/nvbench.cuh>
+
+#include <cudf/io/orc.hpp>
+#include <cudf/table/table.hpp>
+#include <cudf/table/table_view.hpp>
+
+// to enable, run cmake with -DBUILD_BENCHMARKS=ON
+
+constexpr int64_t data_size = 512 << 20;
+
+namespace cudf_io = cudf::io;
+
+void nvbench_orc_write(nvbench::state& state)
+{
+  cudf::size_type num_cols = state.get_int64("num_columns");
+
+  auto tbl =
+    create_random_table(cycle_dtypes(get_type_or_group({int32_t(type_group_id::INTEGRAL_SIGNED),
+                                                        int32_t(type_group_id::FLOATING_POINT),
+                                                        int32_t(type_group_id::FIXED_POINT),
+                                                        int32_t(type_group_id::TIMESTAMP),
+                                                        int32_t(cudf::type_id::STRING),
+                                                        int32_t(cudf::type_id::STRUCT),
+                                                        int32_t(cudf::type_id::LIST)}),
+                                     num_cols),
+                        table_size_bytes{data_size});
+  cudf::table_view view = tbl->view();
+
+  auto mem_stats_logger = cudf::memory_stats_logger();
+
+  state.add_global_memory_reads<int64_t>(data_size);
+  state.add_element_count(view.num_columns() * view.num_rows());
+
+  size_t encoded_file_size = 0;
+
+  state.exec(nvbench::exec_tag::timer | nvbench::exec_tag::sync,
+             [&](nvbench::launch& launch, auto& timer) {
+               cuio_source_sink_pair source_sink(io_type::VOID);
+               timer.start();
+
+               cudf_io::orc_writer_options opts =
+                 cudf_io::orc_writer_options::builder(source_sink.make_sink_info(), view);
+               cudf_io::write_orc(opts);
+
+               timer.stop();
+               encoded_file_size = source_sink.size();
+             });
+
+  state.add_buffer_size(mem_stats_logger.peak_memory_usage(), "pmu", "Peak Memory Usage");
+  state.add_buffer_size(encoded_file_size, "efs", "Encoded File Size");
+  state.add_buffer_size(view.num_rows(), "trc", "Total Rows");
+}
+
+void nvbench_orc_chunked_write(nvbench::state& state)
+{
+  cudf::size_type num_cols   = state.get_int64("num_columns");
+  cudf::size_type num_tables = state.get_int64("num_chunks");
+
+  std::vector<std::unique_ptr<cudf::table>> tables;
+  for (cudf::size_type idx = 0; idx < num_tables; idx++) {
+    tables.push_back(
+      create_random_table(cycle_dtypes(get_type_or_group({int32_t(type_group_id::INTEGRAL_SIGNED),
+                                                          int32_t(type_group_id::FLOATING_POINT),
+                                                          int32_t(type_group_id::FIXED_POINT),
+                                                          int32_t(type_group_id::TIMESTAMP),
+                                                          int32_t(cudf::type_id::STRING),
+                                                          int32_t(cudf::type_id::STRUCT),
+                                                          int32_t(cudf::type_id::LIST)}),
+                                       num_cols),
+                          table_size_bytes{size_t(data_size / num_tables)}));
+  }
+
+  auto mem_stats_logger = cudf::memory_stats_logger();
+
+  auto size_iter = thrust::make_transform_iterator(
+    tables.begin(), [](auto const& i) { return i->num_columns() * i->num_rows(); });
+  auto row_count_iter =
+    thrust::make_transform_iterator(tables.begin(), [](auto const& i) { return i->num_rows(); });
+  auto total_elements = std::accumulate(size_iter, size_iter + num_tables, 0);
+  auto total_rows     = std::accumulate(row_count_iter, row_count_iter + num_tables, 0);
+
+  state.add_global_memory_reads<int64_t>(data_size);
+  state.add_element_count(total_elements);
+
+  size_t encoded_file_size = 0;
+
+  state.exec(
+    nvbench::exec_tag::timer | nvbench::exec_tag::sync, [&](nvbench::launch& launch, auto& timer) {
+      cuio_source_sink_pair source_sink(io_type::VOID);
+      timer.start();
+
+      cudf_io::chunked_orc_writer_options opts =
+        cudf_io::chunked_orc_writer_options::builder(source_sink.make_sink_info());
+      cudf_io::orc_chunked_writer writer(opts);
+      std::for_each(tables.begin(),
+                    tables.end(),
+                    [&writer](std::unique_ptr<cudf::table> const& tbl) { writer.write(*tbl); });
+      writer.close();
+
+      timer.stop();
+      encoded_file_size = source_sink.size();
+    });
+
+  state.add_buffer_size(mem_stats_logger.peak_memory_usage(), "pmu", "Peak Memory Usage");
+  state.add_buffer_size(encoded_file_size, "efs", "Encoded File Size");
+  state.add_buffer_size(total_rows, "trc", "Total Rows");
+}
+
+NVBENCH_BENCH(nvbench_orc_write)
+  .set_name("orc_write")
+  .set_min_samples(4)
+  .add_int64_axis("num_columns", {8, 64});
+
+NVBENCH_BENCH(nvbench_orc_chunked_write)
+  .set_name("orc_chunked_write")
+  .set_min_samples(4)
+  .add_int64_axis("num_columns", {8, 64})
+  .add_int64_axis("num_chunks", {8, 64});
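
Reviewer note on the benchmark file above: both benchmarks use nvbench's manual-timer mode so that per-iteration setup (constructing the `cuio_source_sink_pair`) and the final size query stay outside the measured region. A minimal, self-contained sketch of that idiom (the `timer_demo` benchmark is hypothetical, not part of the patch):

```cpp
#include <nvbench/nvbench.cuh>

// Minimal sketch of nvbench's manual-timer idiom: work done before
// timer.start() or after timer.stop() is excluded from the measurement.
void timer_demo(nvbench::state& state)
{
  state.exec(nvbench::exec_tag::timer | nvbench::exec_tag::sync,
             [](nvbench::launch& launch, auto& timer) {
               // untimed per-iteration setup goes here (e.g. building a sink)
               timer.start();
               // timed region: the work under measurement
               timer.stop();
               // untimed teardown / result collection goes here
             });
}
NVBENCH_BENCH(timer_demo).set_name("timer_demo");
```

`exec_tag::sync` marks the lambda as synchronizing the device itself, so nvbench skips batched timing for it.
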
Size"); + state.add_buffer_size(total_rows, "trc", "Total Rows"); +} + +NVBENCH_BENCH(nvbench_orc_write) + .set_name("orc_write") + .set_min_samples(4) + .add_int64_axis("num_columns", {8, 64}); + +NVBENCH_BENCH(nvbench_orc_chunked_write) + .set_name("orc_chunked_write") + .set_min_samples(4) + .add_int64_axis("num_columns", {8, 64}) + .add_int64_axis("num_chunks", {8, 64}); diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index ecd2d6f6ec0..0ad33821dd7 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -54,6 +54,9 @@ #include #include +#include +#include + #include namespace cudf { @@ -1233,8 +1236,7 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs( auto const num_stripe_blobs = thrust::reduce(stripe_size_iter, stripe_size_iter + per_chunk_stats.stripe_stat_merge.size()); auto const num_file_blobs = num_columns; - auto const num_blobs = single_write_mode ? static_cast(num_stripe_blobs + num_file_blobs) - : static_cast(num_stripe_blobs); + auto const num_blobs = static_cast(num_stripe_blobs + num_file_blobs); if (num_stripe_blobs == 0) { return {}; } @@ -1242,46 +1244,53 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs( rmm::device_uvector stat_chunks(num_blobs, stream); hostdevice_vector stats_merge(num_blobs, stream); - size_t chunk_offset = 0; - size_t merge_offset = 0; + // we need to merge the stat arrays from the persisted data. + // this needs to be done carefully because each array can contain + // a different number of stripes and stripes from each column must be + // located next to each other. We know the total number of stripes and + // we know the size of each array. The number of stripes per column in a chunk array can + // be calculated by dividing the number of chunks by the number of columns. + // That many chunks need to be copied at a time to the proper destination. 
+  size_t num_entries_seen = 0;
   for (size_t i = 0; i < per_chunk_stats.stripe_stat_chunks.size(); ++i) {
-    auto chunk_bytes = per_chunk_stats.stripe_stat_chunks[i].size() * sizeof(statistics_chunk);
-    auto merge_bytes = per_chunk_stats.stripe_stat_merge[i].size() * sizeof(statistics_merge_group);
-    cudaMemcpyAsync(stat_chunks.data() + chunk_offset,
-                    per_chunk_stats.stripe_stat_chunks[i].data(),
-                    chunk_bytes,
-                    cudaMemcpyDeviceToDevice,
-                    stream);
-    cudaMemcpyAsync(stats_merge.device_ptr() + merge_offset,
-                    per_chunk_stats.stripe_stat_merge[i].device_ptr(),
-                    merge_bytes,
-                    cudaMemcpyDeviceToDevice,
-                    stream);
-    chunk_offset += per_chunk_stats.stripe_stat_chunks[i].size();
-    merge_offset += per_chunk_stats.stripe_stat_merge[i].size();
+    auto const stripes_per_col = per_chunk_stats.stripe_stat_chunks[i].size() / num_columns;
+
+    auto const chunk_bytes = stripes_per_col * sizeof(statistics_chunk);
+    auto const merge_bytes = stripes_per_col * sizeof(statistics_merge_group);
+    for (size_t col = 0; col < num_columns; ++col) {
+      cudaMemcpyAsync(stat_chunks.data() + (num_stripes * col) + num_entries_seen,
+                      per_chunk_stats.stripe_stat_chunks[i].data() + col * stripes_per_col,
+                      chunk_bytes,
+                      cudaMemcpyDeviceToDevice,
+                      stream);
+      cudaMemcpyAsync(stats_merge.device_ptr() + (num_stripes * col) + num_entries_seen,
+                      per_chunk_stats.stripe_stat_merge[i].device_ptr() + col * stripes_per_col,
+                      merge_bytes,
+                      cudaMemcpyDeviceToDevice,
+                      stream);
+    }
+    num_entries_seen += stripes_per_col;
   }
 
-  if (single_write_mode) {
-    std::vector<statistics_merge_group> file_stats_merge(num_file_blobs);
-    for (auto i = 0u; i < num_file_blobs; ++i) {
-      auto col_stats         = &file_stats_merge[i];
-      col_stats->col_dtype   = per_chunk_stats.col_types[i];
-      col_stats->stats_dtype = per_chunk_stats.stats_dtypes[i];
-      col_stats->start_chunk = static_cast<uint32_t>(i * num_stripes);
-      col_stats->num_chunks  = static_cast<uint32_t>(num_stripes);
-    }
+  std::vector<statistics_merge_group> file_stats_merge(num_file_blobs);
+  for (auto i = 0u; i < num_file_blobs; ++i) {
+    auto col_stats         = &file_stats_merge[i];
+    col_stats->col_dtype   = per_chunk_stats.col_types[i];
+    col_stats->stats_dtype = per_chunk_stats.stats_dtypes[i];
+    col_stats->start_chunk = static_cast<uint32_t>(i * num_stripes);
+    col_stats->num_chunks  = static_cast<uint32_t>(num_stripes);
+  }
 
-    auto d_file_stats_merge = stats_merge.device_ptr(num_stripe_blobs);
-    cudaMemcpyAsync(d_file_stats_merge,
-                    file_stats_merge.data(),
-                    num_file_blobs * sizeof(statistics_merge_group),
-                    cudaMemcpyHostToDevice,
-                    stream);
+  auto d_file_stats_merge = stats_merge.device_ptr(num_stripe_blobs);
+  cudaMemcpyAsync(d_file_stats_merge,
+                  file_stats_merge.data(),
+                  num_file_blobs * sizeof(statistics_merge_group),
+                  cudaMemcpyHostToDevice,
+                  stream);
 
-    auto file_stat_chunks = stat_chunks.data() + num_stripe_blobs;
-    detail::merge_group_statistics<detail::io_file_format::ORC>(
-      file_stat_chunks, stat_chunks.data(), d_file_stats_merge, num_file_blobs, stream);
-  }
+  auto file_stat_chunks = stat_chunks.data() + num_stripe_blobs;
+  detail::merge_group_statistics<detail::io_file_format::ORC>(
+    file_stat_chunks, stat_chunks.data(), d_file_stats_merge, num_file_blobs, stream);
 
   hostdevice_vector<uint8_t> blobs =
     allocate_and_encode_blobs(stats_merge, stat_chunks, num_blobs, stream);
@@ -1295,14 +1304,12 @@ writer::impl::encoded_footer_statistics writer::impl::finish_statistic_blobs(
     stripe_blobs[i].assign(stat_begin, stat_end);
   }
 
-  std::vector<ColStatsBlob> file_blobs(single_write_mode ? num_file_blobs : 0);
-  if (single_write_mode) {
-    auto file_stat_merge = stats_merge.host_ptr(num_stripe_blobs);
-    for (auto i = 0u; i < num_file_blobs; i++) {
-      auto const stat_begin = blobs.host_ptr(file_stat_merge[i].start_chunk);
-      auto const stat_end   = stat_begin + file_stat_merge[i].num_chunks;
-      file_blobs[i].assign(stat_begin, stat_end);
-    }
+  std::vector<ColStatsBlob> file_blobs(num_file_blobs);
+  auto file_stat_merge = stats_merge.host_ptr(num_stripe_blobs);
+  for (auto i = 0u; i < num_file_blobs; i++) {
+    auto const stat_begin = blobs.host_ptr(file_stat_merge[i].start_chunk);
+    auto const stat_end   = stat_begin + file_stat_merge[i].num_chunks;
+    file_blobs[i].assign(stat_begin, stat_end);
   }
 
   return {std::move(stripe_blobs), std::move(file_blobs)};
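
A note on the copy loop above: each persisted chunk stores its stripes grouped by column within the chunk, and the merged array must keep all stripes of a given column contiguous across every chunk. A small standalone sketch of the index arithmetic, with made-up sizes for illustration (not part of the patch):

```cpp
#include <cstdio>
#include <vector>

// Per-chunk arrays are laid out [col0 stripes..., col1 stripes...]; the
// merged array reserves num_stripes consecutive slots per column.
int main()
{
  int const num_columns = 2;
  std::vector<int> chunk_sizes{6, 4};             // entries per persisted chunk
  int const num_stripes = (6 + 4) / num_columns;  // 5 stripes overall

  int num_entries_seen = 0;
  for (std::size_t i = 0; i < chunk_sizes.size(); ++i) {
    int const stripes_per_col = chunk_sizes[i] / num_columns;
    for (int col = 0; col < num_columns; ++col) {
      int const src = col * stripes_per_col;                 // offset within chunk i
      int const dst = num_stripes * col + num_entries_seen;  // offset in merged array
      std::printf("chunk %zu col %d: copy %d entries from %d to %d\n",
                  i, col, stripes_per_col, src, dst);
    }
    num_entries_seen += stripes_per_col;
  }
  return 0;
}
```

With two chunks of 3 and 2 stripes per column, column 0 lands in merged slots 0-4 and column 1 in slots 5-9, which is exactly the column-contiguous layout the file-level merge groups (`start_chunk = i * num_stripes`) rely on.
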
 
@@ -1937,6 +1944,91 @@ string_dictionaries allocate_dictionaries(orc_table_view const& orc_table,
                           std::move(is_dict_enabled)};
 }
 
+struct string_length_functor {
+  __device__ inline size_type operator()(int const i) const
+  {
+    // We translate from 0 -> num_chunks * 2 because each statistic has a min
+    // and a max string, and we need to calculate lengths for both.
+    if (i >= num_chunks * 2) return 0;
+
+    // min strings are even values of i, max strings are odd values of i
+    auto const should_copy_min = i % 2 == 0;
+    // index of the chunk
+    auto const idx = i / 2;
+    auto& str_val  = should_copy_min ? stripe_stat_chunks[idx].min_value.str_val
+                                     : stripe_stat_chunks[idx].max_value.str_val;
+    auto const str = stripe_stat_merge[idx].stats_dtype == dtype_string;
+    return str ? str_val.length : 0;
+  }
+
+  int const num_chunks;
+  statistics_chunk const* stripe_stat_chunks;
+  statistics_merge_group const* stripe_stat_merge;
+};
+
+__global__ void copy_string_data(char* string_pool,
+                                 size_type* offsets,
+                                 statistics_chunk* chunks,
+                                 statistics_merge_group const* groups)
+{
+  auto const idx = blockIdx.x / 2;
+  if (groups[idx].stats_dtype == dtype_string) {
+    // min strings are even blocks, max strings are odd blocks
+    auto const should_copy_min = blockIdx.x % 2 == 0;
+    auto& str_val = should_copy_min ? chunks[idx].min_value.str_val : chunks[idx].max_value.str_val;
+    auto dst      = &string_pool[offsets[blockIdx.x]];
+    auto src      = str_val.ptr;
+
+    for (int i = threadIdx.x; i < str_val.length; i += blockDim.x) {
+      dst[i] = src[i];
+    }
+    if (threadIdx.x == 0) { str_val.ptr = dst; }
+  }
+}
+
+void writer::impl::persisted_statistics::persist(int num_table_rows,
+                                                 bool single_write_mode,
+                                                 intermediate_statistics& intermediate_stats,
+                                                 rmm::cuda_stream_view stream)
+{
+  if (not single_write_mode) {
+    // persist the strings in the chunks into a string pool and update pointers
+    auto const num_chunks = static_cast<int>(intermediate_stats.stripe_stat_chunks.size());
+    // min offset and max offset + 1 for total size
+    rmm::device_uvector<size_type> offsets((num_chunks * 2) + 1, stream);
+
+    auto iter = cudf::detail::make_counting_transform_iterator(
+      0,
+      string_length_functor{num_chunks,
+                            intermediate_stats.stripe_stat_chunks.data(),
+                            intermediate_stats.stripe_stat_merge.device_ptr()});
+    thrust::exclusive_scan(rmm::exec_policy(stream), iter, iter + offsets.size(), offsets.begin());
+
+    // pull size back to host
+    auto const total_string_pool_size = offsets.element(num_chunks * 2, stream);
+    if (total_string_pool_size > 0) {
+      rmm::device_uvector<char> string_pool(total_string_pool_size, stream);
+
+      // 'offsets' describes where in the string pool each string goes. We go
+      // with the simple approach for now; something fancier, where each thread
+      // copies x bytes instead of a single string, may be the better method,
+      // since these are min/max strings and almost certainly not of uniform
+      // length.
+      copy_string_data<<<num_chunks * 2, 256, 0, stream.value()>>>(
+        string_pool.data(),
+        offsets.data(),
+        intermediate_stats.stripe_stat_chunks.data(),
+        intermediate_stats.stripe_stat_merge.device_ptr());
+      string_pools.emplace_back(std::move(string_pool));
+    }
+  }
+
+  stripe_stat_chunks.emplace_back(std::move(intermediate_stats.stripe_stat_chunks));
+  stripe_stat_merge.emplace_back(std::move(intermediate_stats.stripe_stat_merge));
+  stats_dtypes = std::move(intermediate_stats.stats_dtypes);
+  col_types    = std::move(intermediate_stats.col_types);
+  num_rows     = num_table_rows;
+}
+
 void writer::impl::write(table_view const& table)
 {
   CUDF_EXPECTS(not closed, "Data has already been flushed to out and closed");
@@ -2075,13 +2167,8 @@ void writer::impl::write(table_view const& table)
   auto intermediate_stats = gather_statistic_blobs(stats_freq_, orc_table, segmentation);
 
   if (intermediate_stats.stripe_stat_chunks.size() > 0) {
-    persisted_stripe_statistics.stripe_stat_chunks.emplace_back(
-      std::move(intermediate_stats.stripe_stat_chunks));
-    persisted_stripe_statistics.stripe_stat_merge.emplace_back(
-      std::move(intermediate_stats.stripe_stat_merge));
-    persisted_stripe_statistics.stats_dtypes = std::move(intermediate_stats.stats_dtypes);
-    persisted_stripe_statistics.col_types    = std::move(intermediate_stats.col_types);
-    persisted_stripe_statistics.num_rows     = orc_table.num_rows();
+    persisted_stripe_statistics.persist(
+      orc_table.num_rows(), single_write_mode, intermediate_stats, stream);
   }
 
   // Write stripes
@@ -2141,7 +2228,6 @@ void writer::impl::write(table_view const& table)
     }
     out_sink_->host_write(buffer_.data(), buffer_.size());
   }
-
   for (auto const& task : write_tasks) {
     task.wait();
   }
@@ -2204,7 +2290,7 @@ void writer::impl::close()
   auto const statistics = finish_statistic_blobs(ff.stripes.size(), persisted_stripe_statistics);
 
   // File-level statistics
-  if (single_write_mode and not statistics.file_level.empty()) {
+  if (not statistics.file_level.empty()) {
     buffer_.resize(0);
     pbw_.put_uint(encode_field_number(1));
     pbw_.put_uint(persisted_stripe_statistics.num_rows);
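
A note on the string-pool step in `persist` above: the exclusive scan runs over `2 * num_chunks` interleaved (min, max) string lengths plus one extra slot, so every statistic's destination offset and the total pool size fall out of a single scan. A standalone host-side sketch of the same layout computation, with toy lengths and hypothetical names (not part of the patch):

```cpp
#include <cstdio>
#include <vector>

// Host-side sketch of the offsets produced by the exclusive scan: entry 2*i
// is chunk i's min string, entry 2*i+1 its max string, and the final extra
// slot holds the total pool size.
int main()
{
  std::vector<int> lengths{3, 5, 0, 4};  // min0, max0, min1, max1
  std::vector<int> offsets(lengths.size() + 1, 0);
  for (std::size_t i = 0; i < lengths.size(); ++i) {
    offsets[i + 1] = offsets[i] + lengths[i];  // exclusive scan
  }
  // offsets = {0, 3, 8, 8, 12}; total pool size = offsets.back() = 12
  for (int o : offsets) { std::printf("%d ", o); }
  std::printf("\n");
  return 0;
}
```

Copying the strings into this owned pool (and repointing `str_val.ptr` at it) is what makes it safe for the caller to destroy the input table between chunked writes.
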
diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp
index d823c73007f..577c22f8ac3 100644
--- a/cpp/src/io/orc/writer_impl.hpp
+++ b/cpp/src/io/orc/writer_impl.hpp
@@ -304,7 +304,7 @@ class writer::impl {
       stats_dtypes(std::move(sdt)),
       col_types(std::move(sct)){};
 
-    // blobs for the rowgroups and stripes. Not persisted
+    // blobs for the rowgroups. Not persisted
     std::vector<ColStatsBlob> rowgroup_blobs;
 
     rmm::device_uvector<statistics_chunk> stripe_stat_chunks;
@@ -322,13 +322,20 @@ class writer::impl {
     {
       stripe_stat_chunks.clear();
       stripe_stat_merge.clear();
+      string_pools.clear();
      stats_dtypes.clear();
       col_types.clear();
       num_rows = 0;
     }
 
+    void persist(int num_table_rows,
+                 bool single_write_mode,
+                 intermediate_statistics& intermediate_stats,
+                 rmm::cuda_stream_view stream);
+
     std::vector<rmm::device_uvector<statistics_chunk>> stripe_stat_chunks;
     std::vector<hostdevice_vector<statistics_merge_group>> stripe_stat_merge;
+    std::vector<rmm::device_uvector<char>> string_pools;
     std::vector<statistics_dtype> stats_dtypes;
     std::vector<data_type> col_types;
     int num_rows = 0;
diff --git a/python/cudf/cudf/testing/_utils.py b/python/cudf/cudf/testing/_utils.py
index e9f836d9702..679edefcc83 100644
--- a/python/cudf/cudf/testing/_utils.py
+++ b/python/cudf/cudf/testing/_utils.py
@@ -311,7 +311,11 @@ def gen_rand(dtype, size, **kwargs):
             np.random.randint(low=low, high=high, size=size), unit=time_unit
         )
     elif dtype.kind in ("O", "U"):
-        return pd._testing.rands_array(10, size)
+        low = kwargs.get("low", 10)
+        high = kwargs.get("high", 11)
+        return pd._testing.rands_array(
+            np.random.randint(low=low, high=high, size=1)[0], size
+        )
 
     raise NotImplementedError(f"dtype.kind={dtype.kind}")
diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py
index e94888fc770..c547c80e48b 100644
--- a/python/cudf/cudf/tests/test_orc.py
+++ b/python/cudf/cudf/tests/test_orc.py
@@ -733,6 +733,105 @@ def test_orc_write_statistics(tmpdir, datadir, nrows, stats_freq):
             assert stats_num_vals == actual_num_vals
 
 
+@pytest.mark.parametrize("stats_freq", ["STRIPE", "ROWGROUP"])
+@pytest.mark.parametrize("nrows", [2, 100, 6000000])
+def test_orc_chunked_write_statistics(tmpdir, datadir, nrows, stats_freq):
+    supported_stat_types = supported_numpy_dtypes + ["str"]
+    # Can't write random bool columns until issue #6763 is fixed
+    if nrows == 6000000:
+        supported_stat_types.remove("bool")
+
+    gdf_fname = tmpdir.join("chunked_stats.orc")
+    writer = ORCWriter(gdf_fname)
+
+    max_char_length = 1000 if nrows < 10000 else 100
+
+    # Make a dataframe
+    gdf = cudf.DataFrame(
+        {
+            "col_"
+            + str(dtype): gen_rand_series(
+                dtype,
+                int(nrows / 2),
+                has_nulls=True,
+                low=0,
+                high=max_char_length,
+            )
+            for dtype in supported_stat_types
+        }
+    )
+
+    pdf1 = gdf.to_pandas()
+    writer.write_table(gdf)
+    # gdf is specifically reassigned here so the first table's data is
+    # destroyed before the next write_table call; this verifies the data was
+    # persisted inside write and no pointers into the original table were
+    # saved
+    gdf = cudf.DataFrame(
+        {
+            "col_"
+            + str(dtype): gen_rand_series(
+                dtype,
+                int(nrows / 2),
+                has_nulls=True,
+                low=0,
+                high=max_char_length,
+            )
+            for dtype in supported_stat_types
+        }
+    )
+    pdf2 = gdf.to_pandas()
+    writer.write_table(gdf)
+    writer.close()
+
+    # pandas is unable to handle min/max of string col with nulls
+    expect = cudf.DataFrame(pd.concat([pdf1, pdf2]).reset_index(drop=True))
+
+    # Read back written ORC's statistics
+    orc_file = pa.orc.ORCFile(gdf_fname)
+    (
+        file_stats,
+        stripes_stats,
+    ) = cudf.io.orc.read_orc_statistics([gdf_fname])
+
+    # check file stats
+    for col in expect:
+        if "minimum" in file_stats[0][col]:
+            stats_min = file_stats[0][col]["minimum"]
+            actual_min = expect[col].min()
+            assert normalized_equals(actual_min, stats_min)
+        if "maximum" in file_stats[0][col]:
+            stats_max = file_stats[0][col]["maximum"]
+            actual_max = expect[col].max()
+            assert normalized_equals(actual_max, stats_max)
+        if "number_of_values" in file_stats[0][col]:
file_stats[0][col]["number_of_values"] + actual_num_vals = expect[col].count() + assert stats_num_vals == actual_num_vals + + # compare stripe statistics with actual min/max + for stripe_idx in range(0, orc_file.nstripes): + stripe = orc_file.read_stripe(stripe_idx) + # pandas is unable to handle min/max of string col with nulls + stripe_df = cudf.DataFrame(stripe.to_pandas()) + for col in stripe_df: + if "minimum" in stripes_stats[stripe_idx][col]: + actual_min = stripe_df[col].min() + stats_min = stripes_stats[stripe_idx][col]["minimum"] + assert normalized_equals(actual_min, stats_min) + + if "maximum" in stripes_stats[stripe_idx][col]: + actual_max = stripe_df[col].max() + stats_max = stripes_stats[stripe_idx][col]["maximum"] + assert normalized_equals(actual_max, stats_max) + + if "number_of_values" in stripes_stats[stripe_idx][col]: + stats_num_vals = stripes_stats[stripe_idx][col][ + "number_of_values" + ] + actual_num_vals = stripe_df[col].count() + assert stats_num_vals == actual_num_vals + + @pytest.mark.parametrize("nrows", [1, 100, 6000000]) def test_orc_write_bool_statistics(tmpdir, datadir, nrows): # Make a dataframe