diff --git a/cpp/include/cudf/io/detail/orc.hpp b/cpp/include/cudf/io/detail/orc.hpp index 4c78502a21b..1a53690e317 100644 --- a/cpp/include/cudf/io/detail/orc.hpp +++ b/cpp/include/cudf/io/detail/orc.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * Copyright (c) 2020-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -93,13 +93,11 @@ class writer { * @param options Settings for controlling writing behavior * @param mode Option to write at once or in chunks * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource to use for device memory allocation */ explicit writer(std::unique_ptr sink, orc_writer_options const& options, SingleWriteMode mode, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); + rmm::cuda_stream_view stream); /** * @brief Constructor with chunked writer options. @@ -108,13 +106,11 @@ class writer { * @param options Settings for controlling writing behavior * @param mode Option to write at once or in chunks * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource to use for device memory allocation */ explicit writer(std::unique_ptr sink, chunked_orc_writer_options const& options, SingleWriteMode mode, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); + rmm::cuda_stream_view stream); /** * @brief Destructor explicitly declared to avoid inlining in header diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index b1e2197a868..9dac915118f 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * Copyright (c) 2020-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -781,10 +781,8 @@ class orc_writer_options_builder { * @endcode * * @param options Settings for controlling reading behavior - * @param mr Device memory resource to use for device memory allocation */ -void write_orc(orc_writer_options const& options, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +void write_orc(orc_writer_options const& options); /** * @brief Builds settings to use for `write_orc_chunked()`. @@ -1137,10 +1135,8 @@ class orc_chunked_writer { * @brief Constructor with chunked writer options * * @param[in] options options used to write table - * @param[in] mr Device memory resource to use for device memory allocation */ - orc_chunked_writer(chunked_orc_writer_options const& options, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + orc_chunked_writer(chunked_orc_writer_options const& options); /** * @brief Writes table to output. diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index b8a4d8a8388..088aef5175e 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -418,7 +418,7 @@ table_with_metadata read_orc(orc_reader_options const& options, rmm::mr::device_ /** * @copydoc cudf::io::write_orc */ -void write_orc(orc_writer_options const& options, rmm::mr::device_memory_resource* mr) +void write_orc(orc_writer_options const& options) { namespace io_detail = cudf::io::detail; @@ -428,7 +428,7 @@ void write_orc(orc_writer_options const& options, rmm::mr::device_memory_resourc CUDF_EXPECTS(sinks.size() == 1, "Multiple sinks not supported for ORC writing"); auto writer = std::make_unique( - std::move(sinks[0]), options, io_detail::SingleWriteMode::YES, cudf::get_default_stream(), mr); + std::move(sinks[0]), options, io_detail::SingleWriteMode::YES, cudf::get_default_stream()); writer->write(options.get_table()); } @@ -436,8 +436,7 @@ void write_orc(orc_writer_options const& options, rmm::mr::device_memory_resourc /** * @copydoc cudf::io::orc_chunked_writer::orc_chunked_writer */ -orc_chunked_writer::orc_chunked_writer(chunked_orc_writer_options const& options, - rmm::mr::device_memory_resource* mr) +orc_chunked_writer::orc_chunked_writer(chunked_orc_writer_options const& options) { namespace io_detail = cudf::io::detail; @@ -445,7 +444,7 @@ orc_chunked_writer::orc_chunked_writer(chunked_orc_writer_options const& options CUDF_EXPECTS(sinks.size() == 1, "Multiple sinks not supported for ORC writing"); writer = std::make_unique( - std::move(sinks[0]), options, io_detail::SingleWriteMode::NO, cudf::get_default_stream(), mr); + std::move(sinks[0]), options, io_detail::SingleWriteMode::NO, cudf::get_default_stream()); } /** diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 90da3c8fffc..f1eb52f63f1 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -2392,21 +2392,19 @@ convert_table_to_orc_data(table_view const& input, writer::impl::impl(std::unique_ptr sink, orc_writer_options const& options, SingleWriteMode mode, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - : _mr(mr), - stream(stream), - max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, - row_index_stride{options.get_row_index_stride()}, - compression_kind_(to_orc_compression(options.get_compression())), - compression_blocksize_(compression_block_size(compression_kind_)), - stats_freq_(options.get_statistics_freq()), - single_write_mode(mode == SingleWriteMode::YES), - kv_meta(options.get_key_value_metadata()), - out_sink_(std::move(sink)) + rmm::cuda_stream_view stream) + : _stream(stream), + _max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, + _row_index_stride{options.get_row_index_stride()}, + _compression_kind(to_orc_compression(options.get_compression())), + _compression_blocksize(compression_block_size(_compression_kind)), + _stats_freq(options.get_statistics_freq()), + _single_write_mode(mode == SingleWriteMode::YES), + _kv_meta(options.get_key_value_metadata()), + _out_sink(std::move(sink)) { if (options.get_metadata()) { - table_meta = std::make_unique(*options.get_metadata()); + _table_meta = std::make_unique(*options.get_metadata()); } init_state(); } @@ -2414,21 +2412,19 @@ writer::impl::impl(std::unique_ptr sink, writer::impl::impl(std::unique_ptr sink, chunked_orc_writer_options const& options, SingleWriteMode mode, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - : _mr(mr), - stream(stream), - max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, - row_index_stride{options.get_row_index_stride()}, - compression_kind_(to_orc_compression(options.get_compression())), - compression_blocksize_(compression_block_size(compression_kind_)), - stats_freq_(options.get_statistics_freq()), - single_write_mode(mode == SingleWriteMode::YES), - kv_meta(options.get_key_value_metadata()), - out_sink_(std::move(sink)) + rmm::cuda_stream_view stream) + : _stream(stream), + _max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()}, + _row_index_stride{options.get_row_index_stride()}, + _compression_kind(to_orc_compression(options.get_compression())), + _compression_blocksize(compression_block_size(_compression_kind)), + _stats_freq(options.get_statistics_freq()), + _single_write_mode(mode == SingleWriteMode::YES), + _kv_meta(options.get_key_value_metadata()), + _out_sink(std::move(sink)) { if (options.get_metadata()) { - table_meta = std::make_unique(*options.get_metadata()); + _table_meta = std::make_unique(*options.get_metadata()); } init_state(); } @@ -2438,14 +2434,14 @@ writer::impl::~impl() { close(); } void writer::impl::init_state() { // Write file header - out_sink_->host_write(MAGIC, std::strlen(MAGIC)); + _out_sink->host_write(MAGIC, std::strlen(MAGIC)); } void writer::impl::write(table_view const& input) { - CUDF_EXPECTS(not closed, "Data has already been flushed to out and closed"); + CUDF_EXPECTS(not _closed, "Data has already been flushed to out and closed"); - if (not table_meta) { table_meta = make_table_meta(input); } + if (not _table_meta) { _table_meta = make_table_meta(input); } // All kinds of memory allocation and data compressions/encoding are performed here. // If any error occurs, such as out-of-memory exception, the internal state of the current writer @@ -2465,16 +2461,16 @@ void writer::impl::write(table_view const& input) stream_output] = [&] { try { return convert_table_to_orc_data(input, - *table_meta, - max_stripe_size, - row_index_stride, - enable_dictionary_, - compression_kind_, - compression_blocksize_, - stats_freq_, - single_write_mode, - *out_sink_, - stream); + *_table_meta, + _max_stripe_size, + _row_index_stride, + _enable_dictionary, + _compression_kind, + _compression_blocksize, + _stats_freq, + _single_write_mode, + *_out_sink, + _stream); } catch (...) { // catch any exception type CUDF_LOG_ERROR( "ORC writer encountered exception during processing. " @@ -2513,8 +2509,8 @@ void writer::impl::write_orc_data_to_sink(orc_streams& streams, if (orc_table.num_rows() == 0) { return; } if (intermediate_stats.stripe_stat_chunks.size() > 0) { - persisted_stripe_statistics.persist( - orc_table.num_rows(), single_write_mode, intermediate_stats, stream); + _persisted_stripe_statistics.persist( + orc_table.num_rows(), _single_write_mode, intermediate_stats, _stream); } // Write stripes @@ -2522,7 +2518,7 @@ void writer::impl::write_orc_data_to_sink(orc_streams& streams, for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) { auto& stripe = stripes[stripe_id]; - stripe.offset = out_sink_->bytes_written(); + stripe.offset = _out_sink->bytes_written(); // Column (skippable) index streams appear at the start of the stripe size_type const num_index_streams = (orc_table.num_columns() + 1); @@ -2537,9 +2533,9 @@ void writer::impl::write_orc_data_to_sink(orc_streams& streams, intermediate_stats.rowgroup_blobs, &stripe, &streams, - compression_kind_, - compression_blocksize_, - out_sink_); + _compression_kind, + _compression_blocksize, + _out_sink); } // Column data consisting one or more separate streams @@ -2551,9 +2547,9 @@ void writer::impl::write_orc_data_to_sink(orc_streams& streams, stream_output, &stripe, &streams, - compression_kind_, - out_sink_, - stream)); + _compression_kind, + _out_sink, + _stream)); } // Write stripefooter consisting of stream information @@ -2569,16 +2565,16 @@ void writer::impl::write_orc_data_to_sink(orc_streams& streams, : 0; if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; } } - ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); + ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0); pbw.write(sf); stripe.footerLength = pbw.size(); - if (compression_kind_ != NONE) { + if (_compression_kind != NONE) { uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1; pbw.buffer()[0] = static_cast(uncomp_sf_len >> 0); pbw.buffer()[1] = static_cast(uncomp_sf_len >> 8); pbw.buffer()[2] = static_cast(uncomp_sf_len >> 16); } - out_sink_->host_write(pbw.data(), pbw.size()); + _out_sink->host_write(pbw.data(), pbw.size()); } for (auto const& task : write_tasks) { task.wait(); @@ -2588,20 +2584,20 @@ void writer::impl::write_orc_data_to_sink(orc_streams& streams, void writer::impl::add_table_to_footer_data(orc_table_view const& orc_table, std::vector& stripes) { - if (ff.headerLength == 0) { + if (_ffooter.headerLength == 0) { // First call - ff.headerLength = std::strlen(MAGIC); - ff.rowIndexStride = row_index_stride; - ff.types.resize(1 + orc_table.num_columns()); - ff.types[0].kind = STRUCT; + _ffooter.headerLength = std::strlen(MAGIC); + _ffooter.rowIndexStride = _row_index_stride; + _ffooter.types.resize(1 + orc_table.num_columns()); + _ffooter.types[0].kind = STRUCT; for (auto const& column : orc_table.columns) { if (!column.is_child()) { - ff.types[0].subtypes.emplace_back(column.id()); - ff.types[0].fieldNames.emplace_back(column.orc_name()); + _ffooter.types[0].subtypes.emplace_back(column.id()); + _ffooter.types[0].fieldNames.emplace_back(column.orc_name()); } } for (auto const& column : orc_table.columns) { - auto& schema_type = ff.types[column.id()]; + auto& schema_type = _ffooter.types[column.id()]; schema_type.kind = column.orc_kind(); if (column.orc_kind() == DECIMAL) { schema_type.scale = static_cast(column.scale()); @@ -2622,101 +2618,103 @@ void writer::impl::add_table_to_footer_data(orc_table_view const& orc_table, } } else { // verify the user isn't passing mismatched tables - CUDF_EXPECTS(ff.types.size() == 1 + orc_table.num_columns(), + CUDF_EXPECTS(_ffooter.types.size() == 1 + orc_table.num_columns(), "Mismatch in table structure between multiple calls to write"); CUDF_EXPECTS( std::all_of(orc_table.columns.cbegin(), orc_table.columns.cend(), - [&](auto const& col) { return ff.types[col.id()].kind == col.orc_kind(); }), + [&](auto const& col) { return _ffooter.types[col.id()].kind == col.orc_kind(); }), "Mismatch in column types between multiple calls to write"); } - ff.stripes.insert(ff.stripes.end(), - std::make_move_iterator(stripes.begin()), - std::make_move_iterator(stripes.end())); - ff.numberOfRows += orc_table.num_rows(); + _ffooter.stripes.insert(_ffooter.stripes.end(), + std::make_move_iterator(stripes.begin()), + std::make_move_iterator(stripes.end())); + _ffooter.numberOfRows += orc_table.num_rows(); } void writer::impl::close() { - if (closed) { return; } - closed = true; + if (_closed) { return; } + _closed = true; PostScript ps; auto const statistics = - finish_statistic_blobs(ff.stripes.size(), persisted_stripe_statistics, stream); + finish_statistic_blobs(_ffooter.stripes.size(), _persisted_stripe_statistics, _stream); // File-level statistics if (not statistics.file_level.empty()) { ProtobufWriter pbw; pbw.put_uint(encode_field_number(1)); - pbw.put_uint(persisted_stripe_statistics.num_rows); + pbw.put_uint(_persisted_stripe_statistics.num_rows); // First entry contains total number of rows - ff.statistics.reserve(ff.types.size()); - ff.statistics.emplace_back(pbw.release()); + _ffooter.statistics.reserve(_ffooter.types.size()); + _ffooter.statistics.emplace_back(pbw.release()); // Add file stats, stored after stripe stats in `column_stats` - ff.statistics.insert(ff.statistics.end(), - std::make_move_iterator(statistics.file_level.begin()), - std::make_move_iterator(statistics.file_level.end())); + _ffooter.statistics.insert(_ffooter.statistics.end(), + std::make_move_iterator(statistics.file_level.begin()), + std::make_move_iterator(statistics.file_level.end())); } // Stripe-level statistics if (not statistics.stripe_level.empty()) { - md.stripeStats.resize(ff.stripes.size()); - for (size_t stripe_id = 0; stripe_id < ff.stripes.size(); stripe_id++) { - md.stripeStats[stripe_id].colStats.resize(ff.types.size()); + _orc_meta.stripeStats.resize(_ffooter.stripes.size()); + for (size_t stripe_id = 0; stripe_id < _ffooter.stripes.size(); stripe_id++) { + _orc_meta.stripeStats[stripe_id].colStats.resize(_ffooter.types.size()); ProtobufWriter pbw; pbw.put_uint(encode_field_number(1)); - pbw.put_uint(ff.stripes[stripe_id].numberOfRows); - md.stripeStats[stripe_id].colStats[0] = pbw.release(); - for (size_t col_idx = 0; col_idx < ff.types.size() - 1; col_idx++) { - size_t idx = ff.stripes.size() * col_idx + stripe_id; - md.stripeStats[stripe_id].colStats[1 + col_idx] = std::move(statistics.stripe_level[idx]); + pbw.put_uint(_ffooter.stripes[stripe_id].numberOfRows); + _orc_meta.stripeStats[stripe_id].colStats[0] = pbw.release(); + for (size_t col_idx = 0; col_idx < _ffooter.types.size() - 1; col_idx++) { + size_t idx = _ffooter.stripes.size() * col_idx + stripe_id; + _orc_meta.stripeStats[stripe_id].colStats[1 + col_idx] = + std::move(statistics.stripe_level[idx]); } } } - persisted_stripe_statistics.clear(); + _persisted_stripe_statistics.clear(); - ff.contentLength = out_sink_->bytes_written(); - std::transform( - kv_meta.begin(), kv_meta.end(), std::back_inserter(ff.metadata), [&](auto const& udata) { - return UserMetadataItem{udata.first, udata.second}; - }); + _ffooter.contentLength = _out_sink->bytes_written(); + std::transform(_kv_meta.begin(), + _kv_meta.end(), + std::back_inserter(_ffooter.metadata), + [&](auto const& udata) { + return UserMetadataItem{udata.first, udata.second}; + }); // Write statistics metadata - if (md.stripeStats.size() != 0) { - ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); - pbw.write(md); - add_uncompressed_block_headers(compression_kind_, compression_blocksize_, pbw.buffer()); + if (_orc_meta.stripeStats.size() != 0) { + ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0); + pbw.write(_orc_meta); + add_uncompressed_block_headers(_compression_kind, _compression_blocksize, pbw.buffer()); ps.metadataLength = pbw.size(); - out_sink_->host_write(pbw.data(), pbw.size()); + _out_sink->host_write(pbw.data(), pbw.size()); } else { ps.metadataLength = 0; } - ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); - pbw.write(ff); - add_uncompressed_block_headers(compression_kind_, compression_blocksize_, pbw.buffer()); + ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0); + pbw.write(_ffooter); + add_uncompressed_block_headers(_compression_kind, _compression_blocksize, pbw.buffer()); // Write postscript metadata ps.footerLength = pbw.size(); - ps.compression = compression_kind_; - ps.compressionBlockSize = compression_blocksize_; + ps.compression = _compression_kind; + ps.compressionBlockSize = _compression_blocksize; ps.version = {0, 12}; ps.magic = MAGIC; const auto ps_length = static_cast(pbw.write(ps)); pbw.put_byte(ps_length); - out_sink_->host_write(pbw.data(), pbw.size()); - out_sink_->flush(); + _out_sink->host_write(pbw.data(), pbw.size()); + _out_sink->flush(); } // Forward to implementation writer::writer(std::unique_ptr sink, orc_writer_options const& options, SingleWriteMode mode, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - : _impl(std::make_unique(std::move(sink), options, mode, stream, mr)) + rmm::cuda_stream_view stream) + : _impl(std::make_unique(std::move(sink), options, mode, stream)) { } @@ -2724,9 +2722,8 @@ writer::writer(std::unique_ptr sink, writer::writer(std::unique_ptr sink, chunked_orc_writer_options const& options, SingleWriteMode mode, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) - : _impl(std::make_unique(std::move(sink), options, mode, stream, mr)) + rmm::cuda_stream_view stream) + : _impl(std::make_unique(std::move(sink), options, mode, stream)) { } diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index d15b8674ba9..1dd3b3ab280 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -245,13 +245,11 @@ class writer::impl { * @param options Settings for controlling behavior * @param mode Option to write at once or in chunks * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource to use for device memory allocation */ explicit impl(std::unique_ptr sink, orc_writer_options const& options, SingleWriteMode mode, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); + rmm::cuda_stream_view stream); /** * @brief Constructor with chunked writer options. @@ -260,13 +258,11 @@ class writer::impl { * @param options Settings for controlling behavior * @param mode Option to write at once or in chunks * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource to use for device memory allocation */ explicit impl(std::unique_ptr sink, chunked_orc_writer_options const& options, SingleWriteMode mode, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); + rmm::cuda_stream_view stream); /** * @brief Destructor to complete any incomplete write and release resources. @@ -329,36 +325,30 @@ class writer::impl { std::vector& stripes); private: - rmm::mr::device_memory_resource* _mr = nullptr; - // Cuda stream to be used - rmm::cuda_stream_view stream; - - stripe_size_limits max_stripe_size; - size_type row_index_stride; - CompressionKind compression_kind_; - size_t compression_blocksize_; - - bool enable_dictionary_ = true; - statistics_freq stats_freq_ = ORC_STATISTICS_ROW_GROUP; - - // Overall file metadata. Filled in during the process and written during write_chunked_end() - cudf::io::orc::FileFooter ff; - cudf::io::orc::Metadata md; - // current write position for rowgroups/chunks - size_t current_chunk_offset; - // special parameter only used by detail::write() to indicate that we are guaranteeing - // a single table write. this enables some internal optimizations. - bool const single_write_mode; - // optional user metadata - std::unique_ptr table_meta; - // optional user metadata - std::map kv_meta; - // to track if the output has been written to sink - bool closed = false; - // statistics data saved between calls to write before a close writes out the statistics - persisted_statistics persisted_stripe_statistics; - - std::unique_ptr out_sink_; + // CUDA stream. + rmm::cuda_stream_view const _stream; + + // Writer options. + stripe_size_limits const _max_stripe_size; + size_type const _row_index_stride; + CompressionKind const _compression_kind; + size_t const _compression_blocksize; + statistics_freq const _stats_freq; + bool const _single_write_mode; // Special parameter only used by `write()` to indicate that + // we are guaranteeing a single table write. This enables some + // internal optimizations. + std::map const _kv_meta; // Optional user metadata. + std::unique_ptr const _out_sink; + + // Debug parameter---currently not yet supported to be user-specified. + static bool constexpr _enable_dictionary = true; + + // Internal states, filled during `write()` and written to sink during `write` and `close()`. + std::unique_ptr _table_meta; + cudf::io::orc::FileFooter _ffooter; + cudf::io::orc::Metadata _orc_meta; + persisted_statistics _persisted_stripe_statistics; // Statistics data saved between calls. + bool _closed = false; // To track if the output has been written to sink. }; } // namespace orc