diff --git a/cpp/benchmarks/io/orc/orc_writer_benchmark.cpp b/cpp/benchmarks/io/orc/orc_writer_benchmark.cpp index be1a2073057..b0eba17359f 100644 --- a/cpp/benchmarks/io/orc/orc_writer_benchmark.cpp +++ b/cpp/benchmarks/io/orc/orc_writer_benchmark.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "cudf/io/types.hpp" #include #include @@ -65,8 +66,14 @@ void BM_orc_write_varying_inout(benchmark::State& state) void BM_orc_write_varying_options(benchmark::State& state) { - auto const compression = static_cast(state.range(0)); - auto const enable_stats = state.range(1) != 0; + auto const compression = static_cast(state.range(0)); + auto const stats_freq = [&] { + switch (state.range(2)) { + case 0: return cudf::io::STATISTICS_NONE; + case 1: return cudf::io::ORC_STATISTICS_STRIPE; + default: return cudf::io::ORC_STATISTICS_ROW_GROUP; + } + }(); auto const data_types = get_type_or_group({int32_t(type_group_id::INTEGRAL_SIGNED), int32_t(type_group_id::FLOATING_POINT), @@ -85,7 +92,7 @@ void BM_orc_write_varying_options(benchmark::State& state) cudf_io::orc_writer_options const options = cudf_io::orc_writer_options::builder(source_sink.make_sink_info(), view) .compression(compression) - .enable_statistics(enable_stats); + .enable_statistics(stats_freq); cudf_io::write_orc(options); } @@ -113,6 +120,8 @@ BENCHMARK_DEFINE_F(OrcWrite, writer_options) BENCHMARK_REGISTER_F(OrcWrite, writer_options) ->ArgsProduct({{int32_t(cudf::io::compression_type::NONE), int32_t(cudf::io::compression_type::SNAPPY)}, - {0, 1}}) + {int32_t{cudf::io::STATISTICS_NONE}, + int32_t{cudf::io::ORC_STATISTICS_STRIPE}, + int32_t{cudf::io::ORC_STATISTICS_ROW_GROUP}}}) ->Unit(benchmark::kMillisecond) ->UseManualTime(); diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index 51f82bc4061..108251dd646 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -434,6 +434,18 @@ table_with_metadata read_orc( */ class orc_writer_options_builder; +/** + * @brief Constants to disambiguate statistics terminology for ORC. + * + * ORC refers to its finest granularity of row-grouping as "row group", + * which corresponds to Parquet "pages". + * Similarly, ORC's "stripe" corresponds to a Parquet "row group". + * The following constants disambiguate the terminology for the statistics + * collected at each level. + */ +static constexpr statistics_freq ORC_STATISTICS_STRIPE = statistics_freq::STATISTICS_ROWGROUP; +static constexpr statistics_freq ORC_STATISTICS_ROW_GROUP = statistics_freq::STATISTICS_PAGE; + /** * @brief Settings to use for `write_orc()`. */ @@ -442,8 +454,8 @@ class orc_writer_options { sink_info _sink; // Specify the compression format to use compression_type _compression = compression_type::AUTO; - // Enable writing column statistics - bool _enable_statistics = true; + // Specify frequency of statistics collection + statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP; // Maximum size of each stripe (unless smaller than a single row group) size_t _stripe_size_bytes = default_stripe_size_bytes; // Maximum number of rows in stripe (unless smaller than a single row group) @@ -501,7 +513,15 @@ class orc_writer_options { /** * @brief Whether writing column statistics is enabled/disabled. */ - [[nodiscard]] bool is_enabled_statistics() const { return _enable_statistics; } + [[nodiscard]] bool is_enabled_statistics() const + { + return _stats_freq != statistics_freq::STATISTICS_NONE; + } + + /** + * @brief Returns frequency of statistics collection. + */ + [[nodiscard]] statistics_freq get_statistics_freq() const { return _stats_freq; } /** * @brief Returns maximum stripe size, in bytes. @@ -550,11 +570,16 @@ class orc_writer_options { void set_compression(compression_type comp) { _compression = comp; } /** - * @brief Enable/Disable writing column statistics. + * @brief Choose granularity of statistics collection. * - * @param val Boolean value to enable/disable statistics. + * The granularity can be set to: + * - cudf::io::STATISTICS_NONE: No statistics are collected. + * - cudf::io::ORC_STATISTICS_STRIPE: Statistics are collected for each ORC stripe. + * - cudf::io::ORC_STATISTICS_ROWGROUP: Statistics are collected for each ORC row group. + * + * @param val Frequency of statistics collection. */ - void enable_statistics(bool val) { _enable_statistics = val; } + void enable_statistics(statistics_freq val) { _stats_freq = val; } /** * @brief Sets the maximum stripe size, in bytes. @@ -647,14 +672,19 @@ class orc_writer_options_builder { } /** - * @brief Enable/Disable writing column statistics. + * @brief Choose granularity of column statistics to be written + * + * The granularity can be set to: + * - cudf::io::STATISTICS_NONE: No statistics are collected. + * - cudf::io::ORC_STATISTICS_STRIPE: Statistics are collected for each ORC stripe. + * - cudf::io::ORC_STATISTICS_ROWGROUP: Statistics are collected for each ORC row group. * - * @param val Boolean value to enable/disable. + * @param val Level of statistics collection. * @return this for chaining. */ - orc_writer_options_builder& enable_statistics(bool val) + orc_writer_options_builder& enable_statistics(statistics_freq val) { - options._enable_statistics = val; + options._stats_freq = val; return *this; } @@ -775,8 +805,8 @@ class chunked_orc_writer_options { sink_info _sink; // Specify the compression format to use compression_type _compression = compression_type::AUTO; - // Enable writing column statistics - bool _enable_statistics = true; + // Specify granularity of statistics collection + statistics_freq _stats_freq = ORC_STATISTICS_ROW_GROUP; // Maximum size of each stripe (unless smaller than a single row group) size_t _stripe_size_bytes = default_stripe_size_bytes; // Maximum number of rows in stripe (unless smaller than a single row group) @@ -825,9 +855,9 @@ class chunked_orc_writer_options { [[nodiscard]] compression_type get_compression() const { return _compression; } /** - * @brief Whether writing column statistics is enabled/disabled. + * @brief Returns granularity of statistics collection. */ - [[nodiscard]] bool is_enabled_statistics() const { return _enable_statistics; } + [[nodiscard]] statistics_freq get_statistics_freq() const { return _stats_freq; } /** * @brief Returns maximum stripe size, in bytes. @@ -871,11 +901,16 @@ class chunked_orc_writer_options { void set_compression(compression_type comp) { _compression = comp; } /** - * @brief Enable/Disable writing column statistics. + * @brief Choose granularity of statistics collection + * + * The granularity can be set to: + * - cudf::io::STATISTICS_NONE: No statistics are collected. + * - cudf::io::ORC_STATISTICS_STRIPE: Statistics are collected for each ORC stripe. + * - cudf::io::ORC_STATISTICS_ROWGROUP: Statistics are collected for each ORC row group. * - * @param val Boolean value to enable/disable. + * @param val Frequency of statistics collection. */ - void enable_statistics(bool val) { _enable_statistics = val; } + void enable_statistics(statistics_freq val) { _stats_freq = val; } /** * @brief Sets the maximum stripe size, in bytes. @@ -958,14 +993,19 @@ class chunked_orc_writer_options_builder { } /** - * @brief Enable/Disable writing column statistics. + * @brief Choose granularity of statistics collection + * + * The granularity can be set to: + * - cudf::io::STATISTICS_NONE: No statistics are collected. + * - cudf::io::ORC_STATISTICS_STRIPE: Statistics are collected for each ORC stripe. + * - cudf::io::ORC_STATISTICS_ROWGROUP: Statistics are collected for each ORC row group. * - * @param val Boolean value to enable/disable. + * @param val Frequency of statistics collection. * @return this for chaining. */ - chunked_orc_writer_options_builder& enable_statistics(bool val) + chunked_orc_writer_options_builder& enable_statistics(statistics_freq val) { - options._enable_statistics = val; + options._stats_freq = val; return *this; } diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 105c473c15e..a917dbf93a5 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1063,15 +1063,15 @@ void set_stat_desc_leaf_cols(device_span columns, } writer::impl::encoded_statistics writer::impl::gather_statistic_blobs( - bool are_statistics_enabled, + statistics_freq stats_freq, orc_table_view const& orc_table, file_segmentation const& segmentation) { - auto const num_rowgroup_blobs = segmentation.rowgroups.count(); - auto const num_stripe_blobs = segmentation.num_stripes() * orc_table.num_columns(); - auto const num_file_blobs = orc_table.num_columns(); - auto const num_stat_blobs = num_rowgroup_blobs + num_stripe_blobs + num_file_blobs; - + auto const num_rowgroup_blobs = segmentation.rowgroups.count(); + auto const num_stripe_blobs = segmentation.num_stripes() * orc_table.num_columns(); + auto const num_file_blobs = orc_table.num_columns(); + auto const num_stat_blobs = num_rowgroup_blobs + num_stripe_blobs + num_file_blobs; + auto const are_statistics_enabled = stats_freq != statistics_freq::STATISTICS_NONE; if (not are_statistics_enabled or num_stat_blobs == 0) { return {}; } hostdevice_vector stat_desc(orc_table.num_columns(), stream); @@ -1164,17 +1164,27 @@ writer::impl::encoded_statistics writer::impl::gather_statistic_blobs( hostdevice_vector blobs( stat_merge[num_stat_blobs - 1].start_chunk + stat_merge[num_stat_blobs - 1].num_chunks, stream); - gpu::orc_encode_statistics( - blobs.device_ptr(), stat_merge.device_ptr(), stat_chunks.data(), num_stat_blobs, stream); + // Skip rowgroup blobs when encoding, if chosen granularity is coarser than "ROW_GROUP". + auto const is_granularity_rowgroup = stats_freq == ORC_STATISTICS_ROW_GROUP; + auto const num_skip = is_granularity_rowgroup ? 0 : num_rowgroup_blobs; + gpu::orc_encode_statistics(blobs.device_ptr(), + stat_merge.device_ptr(num_skip), + stat_chunks.data() + num_skip, + num_stat_blobs - num_skip, + stream); stat_merge.device_to_host(stream); blobs.device_to_host(stream, true); - std::vector rowgroup_blobs(num_rowgroup_blobs); - for (size_t i = 0; i < num_rowgroup_blobs; i++) { - auto const stat_begin = blobs.host_ptr(rowgroup_stat_merge[i].start_chunk); - auto const stat_end = stat_begin + rowgroup_stat_merge[i].num_chunks; - rowgroup_blobs[i].assign(stat_begin, stat_end); - } + auto rowgroup_blobs = [&]() -> std::vector { + if (not is_granularity_rowgroup) { return {}; } + std::vector rowgroup_blobs(num_rowgroup_blobs); + for (size_t i = 0; i < num_rowgroup_blobs; i++) { + auto const stat_begin = blobs.host_ptr(rowgroup_stat_merge[i].start_chunk); + auto const stat_end = stat_begin + rowgroup_stat_merge[i].num_chunks; + rowgroup_blobs[i].assign(stat_begin, stat_end); + } + return rowgroup_blobs; + }(); std::vector stripe_blobs(num_stripe_blobs); for (size_t i = 0; i < num_stripe_blobs; i++) { @@ -1351,7 +1361,7 @@ writer::impl::impl(std::unique_ptr sink, max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, row_index_stride{options.get_row_index_stride()}, compression_kind_(to_orc_compression(options.get_compression())), - enable_statistics_(options.is_enabled_statistics()), + stats_freq_(options.get_statistics_freq()), single_write_mode(mode == SingleWriteMode::YES), kv_meta(options.get_key_value_metadata()), out_sink_(std::move(sink)) @@ -1372,7 +1382,7 @@ writer::impl::impl(std::unique_ptr sink, max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, row_index_stride{options.get_row_index_stride()}, compression_kind_(to_orc_compression(options.get_compression())), - enable_statistics_(options.is_enabled_statistics()), + stats_freq_(options.get_statistics_freq()), single_write_mode(mode == SingleWriteMode::YES), kv_meta(options.get_key_value_metadata()), out_sink_(std::move(sink)) @@ -1954,7 +1964,7 @@ void writer::impl::write(table_view const& table) ProtobufWriter pbw_(&buffer_); - auto const statistics = gather_statistic_blobs(enable_statistics_, orc_table, segmentation); + auto const statistics = gather_statistic_blobs(stats_freq_, orc_table, segmentation); // Write stripes std::vector> write_tasks; diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 903ceaa1714..69bb6029ee0 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -293,13 +293,13 @@ class writer::impl { /** * @brief Returns column statistics encoded in ORC protobuf format. * - * @param are_statistics_enabled True if statistics are to be included in the output file + * @param statistics_freq Frequency of statistics to be included in the output file * @param orc_table Table information to be written * @param columns List of columns * @param segmentation stripe and rowgroup ranges * @return The statistic blobs */ - encoded_statistics gather_statistic_blobs(bool are_statistics_enabled, + encoded_statistics gather_statistic_blobs(statistics_freq statistics_freq, orc_table_view const& orc_table, file_segmentation const& segmentation); @@ -365,8 +365,8 @@ class writer::impl { size_t compression_blocksize_ = DEFAULT_COMPRESSION_BLOCKSIZE; CompressionKind compression_kind_ = CompressionKind::NONE; - bool enable_dictionary_ = true; - bool enable_statistics_ = true; + bool enable_dictionary_ = true; + statistics_freq stats_freq_ = ORC_STATISTICS_ROW_GROUP; // Overall file metadata. Filled in during the process and written during write_chunked_end() cudf::io::orc::FileFooter ff; diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index 03faf9be021..22b089fa93a 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -1733,7 +1733,7 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeORCBufferBegin( chunked_orc_writer_options opts = chunked_orc_writer_options::builder(sink) .metadata(&metadata) .compression(static_cast(j_compression)) - .enable_statistics(true) + .enable_statistics(ORC_STATISTICS_ROW_GROUP) .key_value_metadata(kv_metadata) .build(); auto writer_ptr = std::make_unique(opts); @@ -1776,7 +1776,7 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeORCFileBegin( chunked_orc_writer_options opts = chunked_orc_writer_options::builder(sink) .metadata(&metadata) .compression(static_cast(j_compression)) - .enable_statistics(true) + .enable_statistics(ORC_STATISTICS_ROW_GROUP) .key_value_metadata(kv_metadata) .build(); auto writer_ptr = std::make_unique(opts); diff --git a/python/cudf/cudf/_fuzz_testing/tests/fuzz_test_orc.py b/python/cudf/cudf/_fuzz_testing/tests/fuzz_test_orc.py index b3fd7e8c5a7..977038d1fcb 100644 --- a/python/cudf/cudf/_fuzz_testing/tests/fuzz_test_orc.py +++ b/python/cudf/cudf/_fuzz_testing/tests/fuzz_test_orc.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. import io import sys @@ -74,7 +74,7 @@ def orc_reader_stripes_test(input_tuple, columns, stripes): data_handle=OrcWriter, params={ "compression": [None, "snappy"], - "enable_statistics": [True, False], + "enable_statistics": ["NONE", "STRIPE", "ROWGROUP"], }, ) def orc_writer_test(pdf, compression, enable_statistics): diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx index cbba1796c26..ce4f183e795 100644 --- a/python/cudf/cudf/_lib/orc.pyx +++ b/python/cudf/cudf/_lib/orc.pyx @@ -9,6 +9,7 @@ from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector +cimport cudf._lib.cpp.io.types as cudf_io_types from cudf._lib.column cimport Column from cudf._lib.cpp.column.column cimport column from cudf._lib.cpp.io.orc cimport ( @@ -144,10 +145,27 @@ cdef compression_type _get_comp_type(object compression): raise ValueError(f"Unsupported `compression` type {compression}") +cdef cudf_io_types.statistics_freq _get_orc_stat_freq(object statistics): + """ + Convert ORC statistics terms to CUDF convention: + - ORC "STRIPE" == CUDF "ROWGROUP" + - ORC "ROWGROUP" == CUDF "PAGE" + """ + statistics = str(statistics).upper() + if statistics == "NONE": + return cudf_io_types.statistics_freq.STATISTICS_NONE + elif statistics == "STRIPE": + return cudf_io_types.statistics_freq.STATISTICS_ROWGROUP + elif statistics == "ROWGROUP": + return cudf_io_types.statistics_freq.STATISTICS_PAGE + else: + raise ValueError(f"Unsupported `statistics_freq` type {statistics}") + + cpdef write_orc(table, object path_or_buf, object compression=None, - bool enable_statistics=True, + object statistics="ROWGROUP", object stripe_size_bytes=None, object stripe_size_rows=None, object row_index_stride=None): @@ -189,7 +207,7 @@ cpdef write_orc(table, sink_info_c, table_view_from_table(table, ignore_index=True) ).metadata(tbl_meta.get()) .compression(compression_) - .enable_statistics( (True if enable_statistics else False)) + .enable_statistics(_get_orc_stat_freq(statistics)) .build() ) if stripe_size_bytes is not None: @@ -268,15 +286,15 @@ cdef class ORCWriter: cdef unique_ptr[orc_chunked_writer] writer cdef sink_info sink cdef unique_ptr[data_sink] _data_sink - cdef bool enable_stats + cdef cudf_io_types.statistics_freq stat_freq cdef compression_type comp_type cdef object index cdef unique_ptr[table_input_metadata] tbl_meta def __cinit__(self, object path, object index=None, - object compression=None, bool enable_statistics=True): + object compression=None, object statistics="ROWGROUP"): self.sink = make_sink_info(path, self._data_sink) - self.enable_stats = enable_statistics + self.stat_freq = _get_orc_stat_freq(statistics) self.comp_type = _get_comp_type(compression) self.index = index self.initialized = False @@ -350,7 +368,7 @@ cdef class ORCWriter: .metadata(self.tbl_meta.get()) .key_value_metadata(move(user_data)) .compression(self.comp_type) - .enable_statistics(self.enable_stats) + .enable_statistics(self.stat_freq) .build() ) self.writer.reset(new orc_chunked_writer(args)) diff --git a/python/cudf/cudf/io/orc.py b/python/cudf/cudf/io/orc.py index c1cce3f996f..5c35d004ac0 100644 --- a/python/cudf/cudf/io/orc.py +++ b/python/cudf/cudf/io/orc.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2021, NVIDIA CORPORATION. +# Copyright (c) 2019-2022, NVIDIA CORPORATION. import datetime import warnings @@ -395,7 +395,7 @@ def to_orc( df, fname, compression=None, - enable_statistics=True, + statistics="ROWGROUP", stripe_size_bytes=None, stripe_size_rows=None, row_index_stride=None, @@ -431,7 +431,7 @@ def to_orc( df, file_obj, compression, - enable_statistics, + statistics, stripe_size_bytes, stripe_size_rows, row_index_stride, @@ -441,7 +441,7 @@ def to_orc( df, path_or_buf, compression, - enable_statistics, + statistics, stripe_size_bytes, stripe_size_rows, row_index_stride, diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 44812f5aba4..8689f773a02 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -391,6 +391,64 @@ def test_orc_writer(datadir, tmpdir, reference_file, columns, compression): assert_eq(expect, got) +@pytest.mark.parametrize("stats_freq", ["NONE", "STRIPE", "ROWGROUP"]) +def test_orc_writer_statistics_frequency(datadir, tmpdir, stats_freq): + reference_file = "TestOrcFile.demo-12-zlib.orc" + pdf_fname = datadir / reference_file + gdf_fname = tmpdir.join("gdf.orc") + + try: + orcfile = pa.orc.ORCFile(pdf_fname) + except Exception as excpr: + if type(excpr).__name__ == "ArrowIOError": + pytest.skip(".orc file is not found") + else: + print(type(excpr).__name__) + + expect = orcfile.read().to_pandas() + cudf.from_pandas(expect).to_orc(gdf_fname.strpath, statistics=stats_freq) + got = pa.orc.ORCFile(gdf_fname).read().to_pandas() + + assert_eq(expect, got) + + +@pytest.mark.parametrize("stats_freq", ["NONE", "STRIPE", "ROWGROUP"]) +def test_chunked_orc_writer_statistics_frequency(datadir, tmpdir, stats_freq): + reference_file = "TestOrcFile.test1.orc" + pdf_fname = datadir / reference_file + gdf_fname = tmpdir.join("chunked_gdf.orc") + + try: + orcfile = pa.orc.ORCFile(pdf_fname) + except Exception as excpr: + if type(excpr).__name__ == "ArrowIOError": + pytest.skip(".orc file is not found") + else: + print(type(excpr).__name__) + + columns = [ + "boolean1", + "byte1", + "short1", + "int1", + "long1", + "float1", + "double1", + ] + pdf = orcfile.read(columns=columns).to_pandas() + gdf = cudf.from_pandas(pdf) + expect = pd.concat([pdf, pdf]).reset_index(drop=True) + + writer = ORCWriter(gdf_fname, statistics=stats_freq) + writer.write_table(gdf) + writer.write_table(gdf) + writer.close() + + got = pa.orc.ORCFile(gdf_fname).read().to_pandas() + + assert_eq(expect, got) + + @pytest.mark.parametrize("compression", [None, "snappy"]) @pytest.mark.parametrize( "reference_file, columns", @@ -592,8 +650,9 @@ def normalized_equals(value1, value2): return value1 == value2 +@pytest.mark.parametrize("stats_freq", ["STRIPE", "ROWGROUP"]) @pytest.mark.parametrize("nrows", [1, 100, 6000000]) -def test_orc_write_statistics(tmpdir, datadir, nrows): +def test_orc_write_statistics(tmpdir, datadir, nrows, stats_freq): supported_stat_types = supported_numpy_dtypes + ["str"] # Can't write random bool columns until issue #6763 is fixed if nrows == 6000000: @@ -609,7 +668,7 @@ def test_orc_write_statistics(tmpdir, datadir, nrows): fname = tmpdir.join("gdf.orc") # Write said dataframe to ORC with cuDF - gdf.to_orc(fname.strpath) + gdf.to_orc(fname.strpath, statistics=stats_freq) # Read back written ORC's statistics orc_file = pa.orc.ORCFile(fname)