From 9422fd5f225ab9eb9c42c49416a9c10370cb26b3 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Wed, 1 Mar 2023 11:22:02 -0800 Subject: [PATCH 01/10] Move `const` position Signed-off-by: Nghia Truong --- cpp/src/io/orc/orc.cpp | 30 ++++++++++---------- cpp/src/io/orc/orc.hpp | 62 +++++++++++++++++++++--------------------- 2 files changed, 46 insertions(+), 46 deletions(-) diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index 880990c552f..f8c094c0824 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -28,7 +28,7 @@ namespace cudf { namespace io { namespace orc { -uint32_t ProtobufReader::read_field_size(const uint8_t* end) +uint32_t ProtobufReader::read_field_size(uint8_t const* end) { auto const size = get(); CUDF_EXPECTS(size <= static_cast(end - m_cur), "Protobuf parsing out of bounds"); @@ -268,7 +268,7 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk, } } -size_t ProtobufWriter::write(const PostScript& s) +size_t ProtobufWriter::write(PostScript const& s) { ProtobufFieldWriter w(this); w.field_uint(1, s.footerLength); @@ -280,7 +280,7 @@ size_t ProtobufWriter::write(const PostScript& s) return w.value(); } -size_t ProtobufWriter::write(const FileFooter& s) +size_t ProtobufWriter::write(FileFooter const& s) { ProtobufFieldWriter w(this); w.field_uint(1, s.headerLength); @@ -294,7 +294,7 @@ size_t ProtobufWriter::write(const FileFooter& s) return w.value(); } -size_t ProtobufWriter::write(const StripeInformation& s) +size_t ProtobufWriter::write(StripeInformation const& s) { ProtobufFieldWriter w(this); w.field_uint(1, s.offset); @@ -305,7 +305,7 @@ size_t ProtobufWriter::write(const StripeInformation& s) return w.value(); } -size_t ProtobufWriter::write(const SchemaType& s) +size_t ProtobufWriter::write(SchemaType const& s) { ProtobufFieldWriter w(this); w.field_uint(1, s.kind); @@ -317,7 +317,7 @@ size_t ProtobufWriter::write(const SchemaType& s) return w.value(); } -size_t ProtobufWriter::write(const UserMetadataItem& s) +size_t ProtobufWriter::write(UserMetadataItem const& s) { ProtobufFieldWriter w(this); w.field_blob(1, s.name); @@ -325,7 +325,7 @@ size_t ProtobufWriter::write(const UserMetadataItem& s) return w.value(); } -size_t ProtobufWriter::write(const StripeFooter& s) +size_t ProtobufWriter::write(StripeFooter const& s) { ProtobufFieldWriter w(this); w.field_repeated_struct(1, s.streams); @@ -334,7 +334,7 @@ size_t ProtobufWriter::write(const StripeFooter& s) return w.value(); } -size_t ProtobufWriter::write(const Stream& s) +size_t ProtobufWriter::write(Stream const& s) { ProtobufFieldWriter w(this); w.field_uint(1, s.kind); @@ -343,7 +343,7 @@ size_t ProtobufWriter::write(const Stream& s) return w.value(); } -size_t ProtobufWriter::write(const ColumnEncoding& s) +size_t ProtobufWriter::write(ColumnEncoding const& s) { ProtobufFieldWriter w(this); w.field_uint(1, s.kind); @@ -351,14 +351,14 @@ size_t ProtobufWriter::write(const ColumnEncoding& s) return w.value(); } -size_t ProtobufWriter::write(const StripeStatistics& s) +size_t ProtobufWriter::write(StripeStatistics const& s) { ProtobufFieldWriter w(this); w.field_repeated_struct_blob(1, s.colStats); return w.value(); } -size_t ProtobufWriter::write(const Metadata& s) +size_t ProtobufWriter::write(Metadata const& s) { ProtobufFieldWriter w(this); w.field_repeated_struct(1, s.stripeStats); @@ -443,13 +443,13 @@ host_span OrcDecompressor::decompress_blocks(host_spansize(); - const auto max_ps_size = std::min(len, static_cast(256)); + auto const len = source->size(); + auto const max_ps_size = std::min(len, static_cast(256)); // Read uncompressed postscript section (max 255 bytes + 1 byte for length) auto buffer = source->host_read(len - max_ps_size, max_ps_size); - const size_t ps_length = buffer->data()[max_ps_size - 1]; - const uint8_t* ps_data = &buffer->data()[max_ps_size - ps_length - 1]; + size_t const ps_length = buffer->data()[max_ps_size - 1]; + uint8_t const* ps_data = &buffer->data()[max_ps_size - ps_length - 1]; ProtobufReader(ps_data, ps_length).read(ps); CUDF_EXPECTS(ps.footerLength + ps_length < len, "Invalid footer length"); diff --git a/cpp/src/io/orc/orc.hpp b/cpp/src/io/orc/orc.hpp index 44882b71925..da0711700ed 100644 --- a/cpp/src/io/orc/orc.hpp +++ b/cpp/src/io/orc/orc.hpp @@ -196,7 +196,7 @@ int constexpr encode_field_number(int field_number) noexcept */ class ProtobufReader { public: - ProtobufReader(const uint8_t* base, size_t len) : m_base(base), m_cur(base), m_end(base + len) {} + ProtobufReader(uint8_t const* base, size_t len) : m_base(base), m_cur(base), m_end(base + len) {} template void read(T& s) @@ -241,40 +241,40 @@ class ProtobufReader { template void function_builder(T& s, size_t maxlen, std::tuple& op); - uint32_t read_field_size(const uint8_t* end); + uint32_t read_field_size(uint8_t const* end); template >* = nullptr> - void read_field(T& value, const uint8_t* end) + void read_field(T& value, uint8_t const* end) { value = get(); } template >* = nullptr> - void read_field(T& value, const uint8_t* end) + void read_field(T& value, uint8_t const* end) { value = static_cast(get()); } template >* = nullptr> - void read_field(T& value, const uint8_t* end) + void read_field(T& value, uint8_t const* end) { auto const size = read_field_size(end); - value.assign(reinterpret_cast(m_cur), size); + value.assign(reinterpret_cast(m_cur), size); m_cur += size; } template >>* = nullptr> - void read_field(T& value, const uint8_t* end) + void read_field(T& value, uint8_t const* end) { auto const size = read_field_size(end); - value.emplace_back(reinterpret_cast(m_cur), size); + value.emplace_back(reinterpret_cast(m_cur), size); m_cur += size; } template > and !std::is_same_v>* = nullptr> - void read_field(T& value, const uint8_t* end) + void read_field(T& value, uint8_t const* end) { auto const size = read_field_size(end); value.emplace_back(); @@ -283,7 +283,7 @@ class ProtobufReader { template >>* = nullptr> - void read_field(T& value, const uint8_t* end) + void read_field(T& value, uint8_t const* end) { typename T::value_type contained_value; read_field(contained_value, end); @@ -291,21 +291,21 @@ class ProtobufReader { } template - auto read_field(T& value, const uint8_t* end) -> decltype(read(value, 0)) + auto read_field(T& value, uint8_t const* end) -> decltype(read(value, 0)) { auto const size = read_field_size(end); read(value, size); } template >* = nullptr> - void read_field(T& value, const uint8_t* end) + void read_field(T& value, uint8_t const* end) { memcpy(&value, m_cur, sizeof(T)); m_cur += sizeof(T); } template - void read_packed_field(T& value, const uint8_t* end) + void read_packed_field(T& value, uint8_t const* end) { auto const len = get(); auto const field_end = std::min(m_cur + len, end); @@ -314,7 +314,7 @@ class ProtobufReader { } template - void read_raw_field(T& value, const uint8_t* end) + void read_raw_field(T& value, uint8_t const* end) { auto const size = read_field_size(end); value.emplace_back(m_cur, m_cur + size); @@ -331,7 +331,7 @@ class ProtobufReader { { } - inline void operator()(ProtobufReader* pbr, const uint8_t* end) + inline void operator()(ProtobufReader* pbr, uint8_t const* end) { pbr->read_field(output_value, end); } @@ -347,7 +347,7 @@ class ProtobufReader { { } - inline void operator()(ProtobufReader* pbr, const uint8_t* end) + inline void operator()(ProtobufReader* pbr, uint8_t const* end) { pbr->read_packed_field(output_value, end); } @@ -363,15 +363,15 @@ class ProtobufReader { { } - inline void operator()(ProtobufReader* pbr, const uint8_t* end) + inline void operator()(ProtobufReader* pbr, uint8_t const* end) { pbr->read_raw_field(output_value, end); } }; - const uint8_t* const m_base; - const uint8_t* m_cur; - const uint8_t* const m_end; + uint8_t const* const m_base; + uint8_t const* m_cur; + uint8_t const* const m_end; public: /** @@ -529,16 +529,16 @@ class ProtobufWriter { ColStatsBlob const* stats); public: - size_t write(const PostScript&); - size_t write(const FileFooter&); - size_t write(const StripeInformation&); - size_t write(const SchemaType&); - size_t write(const UserMetadataItem&); - size_t write(const StripeFooter&); - size_t write(const Stream&); - size_t write(const ColumnEncoding&); - size_t write(const StripeStatistics&); - size_t write(const Metadata&); + size_t write(PostScript const&); + size_t write(FileFooter const&); + size_t write(StripeInformation const&); + size_t write(SchemaType const&); + size_t write(UserMetadataItem const&); + size_t write(StripeFooter const&); + size_t write(Stream const&); + size_t write(ColumnEncoding const&); + size_t write(StripeStatistics const&); + size_t write(Metadata const&); protected: std::vector* m_buf; @@ -613,7 +613,7 @@ struct column_validity_info { * convenience methods for initializing and accessing metadata. */ class metadata { - using OrcStripeInfo = std::pair; + using OrcStripeInfo = std::pair; public: struct stripe_source_mapping { From 33d541e6f6476f4584c34b010e103f032ff82ded Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Wed, 1 Mar 2023 21:08:54 -0800 Subject: [PATCH 02/10] Get rid of the internal `buffer_` state Signed-off-by: Nghia Truong --- cpp/src/io/orc/orc.cpp | 12 +++-- cpp/src/io/orc/orc.hpp | 14 +++-- cpp/src/io/orc/writer_impl.cu | 96 ++++++++++++++++++---------------- cpp/src/io/orc/writer_impl.hpp | 5 +- 4 files changed, 70 insertions(+), 57 deletions(-) diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index f8c094c0824..360dff20136 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -213,8 +213,7 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk, TypeKind kind, ColStatsBlob const* stats) { - std::vector positions_data; - ProtobufWriter position_writer(&positions_data); + ProtobufWriter position_writer; auto const positions_size_offset = position_writer.put_uint( encode_field_number(1, ProtofType::FIXEDLEN)); // 1:positions[packed=true] position_writer.put_byte(0xcd); // positions size placeholder @@ -246,19 +245,22 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk, positions_size += position_writer.put_byte(0); } } + + auto positions_data = position_writer.release(); + // size of the field 1 - positions_data[positions_size_offset] = static_cast(positions_size); + (*positions_data)[positions_size_offset] = static_cast(positions_size); auto const stats_size = (stats == nullptr) ? 0 : varint_size(encode_field_number(2)) + varint_size(stats->size()) + stats->size(); - auto const entry_size = positions_data.size() + stats_size; + auto const entry_size = positions_data->size() + stats_size; // 1:RowIndex.entry put_uint(encode_field_number(1, ProtofType::FIXEDLEN)); put_uint(entry_size); - put_bytes(positions_data); + put_bytes(*positions_data); if (stats != nullptr) { put_uint(encode_field_number(2)); // 2: statistics diff --git a/cpp/src/io/orc/orc.hpp b/cpp/src/io/orc/orc.hpp index da0711700ed..af4978e7464 100644 --- a/cpp/src/io/orc/orc.hpp +++ b/cpp/src/io/orc/orc.hpp @@ -477,13 +477,15 @@ inline int64_t ProtobufReader::get() */ class ProtobufWriter { public: - ProtobufWriter() { m_buf = nullptr; } - ProtobufWriter(std::vector* output) { m_buf = output; } + ProtobufWriter() : m_buf{std::make_unique>()} {} + ProtobufWriter(std::unique_ptr>&& buff) : m_buf{std::move(buff)} {} + uint32_t put_byte(uint8_t v) { m_buf->push_back(v); return 1; } + template uint32_t put_bytes(host_span values) { @@ -492,6 +494,7 @@ class ProtobufWriter { m_buf->insert(m_buf->end(), values.begin(), values.end()); return values.size(); } + uint32_t put_uint(uint64_t v) { int l = 1; @@ -519,6 +522,7 @@ class ProtobufWriter { int64_t s = (v < 0); return put_uint(((v ^ -s) << 1) + s); } + void put_row_index_entry(int32_t present_blk, int32_t present_ofs, int32_t data_blk, @@ -528,6 +532,10 @@ class ProtobufWriter { TypeKind kind, ColStatsBlob const* stats); + void resize(std::size_t bytes) { m_buf->resize(bytes); } + + std::unique_ptr> release() { return std::move(m_buf); } + public: size_t write(PostScript const&); size_t write(FileFooter const&); @@ -541,7 +549,7 @@ class ProtobufWriter { size_t write(Metadata const&); protected: - std::vector* m_buf; + std::unique_ptr> m_buf; struct ProtobufFieldWriter; }; diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index a6effeefc6c..0acfaa0c033 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1390,8 +1390,7 @@ void writer::impl::write_index_stream(int32_t stripe_id, host_span comp_res, std::vector const& rg_stats, StripeInformation* stripe, - orc_streams* streams, - ProtobufWriter* pbw) + orc_streams* streams) { row_group_index_info present; row_group_index_info data; @@ -1443,21 +1442,22 @@ void writer::impl::write_index_stream(int32_t stripe_id, } } - buffer_.resize((compression_kind_ != NONE) ? 3 : 0); + ProtobufWriter pbw; + pbw.resize((compression_kind_ != NONE) ? 3 : 0); // Add row index entries auto const& rowgroups_range = segmentation.stripes[stripe_id]; std::for_each(rowgroups_range.cbegin(), rowgroups_range.cend(), [&](auto rowgroup) { - pbw->put_row_index_entry(present.comp_pos, - present.pos, - data.comp_pos, - data.pos, - data2.comp_pos, - data2.pos, - kind, - (rg_stats.empty() or stream_id == 0) - ? nullptr - : (&rg_stats[column_id * segmentation.num_rowgroups() + rowgroup])); + pbw.put_row_index_entry(present.comp_pos, + present.pos, + data.comp_pos, + data.pos, + data2.comp_pos, + data2.pos, + kind, + (rg_stats.empty() or stream_id == 0) + ? nullptr + : (&rg_stats[column_id * segmentation.num_rowgroups() + rowgroup])); if (stream_id != 0) { const auto& strm = enc_streams[column_id][rowgroup]; @@ -1467,15 +1467,16 @@ void writer::impl::write_index_stream(int32_t stripe_id, } }); - (*streams)[stream_id].length = buffer_.size(); + auto const buff = pbw.release(); + (*streams)[stream_id].length = buff->size(); if (compression_kind_ != NONE) { uint32_t uncomp_ix_len = (uint32_t)((*streams)[stream_id].length - 3) * 2 + 1; - buffer_[0] = static_cast(uncomp_ix_len >> 0); - buffer_[1] = static_cast(uncomp_ix_len >> 8); - buffer_[2] = static_cast(uncomp_ix_len >> 16); + (*buff)[0] = static_cast(uncomp_ix_len >> 0); + (*buff)[1] = static_cast(uncomp_ix_len >> 8); + (*buff)[2] = static_cast(uncomp_ix_len >> 16); } - out_sink_->host_write(buffer_.data(), buffer_.size()); - stripe->indexLength += buffer_.size(); + out_sink_->host_write(buff->data(), buff->size()); + stripe->indexLength += buff->size(); } std::future writer::impl::write_data_stream(gpu::StripeStream const& strm_desc, @@ -2250,8 +2251,6 @@ void writer::impl::write(table_view const& table) comp_results.device_to_host(stream, true); } - ProtobufWriter pbw_(&buffer_); - auto intermediate_stats = gather_statistic_blobs(stats_freq_, orc_table, segmentation); if (intermediate_stats.stripe_stat_chunks.size() > 0) { @@ -2277,8 +2276,7 @@ void writer::impl::write(table_view const& table) comp_results, intermediate_stats.rowgroup_blobs, &stripe, - &streams, - &pbw_); + &streams); } // Column data consisting one or more separate streams @@ -2305,16 +2303,18 @@ void writer::impl::write(table_view const& table) : 0; if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; } } - buffer_.resize((compression_kind_ != NONE) ? 3 : 0); - pbw_.write(sf); - stripe.footerLength = buffer_.size(); + ProtobufWriter pbw; + pbw.resize((compression_kind_ != NONE) ? 3 : 0); + pbw.write(sf); + auto const buff = pbw.release(); + stripe.footerLength = buff->size(); if (compression_kind_ != NONE) { uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1; - buffer_[0] = static_cast(uncomp_sf_len >> 0); - buffer_[1] = static_cast(uncomp_sf_len >> 8); - buffer_[2] = static_cast(uncomp_sf_len >> 16); + (*buff)[0] = static_cast(uncomp_sf_len >> 0); + (*buff)[1] = static_cast(uncomp_sf_len >> 8); + (*buff)[2] = static_cast(uncomp_sf_len >> 16); } - out_sink_->host_write(buffer_.data(), buffer_.size()); + out_sink_->host_write(buff->data(), buff->size()); } for (auto const& task : write_tasks) { task.wait(); @@ -2372,19 +2372,18 @@ void writer::impl::close() { if (closed) { return; } closed = true; - ProtobufWriter pbw_(&buffer_); PostScript ps; auto const statistics = finish_statistic_blobs(ff.stripes.size(), persisted_stripe_statistics); // File-level statistics if (not statistics.file_level.empty()) { - buffer_.resize(0); + ProtobufWriter pbw_; pbw_.put_uint(encode_field_number(1)); pbw_.put_uint(persisted_stripe_statistics.num_rows); // First entry contains total number of rows ff.statistics.reserve(ff.types.size()); - ff.statistics.emplace_back(std::move(buffer_)); + ff.statistics.emplace_back(std::move(*pbw_.release())); // Add file stats, stored after stripe stats in `column_stats` ff.statistics.insert(ff.statistics.end(), std::make_move_iterator(statistics.file_level.begin()), @@ -2396,10 +2395,10 @@ void writer::impl::close() md.stripeStats.resize(ff.stripes.size()); for (size_t stripe_id = 0; stripe_id < ff.stripes.size(); stripe_id++) { md.stripeStats[stripe_id].colStats.resize(ff.types.size()); - buffer_.resize(0); + ProtobufWriter pbw_; pbw_.put_uint(encode_field_number(1)); pbw_.put_uint(ff.stripes[stripe_id].numberOfRows); - md.stripeStats[stripe_id].colStats[0] = std::move(buffer_); + md.stripeStats[stripe_id].colStats[0] = std::move(*pbw_.release()); for (size_t col_idx = 0; col_idx < ff.types.size() - 1; col_idx++) { size_t idx = ff.stripes.size() * col_idx + stripe_id; md.stripeStats[stripe_id].colStats[1 + col_idx] = std::move(statistics.stripe_level[idx]); @@ -2417,27 +2416,34 @@ void writer::impl::close() // Write statistics metadata if (md.stripeStats.size() != 0) { - buffer_.resize((compression_kind_ != NONE) ? 3 : 0); + ProtobufWriter pbw_; + pbw_.resize((compression_kind_ != NONE) ? 3 : 0); pbw_.write(md); - add_uncompressed_block_headers(buffer_); - ps.metadataLength = buffer_.size(); - out_sink_->host_write(buffer_.data(), buffer_.size()); + auto const buff = pbw_.release(); + add_uncompressed_block_headers(*buff); + ps.metadataLength = buff->size(); + out_sink_->host_write(buff->data(), buff->size()); } else { ps.metadataLength = 0; } - buffer_.resize((compression_kind_ != NONE) ? 3 : 0); + ProtobufWriter pbw_; + pbw_.resize((compression_kind_ != NONE) ? 3 : 0); pbw_.write(ff); - add_uncompressed_block_headers(buffer_); + auto buff = pbw_.release(); + add_uncompressed_block_headers(*buff); // todo // Write postscript metadata - ps.footerLength = buffer_.size(); + ps.footerLength = buff->size(); ps.compression = compression_kind_; ps.compressionBlockSize = compression_blocksize_; ps.version = {0, 12}; ps.magic = MAGIC; - const auto ps_length = static_cast(pbw_.write(ps)); - buffer_.push_back(ps_length); - out_sink_->host_write(buffer_.data(), buffer_.size()); + + pbw_ = ProtobufWriter{std::move(buff)}; + const auto ps_length = static_cast(pbw_.write(ps)); + pbw_.put_byte(ps_length); + buff = pbw_.release(); + out_sink_->host_write(buff->data(), buff->size()); out_sink_->flush(); } diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index dc8aad33af0..580dbfdb9e5 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -382,7 +382,6 @@ class writer::impl { * @param[in] rg_stats row group level statistics * @param[in,out] stripe Stream's parent stripe * @param[in,out] streams List of all streams - * @param[in,out] pbw Protobuf writer */ void write_index_stream(int32_t stripe_id, int32_t stream_id, @@ -393,8 +392,7 @@ class writer::impl { host_span comp_out, std::vector const& rg_stats, StripeInformation* stripe, - orc_streams* streams, - ProtobufWriter* pbw); + orc_streams* streams); /** * @brief Write the specified column's data streams @@ -451,7 +449,6 @@ class writer::impl { // statistics data saved between calls to write before a close writes out the statistics persisted_statistics persisted_stripe_statistics; - std::vector buffer_; std::unique_ptr out_sink_; }; From aacbbd52ddf810b72aa584bde510cc089d219989 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Wed, 1 Mar 2023 21:20:01 -0800 Subject: [PATCH 03/10] Rename variable Signed-off-by: Nghia Truong --- cpp/src/io/orc/writer_impl.cu | 40 +++++++++++++++++------------------ 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 0acfaa0c033..179ce9dfceb 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -2378,12 +2378,12 @@ void writer::impl::close() // File-level statistics if (not statistics.file_level.empty()) { - ProtobufWriter pbw_; - pbw_.put_uint(encode_field_number(1)); - pbw_.put_uint(persisted_stripe_statistics.num_rows); + ProtobufWriter pbw; + pbw.put_uint(encode_field_number(1)); + pbw.put_uint(persisted_stripe_statistics.num_rows); // First entry contains total number of rows ff.statistics.reserve(ff.types.size()); - ff.statistics.emplace_back(std::move(*pbw_.release())); + ff.statistics.emplace_back(std::move(*pbw.release())); // Add file stats, stored after stripe stats in `column_stats` ff.statistics.insert(ff.statistics.end(), std::make_move_iterator(statistics.file_level.begin()), @@ -2395,10 +2395,10 @@ void writer::impl::close() md.stripeStats.resize(ff.stripes.size()); for (size_t stripe_id = 0; stripe_id < ff.stripes.size(); stripe_id++) { md.stripeStats[stripe_id].colStats.resize(ff.types.size()); - ProtobufWriter pbw_; - pbw_.put_uint(encode_field_number(1)); - pbw_.put_uint(ff.stripes[stripe_id].numberOfRows); - md.stripeStats[stripe_id].colStats[0] = std::move(*pbw_.release()); + ProtobufWriter pbw; + pbw.put_uint(encode_field_number(1)); + pbw.put_uint(ff.stripes[stripe_id].numberOfRows); + md.stripeStats[stripe_id].colStats[0] = std::move(*pbw.release()); for (size_t col_idx = 0; col_idx < ff.types.size() - 1; col_idx++) { size_t idx = ff.stripes.size() * col_idx + stripe_id; md.stripeStats[stripe_id].colStats[1 + col_idx] = std::move(statistics.stripe_level[idx]); @@ -2416,20 +2416,20 @@ void writer::impl::close() // Write statistics metadata if (md.stripeStats.size() != 0) { - ProtobufWriter pbw_; - pbw_.resize((compression_kind_ != NONE) ? 3 : 0); - pbw_.write(md); - auto const buff = pbw_.release(); + ProtobufWriter pbw; + pbw.resize((compression_kind_ != NONE) ? 3 : 0); + pbw.write(md); + auto const buff = pbw.release(); add_uncompressed_block_headers(*buff); ps.metadataLength = buff->size(); out_sink_->host_write(buff->data(), buff->size()); } else { ps.metadataLength = 0; } - ProtobufWriter pbw_; - pbw_.resize((compression_kind_ != NONE) ? 3 : 0); - pbw_.write(ff); - auto buff = pbw_.release(); + ProtobufWriter pbw; + pbw.resize((compression_kind_ != NONE) ? 3 : 0); + pbw.write(ff); + auto buff = pbw.release(); add_uncompressed_block_headers(*buff); // todo // Write postscript metadata @@ -2439,10 +2439,10 @@ void writer::impl::close() ps.version = {0, 12}; ps.magic = MAGIC; - pbw_ = ProtobufWriter{std::move(buff)}; - const auto ps_length = static_cast(pbw_.write(ps)); - pbw_.put_byte(ps_length); - buff = pbw_.release(); + pbw = ProtobufWriter{std::move(buff)}; + const auto ps_length = static_cast(pbw.write(ps)); + pbw.put_byte(ps_length); + buff = pbw.release(); out_sink_->host_write(buff->data(), buff->size()); out_sink_->flush(); } From 7380cb98f2af6487b119d7dc5e5a02c0bc7f0e45 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Wed, 1 Mar 2023 21:20:08 -0800 Subject: [PATCH 04/10] Update copyright year Signed-off-by: Nghia Truong --- cpp/src/io/orc/orc.hpp | 2 +- cpp/src/io/orc/writer_impl.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/orc/orc.hpp b/cpp/src/io/orc/orc.hpp index af4978e7464..d79c41f9316 100644 --- a/cpp/src/io/orc/orc.hpp +++ b/cpp/src/io/orc/orc.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 580dbfdb9e5..691fba6bac2 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 465b569fcb5e070a6d9925e92773112f27a7afaf Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Fri, 3 Mar 2023 11:20:11 -0800 Subject: [PATCH 05/10] Write data with bound check Signed-off-by: Nghia Truong --- cpp/src/io/orc/orc.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index 360dff20136..ad390d8b976 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -249,7 +249,7 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk, auto positions_data = position_writer.release(); // size of the field 1 - (*positions_data)[positions_size_offset] = static_cast(positions_size); + positions_data->at(positions_size_offset) = static_cast(positions_size); auto const stats_size = (stats == nullptr) ? 0 From c827d68cc5e1efe5dd5b02da2a3e0363da58e839 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Fri, 3 Mar 2023 11:20:17 -0800 Subject: [PATCH 06/10] Misc Signed-off-by: Nghia Truong --- cpp/src/io/orc/writer_impl.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 179ce9dfceb..55c3b2784b1 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -2430,7 +2430,7 @@ void writer::impl::close() pbw.resize((compression_kind_ != NONE) ? 3 : 0); pbw.write(ff); auto buff = pbw.release(); - add_uncompressed_block_headers(*buff); // todo + add_uncompressed_block_headers(*buff); // Write postscript metadata ps.footerLength = buff->size(); From 7dda1e90d941bebba271a2fee0afe1f79b1bb424 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Fri, 10 Mar 2023 15:34:42 -0800 Subject: [PATCH 07/10] Simplify buffer implementation --- cpp/src/io/orc/orc.cpp | 8 +++---- cpp/src/io/orc/orc.hpp | 5 ++++- cpp/src/io/orc/writer_impl.cu | 40 +++++++++++++++-------------------- 3 files changed, 24 insertions(+), 29 deletions(-) diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index ad390d8b976..5445e59297c 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -246,21 +246,19 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk, } } - auto positions_data = position_writer.release(); - // size of the field 1 - positions_data->at(positions_size_offset) = static_cast(positions_size); + position_writer.buffer()[positions_size_offset] = static_cast(positions_size); auto const stats_size = (stats == nullptr) ? 0 : varint_size(encode_field_number(2)) + varint_size(stats->size()) + stats->size(); - auto const entry_size = positions_data->size() + stats_size; + auto const entry_size = position_writer.size() + stats_size; // 1:RowIndex.entry put_uint(encode_field_number(1, ProtofType::FIXEDLEN)); put_uint(entry_size); - put_bytes(*positions_data); + put_bytes(position_writer.buffer()); if (stats != nullptr) { put_uint(encode_field_number(2)); // 2: statistics diff --git a/cpp/src/io/orc/orc.hpp b/cpp/src/io/orc/orc.hpp index d79c41f9316..c3052e680b6 100644 --- a/cpp/src/io/orc/orc.hpp +++ b/cpp/src/io/orc/orc.hpp @@ -478,7 +478,6 @@ inline int64_t ProtobufReader::get() class ProtobufWriter { public: ProtobufWriter() : m_buf{std::make_unique>()} {} - ProtobufWriter(std::unique_ptr>&& buff) : m_buf{std::move(buff)} {} uint32_t put_byte(uint8_t v) { @@ -534,6 +533,10 @@ class ProtobufWriter { void resize(std::size_t bytes) { m_buf->resize(bytes); } + std::size_t size() const { return m_buf->size(); } + uint8_t const* data() { return m_buf->data(); } + + std::vector& buffer() { return *m_buf; } std::unique_ptr> release() { return std::move(m_buf); } public: diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index d7d1157feaa..e7dc21740ac 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1467,16 +1467,15 @@ void writer::impl::write_index_stream(int32_t stripe_id, } }); - auto const buff = pbw.release(); - (*streams)[stream_id].length = buff->size(); + (*streams)[stream_id].length = pbw.size(); if (compression_kind_ != NONE) { uint32_t uncomp_ix_len = (uint32_t)((*streams)[stream_id].length - 3) * 2 + 1; - (*buff)[0] = static_cast(uncomp_ix_len >> 0); - (*buff)[1] = static_cast(uncomp_ix_len >> 8); - (*buff)[2] = static_cast(uncomp_ix_len >> 16); + pbw.buffer()[0] = static_cast(uncomp_ix_len >> 0); + pbw.buffer()[1] = static_cast(uncomp_ix_len >> 8); + pbw.buffer()[2] = static_cast(uncomp_ix_len >> 16); } - out_sink_->host_write(buff->data(), buff->size()); - stripe->indexLength += buff->size(); + out_sink_->host_write(pbw.data(), pbw.size()); + stripe->indexLength += pbw.size(); } std::future writer::impl::write_data_stream(gpu::StripeStream const& strm_desc, @@ -2310,15 +2309,14 @@ void writer::impl::write(table_view const& table) ProtobufWriter pbw; pbw.resize((compression_kind_ != NONE) ? 3 : 0); pbw.write(sf); - auto const buff = pbw.release(); - stripe.footerLength = buff->size(); + stripe.footerLength = pbw.size(); if (compression_kind_ != NONE) { uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1; - (*buff)[0] = static_cast(uncomp_sf_len >> 0); - (*buff)[1] = static_cast(uncomp_sf_len >> 8); - (*buff)[2] = static_cast(uncomp_sf_len >> 16); + pbw.buffer()[0] = static_cast(uncomp_sf_len >> 0); + pbw.buffer()[1] = static_cast(uncomp_sf_len >> 8); + pbw.buffer()[2] = static_cast(uncomp_sf_len >> 16); } - out_sink_->host_write(buff->data(), buff->size()); + out_sink_->host_write(pbw.data(), pbw.size()); } for (auto const& task : write_tasks) { task.wait(); @@ -2423,31 +2421,27 @@ void writer::impl::close() ProtobufWriter pbw; pbw.resize((compression_kind_ != NONE) ? 3 : 0); pbw.write(md); - auto const buff = pbw.release(); - add_uncompressed_block_headers(*buff); - ps.metadataLength = buff->size(); - out_sink_->host_write(buff->data(), buff->size()); + add_uncompressed_block_headers(pbw.buffer()); + ps.metadataLength = pbw.size(); + out_sink_->host_write(pbw.data(), pbw.size()); } else { ps.metadataLength = 0; } ProtobufWriter pbw; pbw.resize((compression_kind_ != NONE) ? 3 : 0); pbw.write(ff); - auto buff = pbw.release(); - add_uncompressed_block_headers(*buff); + add_uncompressed_block_headers(pbw.buffer()); // Write postscript metadata - ps.footerLength = buff->size(); + ps.footerLength = pbw.size(); ps.compression = compression_kind_; ps.compressionBlockSize = compression_blocksize_; ps.version = {0, 12}; ps.magic = MAGIC; - pbw = ProtobufWriter{std::move(buff)}; const auto ps_length = static_cast(pbw.write(ps)); pbw.put_byte(ps_length); - buff = pbw.release(); - out_sink_->host_write(buff->data(), buff->size()); + out_sink_->host_write(pbw.data(), pbw.size()); out_sink_->flush(); } From 8e3f6ae62eae79ba149a6ce86943591d6585914c Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Fri, 10 Mar 2023 16:06:40 -0800 Subject: [PATCH 08/10] No longer use `std::unique_ptr` --- cpp/src/io/orc/orc.hpp | 20 ++++++++++---------- cpp/src/io/orc/orc_field_writer.hpp | 12 ++++++------ cpp/src/io/orc/writer_impl.cu | 4 ++-- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/cpp/src/io/orc/orc.hpp b/cpp/src/io/orc/orc.hpp index c3052e680b6..369b1e4f622 100644 --- a/cpp/src/io/orc/orc.hpp +++ b/cpp/src/io/orc/orc.hpp @@ -477,11 +477,11 @@ inline int64_t ProtobufReader::get() */ class ProtobufWriter { public: - ProtobufWriter() : m_buf{std::make_unique>()} {} + ProtobufWriter() = default; uint32_t put_byte(uint8_t v) { - m_buf->push_back(v); + m_buff.push_back(v); return 1; } @@ -489,8 +489,8 @@ class ProtobufWriter { uint32_t put_bytes(host_span values) { static_assert(sizeof(T) == 1); - m_buf->reserve(m_buf->size() + values.size()); - m_buf->insert(m_buf->end(), values.begin(), values.end()); + m_buff.reserve(m_buff.size() + values.size()); + m_buff.insert(m_buff.end(), values.begin(), values.end()); return values.size(); } @@ -531,13 +531,13 @@ class ProtobufWriter { TypeKind kind, ColStatsBlob const* stats); - void resize(std::size_t bytes) { m_buf->resize(bytes); } + void resize(std::size_t bytes) { m_buff.resize(bytes); } - std::size_t size() const { return m_buf->size(); } - uint8_t const* data() { return m_buf->data(); } + std::size_t size() const { return m_buff.size(); } + uint8_t const* data() { return m_buff.data(); } - std::vector& buffer() { return *m_buf; } - std::unique_ptr> release() { return std::move(m_buf); } + std::vector& buffer() { return m_buff; } + std::vector release() { return std::move(m_buff); } public: size_t write(PostScript const&); @@ -552,7 +552,7 @@ class ProtobufWriter { size_t write(Metadata const&); protected: - std::unique_ptr> m_buf; + std::vector m_buff; struct ProtobufFieldWriter; }; diff --git a/cpp/src/io/orc/orc_field_writer.hpp b/cpp/src/io/orc/orc_field_writer.hpp index 44d87190844..9c3975cba5e 100644 --- a/cpp/src/io/orc/orc_field_writer.hpp +++ b/cpp/src/io/orc/orc_field_writer.hpp @@ -54,7 +54,7 @@ struct ProtobufWriter::ProtobufFieldWriter { void field_packed_uint(int field, const std::vector& value) { struct_size += p->put_uint(encode_field_number>(field)); - auto lpos = p->m_buf->size(); + auto lpos = p->m_buff.size(); p->put_byte(0); auto sz = std::accumulate(value.begin(), value.end(), 0, [p = this->p](size_t sum, auto val) { return sum + p->put_uint(val); @@ -62,8 +62,8 @@ struct ProtobufWriter::ProtobufFieldWriter { struct_size += sz + 1; for (; sz > 0x7f; sz >>= 7, struct_size++) - p->m_buf->insert(p->m_buf->begin() + (lpos++), static_cast((sz & 0x7f) | 0x80)); - (*(p->m_buf))[lpos] = static_cast(sz); + p->m_buff.insert(p->m_buff.begin() + (lpos++), static_cast((sz & 0x7f) | 0x80)); + (p->m_buff)[lpos] = static_cast(sz); } /** @@ -84,13 +84,13 @@ struct ProtobufWriter::ProtobufFieldWriter { void field_struct(int field, const T& value) { struct_size += p->put_uint(encode_field_number(field, ProtofType::FIXEDLEN)); - auto lpos = p->m_buf->size(); + auto lpos = p->m_buff.size(); p->put_byte(0); auto sz = p->write(value); struct_size += sz + 1; for (; sz > 0x7f; sz >>= 7, struct_size++) - p->m_buf->insert(p->m_buf->begin() + (lpos++), static_cast((sz & 0x7f) | 0x80)); - (*(p->m_buf))[lpos] = static_cast(sz); + p->m_buff.insert(p->m_buff.begin() + (lpos++), static_cast((sz & 0x7f) | 0x80)); + (p->m_buff)[lpos] = static_cast(sz); } /** diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index e7dc21740ac..30852ec9757 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -2385,7 +2385,7 @@ void writer::impl::close() pbw.put_uint(persisted_stripe_statistics.num_rows); // First entry contains total number of rows ff.statistics.reserve(ff.types.size()); - ff.statistics.emplace_back(std::move(*pbw.release())); + ff.statistics.emplace_back(pbw.release()); // Add file stats, stored after stripe stats in `column_stats` ff.statistics.insert(ff.statistics.end(), std::make_move_iterator(statistics.file_level.begin()), @@ -2400,7 +2400,7 @@ void writer::impl::close() ProtobufWriter pbw; pbw.put_uint(encode_field_number(1)); pbw.put_uint(ff.stripes[stripe_id].numberOfRows); - md.stripeStats[stripe_id].colStats[0] = std::move(*pbw.release()); + md.stripeStats[stripe_id].colStats[0] = pbw.release(); for (size_t col_idx = 0; col_idx < ff.types.size() - 1; col_idx++) { size_t idx = ff.stripes.size() * col_idx + stripe_id; md.stripeStats[stripe_id].colStats[1 + col_idx] = std::move(statistics.stripe_level[idx]); From 3fadb596aa7f282cb2a4d2f48eb4097f7b300fd9 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Fri, 10 Mar 2023 17:38:34 -0800 Subject: [PATCH 09/10] Update copyright year --- cpp/src/io/orc/orc_field_writer.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/orc/orc_field_writer.hpp b/cpp/src/io/orc/orc_field_writer.hpp index 9c3975cba5e..fdba0d81a32 100644 --- a/cpp/src/io/orc/orc_field_writer.hpp +++ b/cpp/src/io/orc/orc_field_writer.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * Copyright (c) 2020-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From efbd1178a252725d29c4352e130c2bfa7f0bb4b2 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 14 Mar 2023 14:32:55 -0700 Subject: [PATCH 10/10] Vector initialization with size --- cpp/src/io/orc/orc.hpp | 4 ++-- cpp/src/io/orc/writer_impl.cu | 12 ++++-------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/cpp/src/io/orc/orc.hpp b/cpp/src/io/orc/orc.hpp index 369b1e4f622..d30c3823080 100644 --- a/cpp/src/io/orc/orc.hpp +++ b/cpp/src/io/orc/orc.hpp @@ -479,6 +479,8 @@ class ProtobufWriter { public: ProtobufWriter() = default; + ProtobufWriter(std::size_t bytes) : m_buff(bytes) {} + uint32_t put_byte(uint8_t v) { m_buff.push_back(v); @@ -531,8 +533,6 @@ class ProtobufWriter { TypeKind kind, ColStatsBlob const* stats); - void resize(std::size_t bytes) { m_buff.resize(bytes); } - std::size_t size() const { return m_buff.size(); } uint8_t const* data() { return m_buff.data(); } diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 30852ec9757..00b5c5428b1 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1442,8 +1442,7 @@ void writer::impl::write_index_stream(int32_t stripe_id, } } - ProtobufWriter pbw; - pbw.resize((compression_kind_ != NONE) ? 3 : 0); + ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); // Add row index entries auto const& rowgroups_range = segmentation.stripes[stripe_id]; @@ -2306,8 +2305,7 @@ void writer::impl::write(table_view const& table) : 0; if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; } } - ProtobufWriter pbw; - pbw.resize((compression_kind_ != NONE) ? 3 : 0); + ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); pbw.write(sf); stripe.footerLength = pbw.size(); if (compression_kind_ != NONE) { @@ -2418,8 +2416,7 @@ void writer::impl::close() // Write statistics metadata if (md.stripeStats.size() != 0) { - ProtobufWriter pbw; - pbw.resize((compression_kind_ != NONE) ? 3 : 0); + ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); pbw.write(md); add_uncompressed_block_headers(pbw.buffer()); ps.metadataLength = pbw.size(); @@ -2427,8 +2424,7 @@ void writer::impl::close() } else { ps.metadataLength = 0; } - ProtobufWriter pbw; - pbw.resize((compression_kind_ != NONE) ? 3 : 0); + ProtobufWriter pbw((compression_kind_ != NONE) ? 3 : 0); pbw.write(ff); add_uncompressed_block_headers(pbw.buffer());