diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index 880990c552f..5445e59297c 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -28,7 +28,7 @@ namespace cudf { namespace io { namespace orc { -uint32_t ProtobufReader::read_field_size(const uint8_t* end) +uint32_t ProtobufReader::read_field_size(uint8_t const* end) { auto const size = get(); CUDF_EXPECTS(size <= static_cast(end - m_cur), "Protobuf parsing out of bounds"); @@ -213,8 +213,7 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk, TypeKind kind, ColStatsBlob const* stats) { - std::vector positions_data; - ProtobufWriter position_writer(&positions_data); + ProtobufWriter position_writer; auto const positions_size_offset = position_writer.put_uint( encode_field_number(1, ProtofType::FIXEDLEN)); // 1:positions[packed=true] position_writer.put_byte(0xcd); // positions size placeholder @@ -246,19 +245,20 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk, positions_size += position_writer.put_byte(0); } } + // size of the field 1 - positions_data[positions_size_offset] = static_cast(positions_size); + position_writer.buffer()[positions_size_offset] = static_cast(positions_size); auto const stats_size = (stats == nullptr) ? 0 : varint_size(encode_field_number(2)) + varint_size(stats->size()) + stats->size(); - auto const entry_size = positions_data.size() + stats_size; + auto const entry_size = position_writer.size() + stats_size; // 1:RowIndex.entry put_uint(encode_field_number(1, ProtofType::FIXEDLEN)); put_uint(entry_size); - put_bytes(positions_data); + put_bytes(position_writer.buffer()); if (stats != nullptr) { put_uint(encode_field_number(2)); // 2: statistics @@ -268,7 +268,7 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk, } } -size_t ProtobufWriter::write(const PostScript& s) +size_t ProtobufWriter::write(PostScript const& s) { ProtobufFieldWriter w(this); w.field_uint(1, s.footerLength); @@ -280,7 +280,7 @@ size_t ProtobufWriter::write(const PostScript& s) return w.value(); } -size_t ProtobufWriter::write(const FileFooter& s) +size_t ProtobufWriter::write(FileFooter const& s) { ProtobufFieldWriter w(this); w.field_uint(1, s.headerLength); @@ -294,7 +294,7 @@ size_t ProtobufWriter::write(const FileFooter& s) return w.value(); } -size_t ProtobufWriter::write(const StripeInformation& s) +size_t ProtobufWriter::write(StripeInformation const& s) { ProtobufFieldWriter w(this); w.field_uint(1, s.offset); @@ -305,7 +305,7 @@ size_t ProtobufWriter::write(const StripeInformation& s) return w.value(); } -size_t ProtobufWriter::write(const SchemaType& s) +size_t ProtobufWriter::write(SchemaType const& s) { ProtobufFieldWriter w(this); w.field_uint(1, s.kind); @@ -317,7 +317,7 @@ size_t ProtobufWriter::write(const SchemaType& s) return w.value(); } -size_t ProtobufWriter::write(const UserMetadataItem& s) +size_t ProtobufWriter::write(UserMetadataItem const& s) { ProtobufFieldWriter w(this); w.field_blob(1, s.name); @@ -325,7 +325,7 @@ size_t ProtobufWriter::write(const UserMetadataItem& s) return w.value(); } -size_t ProtobufWriter::write(const StripeFooter& s) +size_t ProtobufWriter::write(StripeFooter const& s) { ProtobufFieldWriter w(this); w.field_repeated_struct(1, s.streams); @@ -334,7 +334,7 @@ size_t ProtobufWriter::write(const StripeFooter& s) return w.value(); } -size_t ProtobufWriter::write(const Stream& s) +size_t ProtobufWriter::write(Stream const& s) { ProtobufFieldWriter w(this); w.field_uint(1, s.kind); @@ -343,7 +343,7 @@ size_t ProtobufWriter::write(const Stream& s) return w.value(); } -size_t ProtobufWriter::write(const ColumnEncoding& s) +size_t ProtobufWriter::write(ColumnEncoding const& s) { ProtobufFieldWriter w(this); w.field_uint(1, s.kind); @@ -351,14 +351,14 @@ size_t ProtobufWriter::write(const ColumnEncoding& s) return w.value(); } -size_t ProtobufWriter::write(const StripeStatistics& s) +size_t ProtobufWriter::write(StripeStatistics const& s) { ProtobufFieldWriter w(this); w.field_repeated_struct_blob(1, s.colStats); return w.value(); } -size_t ProtobufWriter::write(const Metadata& s) +size_t ProtobufWriter::write(Metadata const& s) { ProtobufFieldWriter w(this); w.field_repeated_struct(1, s.stripeStats); @@ -443,13 +443,13 @@ host_span OrcDecompressor::decompress_blocks(host_spansize(); - const auto max_ps_size = std::min(len, static_cast(256)); + auto const len = source->size(); + auto const max_ps_size = std::min(len, static_cast(256)); // Read uncompressed postscript section (max 255 bytes + 1 byte for length) auto buffer = source->host_read(len - max_ps_size, max_ps_size); - const size_t ps_length = buffer->data()[max_ps_size - 1]; - const uint8_t* ps_data = &buffer->data()[max_ps_size - ps_length - 1]; + size_t const ps_length = buffer->data()[max_ps_size - 1]; + uint8_t const* ps_data = &buffer->data()[max_ps_size - ps_length - 1]; ProtobufReader(ps_data, ps_length).read(ps); CUDF_EXPECTS(ps.footerLength + ps_length < len, "Invalid footer length"); diff --git a/cpp/src/io/orc/orc.hpp b/cpp/src/io/orc/orc.hpp index 44882b71925..d30c3823080 100644 --- a/cpp/src/io/orc/orc.hpp +++ b/cpp/src/io/orc/orc.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -196,7 +196,7 @@ int constexpr encode_field_number(int field_number) noexcept */ class ProtobufReader { public: - ProtobufReader(const uint8_t* base, size_t len) : m_base(base), m_cur(base), m_end(base + len) {} + ProtobufReader(uint8_t const* base, size_t len) : m_base(base), m_cur(base), m_end(base + len) {} template void read(T& s) @@ -241,40 +241,40 @@ class ProtobufReader { template void function_builder(T& s, size_t maxlen, std::tuple& op); - uint32_t read_field_size(const uint8_t* end); + uint32_t read_field_size(uint8_t const* end); template >* = nullptr> - void read_field(T& value, const uint8_t* end) + void read_field(T& value, uint8_t const* end) { value = get(); } template >* = nullptr> - void read_field(T& value, const uint8_t* end) + void read_field(T& value, uint8_t const* end) { value = static_cast(get()); } template >* = nullptr> - void read_field(T& value, const uint8_t* end) + void read_field(T& value, uint8_t const* end) { auto const size = read_field_size(end); - value.assign(reinterpret_cast(m_cur), size); + value.assign(reinterpret_cast(m_cur), size); m_cur += size; } template >>* = nullptr> - void read_field(T& value, const uint8_t* end) + void read_field(T& value, uint8_t const* end) { auto const size = read_field_size(end); - value.emplace_back(reinterpret_cast(m_cur), size); + value.emplace_back(reinterpret_cast(m_cur), size); m_cur += size; } template > and !std::is_same_v>* = nullptr> - void read_field(T& value, const uint8_t* end) + void read_field(T& value, uint8_t const* end) { auto const size = read_field_size(end); value.emplace_back(); @@ -283,7 +283,7 @@ class ProtobufReader { template >>* = nullptr> - void read_field(T& value, const uint8_t* end) + void read_field(T& value, uint8_t const* end) { typename T::value_type contained_value; read_field(contained_value, end); @@ -291,21 +291,21 @@ class ProtobufReader { } template - auto read_field(T& value, const uint8_t* end) -> decltype(read(value, 0)) + auto read_field(T& value, uint8_t const* end) -> decltype(read(value, 0)) { auto const size = read_field_size(end); read(value, size); } template >* = nullptr> - void read_field(T& value, const uint8_t* end) + void read_field(T& value, uint8_t const* end) { memcpy(&value, m_cur, sizeof(T)); m_cur += sizeof(T); } template - void read_packed_field(T& value, const uint8_t* end) + void read_packed_field(T& value, uint8_t const* end) { auto const len = get(); auto const field_end = std::min(m_cur + len, end); @@ -314,7 +314,7 @@ class ProtobufReader { } template - void read_raw_field(T& value, const uint8_t* end) + void read_raw_field(T& value, uint8_t const* end) { auto const size = read_field_size(end); value.emplace_back(m_cur, m_cur + size); @@ -331,7 +331,7 @@ class ProtobufReader { { } - inline void operator()(ProtobufReader* pbr, const uint8_t* end) + inline void operator()(ProtobufReader* pbr, uint8_t const* end) { pbr->read_field(output_value, end); } @@ -347,7 +347,7 @@ class ProtobufReader { { } - inline void operator()(ProtobufReader* pbr, const uint8_t* end) + inline void operator()(ProtobufReader* pbr, uint8_t const* end) { pbr->read_packed_field(output_value, end); } @@ -363,15 +363,15 @@ class ProtobufReader { { } - inline void operator()(ProtobufReader* pbr, const uint8_t* end) + inline void operator()(ProtobufReader* pbr, uint8_t const* end) { pbr->read_raw_field(output_value, end); } }; - const uint8_t* const m_base; - const uint8_t* m_cur; - const uint8_t* const m_end; + uint8_t const* const m_base; + uint8_t const* m_cur; + uint8_t const* const m_end; public: /** @@ -477,21 +477,25 @@ inline int64_t ProtobufReader::get() */ class ProtobufWriter { public: - ProtobufWriter() { m_buf = nullptr; } - ProtobufWriter(std::vector* output) { m_buf = output; } + ProtobufWriter() = default; + + ProtobufWriter(std::size_t bytes) : m_buff(bytes) {} + uint32_t put_byte(uint8_t v) { - m_buf->push_back(v); + m_buff.push_back(v); return 1; } + template uint32_t put_bytes(host_span values) { static_assert(sizeof(T) == 1); - m_buf->reserve(m_buf->size() + values.size()); - m_buf->insert(m_buf->end(), values.begin(), values.end()); + m_buff.reserve(m_buff.size() + values.size()); + m_buff.insert(m_buff.end(), values.begin(), values.end()); return values.size(); } + uint32_t put_uint(uint64_t v) { int l = 1; @@ -519,6 +523,7 @@ class ProtobufWriter { int64_t s = (v < 0); return put_uint(((v ^ -s) << 1) + s); } + void put_row_index_entry(int32_t present_blk, int32_t present_ofs, int32_t data_blk, @@ -528,20 +533,26 @@ class ProtobufWriter { TypeKind kind, ColStatsBlob const* stats); + std::size_t size() const { return m_buff.size(); } + uint8_t const* data() { return m_buff.data(); } + + std::vector& buffer() { return m_buff; } + std::vector release() { return std::move(m_buff); } + public: - size_t write(const PostScript&); - size_t write(const FileFooter&); - size_t write(const StripeInformation&); - size_t write(const SchemaType&); - size_t write(const UserMetadataItem&); - size_t write(const StripeFooter&); - size_t write(const Stream&); - size_t write(const ColumnEncoding&); - size_t write(const StripeStatistics&); - size_t write(const Metadata&); + size_t write(PostScript const&); + size_t write(FileFooter const&); + size_t write(StripeInformation const&); + size_t write(SchemaType const&); + size_t write(UserMetadataItem const&); + size_t write(StripeFooter const&); + size_t write(Stream const&); + size_t write(ColumnEncoding const&); + size_t write(StripeStatistics const&); + size_t write(Metadata const&); protected: - std::vector* m_buf; + std::vector m_buff; struct ProtobufFieldWriter; }; @@ -613,7 +624,7 @@ struct column_validity_info { * convenience methods for initializing and accessing metadata. */ class metadata { - using OrcStripeInfo = std::pair; + using OrcStripeInfo = std::pair; public: struct stripe_source_mapping { diff --git a/cpp/src/io/orc/orc_field_writer.hpp b/cpp/src/io/orc/orc_field_writer.hpp index 44d87190844..fdba0d81a32 100644 --- a/cpp/src/io/orc/orc_field_writer.hpp +++ b/cpp/src/io/orc/orc_field_writer.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * Copyright (c) 2020-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,7 +54,7 @@ struct ProtobufWriter::ProtobufFieldWriter { void field_packed_uint(int field, const std::vector& value) { struct_size += p->put_uint(encode_field_number>(field)); - auto lpos = p->m_buf->size(); + auto lpos = p->m_buff.size(); p->put_byte(0); auto sz = std::accumulate(value.begin(), value.end(), 0, [p = this->p](size_t sum, auto val) { return sum + p->put_uint(val); @@ -62,8 +62,8 @@ struct ProtobufWriter::ProtobufFieldWriter { struct_size += sz + 1; for (; sz > 0x7f; sz >>= 7, struct_size++) - p->m_buf->insert(p->m_buf->begin() + (lpos++), static_cast((sz & 0x7f) | 0x80)); - (*(p->m_buf))[lpos] = static_cast(sz); + p->m_buff.insert(p->m_buff.begin() + (lpos++), static_cast((sz & 0x7f) | 0x80)); + (p->m_buff)[lpos] = static_cast(sz); } /** @@ -84,13 +84,13 @@ struct ProtobufWriter::ProtobufFieldWriter { void field_struct(int field, const T& value) { struct_size += p->put_uint(encode_field_number(field, ProtofType::FIXEDLEN)); - auto lpos = p->m_buf->size(); + auto lpos = p->m_buff.size(); p->put_byte(0); auto sz = p->write(value); struct_size += sz + 1; for (; sz > 0x7f; sz >>= 7, struct_size++) - p->m_buf->insert(p->m_buf->begin() + (lpos++), static_cast((sz & 0x7f) | 0x80)); - (*(p->m_buf))[lpos] = static_cast(sz); + p->m_buff.insert(p->m_buff.begin() + (lpos++), static_cast((sz & 0x7f) | 0x80)); + (p->m_buff)[lpos] = static_cast(sz); } /** diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 8d85b001817..00b5c5428b1 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1390,8 +1390,7 @@ void writer::impl::write_index_stream(int32_t stripe_id, host_span comp_res, std::vector const& rg_stats, StripeInformation* stripe, - orc_streams* streams, - ProtobufWriter* pbw) + orc_streams* streams) { row_group_index_info present; row_group_index_info data; @@ -1443,21 +1442,21 @@ void writer::impl::write_index_stream(int32_t stripe_id, } } - buffer_.resize((compression_kind_ != NONE) ? 3 : 0); + ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); // Add row index entries auto const& rowgroups_range = segmentation.stripes[stripe_id]; std::for_each(rowgroups_range.cbegin(), rowgroups_range.cend(), [&](auto rowgroup) { - pbw->put_row_index_entry(present.comp_pos, - present.pos, - data.comp_pos, - data.pos, - data2.comp_pos, - data2.pos, - kind, - (rg_stats.empty() or stream_id == 0) - ? nullptr - : (&rg_stats[column_id * segmentation.num_rowgroups() + rowgroup])); + pbw.put_row_index_entry(present.comp_pos, + present.pos, + data.comp_pos, + data.pos, + data2.comp_pos, + data2.pos, + kind, + (rg_stats.empty() or stream_id == 0) + ? nullptr + : (&rg_stats[column_id * segmentation.num_rowgroups() + rowgroup])); if (stream_id != 0) { const auto& strm = enc_streams[column_id][rowgroup]; @@ -1467,15 +1466,15 @@ void writer::impl::write_index_stream(int32_t stripe_id, } }); - (*streams)[stream_id].length = buffer_.size(); + (*streams)[stream_id].length = pbw.size(); if (compression_kind_ != NONE) { uint32_t uncomp_ix_len = (uint32_t)((*streams)[stream_id].length - 3) * 2 + 1; - buffer_[0] = static_cast(uncomp_ix_len >> 0); - buffer_[1] = static_cast(uncomp_ix_len >> 8); - buffer_[2] = static_cast(uncomp_ix_len >> 16); + pbw.buffer()[0] = static_cast(uncomp_ix_len >> 0); + pbw.buffer()[1] = static_cast(uncomp_ix_len >> 8); + pbw.buffer()[2] = static_cast(uncomp_ix_len >> 16); } - out_sink_->host_write(buffer_.data(), buffer_.size()); - stripe->indexLength += buffer_.size(); + out_sink_->host_write(pbw.data(), pbw.size()); + stripe->indexLength += pbw.size(); } std::future writer::impl::write_data_stream(gpu::StripeStream const& strm_desc, @@ -2254,8 +2253,6 @@ void writer::impl::write(table_view const& table) comp_results.device_to_host(stream, true); } - ProtobufWriter pbw_(&buffer_); - auto intermediate_stats = gather_statistic_blobs(stats_freq_, orc_table, segmentation); if (intermediate_stats.stripe_stat_chunks.size() > 0) { @@ -2281,8 +2278,7 @@ void writer::impl::write(table_view const& table) comp_results, intermediate_stats.rowgroup_blobs, &stripe, - &streams, - &pbw_); + &streams); } // Column data consisting one or more separate streams @@ -2309,16 +2305,16 @@ void writer::impl::write(table_view const& table) : 0; if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; } } - buffer_.resize((compression_kind_ != NONE) ? 3 : 0); - pbw_.write(sf); - stripe.footerLength = buffer_.size(); + ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); + pbw.write(sf); + stripe.footerLength = pbw.size(); if (compression_kind_ != NONE) { uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1; - buffer_[0] = static_cast(uncomp_sf_len >> 0); - buffer_[1] = static_cast(uncomp_sf_len >> 8); - buffer_[2] = static_cast(uncomp_sf_len >> 16); + pbw.buffer()[0] = static_cast(uncomp_sf_len >> 0); + pbw.buffer()[1] = static_cast(uncomp_sf_len >> 8); + pbw.buffer()[2] = static_cast(uncomp_sf_len >> 16); } - out_sink_->host_write(buffer_.data(), buffer_.size()); + out_sink_->host_write(pbw.data(), pbw.size()); } for (auto const& task : write_tasks) { task.wait(); @@ -2376,19 +2372,18 @@ void writer::impl::close() { if (closed) { return; } closed = true; - ProtobufWriter pbw_(&buffer_); PostScript ps; auto const statistics = finish_statistic_blobs(ff.stripes.size(), persisted_stripe_statistics); // File-level statistics if (not statistics.file_level.empty()) { - buffer_.resize(0); - pbw_.put_uint(encode_field_number(1)); - pbw_.put_uint(persisted_stripe_statistics.num_rows); + ProtobufWriter pbw; + pbw.put_uint(encode_field_number(1)); + pbw.put_uint(persisted_stripe_statistics.num_rows); // First entry contains total number of rows ff.statistics.reserve(ff.types.size()); - ff.statistics.emplace_back(std::move(buffer_)); + ff.statistics.emplace_back(pbw.release()); // Add file stats, stored after stripe stats in `column_stats` ff.statistics.insert(ff.statistics.end(), std::make_move_iterator(statistics.file_level.begin()), @@ -2400,10 +2395,10 @@ void writer::impl::close() md.stripeStats.resize(ff.stripes.size()); for (size_t stripe_id = 0; stripe_id < ff.stripes.size(); stripe_id++) { md.stripeStats[stripe_id].colStats.resize(ff.types.size()); - buffer_.resize(0); - pbw_.put_uint(encode_field_number(1)); - pbw_.put_uint(ff.stripes[stripe_id].numberOfRows); - md.stripeStats[stripe_id].colStats[0] = std::move(buffer_); + ProtobufWriter pbw; + pbw.put_uint(encode_field_number(1)); + pbw.put_uint(ff.stripes[stripe_id].numberOfRows); + md.stripeStats[stripe_id].colStats[0] = pbw.release(); for (size_t col_idx = 0; col_idx < ff.types.size() - 1; col_idx++) { size_t idx = ff.stripes.size() * col_idx + stripe_id; md.stripeStats[stripe_id].colStats[1 + col_idx] = std::move(statistics.stripe_level[idx]); @@ -2421,27 +2416,28 @@ void writer::impl::close() // Write statistics metadata if (md.stripeStats.size() != 0) { - buffer_.resize((compression_kind_ != NONE) ? 3 : 0); - pbw_.write(md); - add_uncompressed_block_headers(buffer_); - ps.metadataLength = buffer_.size(); - out_sink_->host_write(buffer_.data(), buffer_.size()); + ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); + pbw.write(md); + add_uncompressed_block_headers(pbw.buffer()); + ps.metadataLength = pbw.size(); + out_sink_->host_write(pbw.data(), pbw.size()); } else { ps.metadataLength = 0; } - buffer_.resize((compression_kind_ != NONE) ? 3 : 0); - pbw_.write(ff); - add_uncompressed_block_headers(buffer_); + ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); + pbw.write(ff); + add_uncompressed_block_headers(pbw.buffer()); // Write postscript metadata - ps.footerLength = buffer_.size(); + ps.footerLength = pbw.size(); ps.compression = compression_kind_; ps.compressionBlockSize = compression_blocksize_; ps.version = {0, 12}; ps.magic = MAGIC; - const auto ps_length = static_cast(pbw_.write(ps)); - buffer_.push_back(ps_length); - out_sink_->host_write(buffer_.data(), buffer_.size()); + + const auto ps_length = static_cast(pbw.write(ps)); + pbw.put_byte(ps_length); + out_sink_->host_write(pbw.data(), pbw.size()); out_sink_->flush(); } diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index dc8aad33af0..691fba6bac2 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -382,7 +382,6 @@ class writer::impl { * @param[in] rg_stats row group level statistics * @param[in,out] stripe Stream's parent stripe * @param[in,out] streams List of all streams - * @param[in,out] pbw Protobuf writer */ void write_index_stream(int32_t stripe_id, int32_t stream_id, @@ -393,8 +392,7 @@ class writer::impl { host_span comp_out, std::vector const& rg_stats, StripeInformation* stripe, - orc_streams* streams, - ProtobufWriter* pbw); + orc_streams* streams); /** * @brief Write the specified column's data streams @@ -451,7 +449,6 @@ class writer::impl { // statistics data saved between calls to write before a close writes out the statistics persisted_statistics persisted_stripe_statistics; - std::vector buffer_; std::unique_ptr out_sink_; };