From 42252bfe853cf39fab54f44e85c1c84a3223e131 Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 2 May 2022 00:06:26 -0700 Subject: [PATCH 01/17] spans --- cpp/src/io/comp/io_uncomp.h | 5 +---- cpp/src/io/comp/uncomp.cpp | 35 ++++++++++++++--------------------- cpp/src/io/orc/orc.cpp | 2 +- 3 files changed, 16 insertions(+), 26 deletions(-) diff --git a/cpp/src/io/comp/io_uncomp.h b/cpp/src/io/comp/io_uncomp.h index 7b1feb84813..cfd3d7a0a5c 100644 --- a/cpp/src/io/comp/io_uncomp.h +++ b/cpp/src/io/comp/io_uncomp.h @@ -47,10 +47,7 @@ std::vector get_uncompressed_data(host_span data, compression_ class HostDecompressor { public: - virtual size_t Decompress(uint8_t* dstBytes, - size_t dstLen, - uint8_t const* srcBytes, - size_t srcLen) = 0; + virtual size_t decompress(host_span src, host_span dst) = 0; virtual ~HostDecompressor() {} public: diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 66d73074af0..8fa93115d5c 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -443,21 +443,18 @@ std::vector get_uncompressed_data(host_span const data, class HostDecompressor_ZLIB : public HostDecompressor { public: HostDecompressor_ZLIB(bool gz_hdr_) : gz_hdr(gz_hdr_) {} - size_t Decompress(uint8_t* dstBytes, - size_t dstLen, - const uint8_t* srcBytes, - size_t srcLen) override + size_t decompress(host_span src, host_span dst) override { if (gz_hdr) { gz_archive_s gz; - if (!ParseGZArchive(&gz, srcBytes, srcLen)) { return 0; } - srcBytes = gz.comp_data; - srcLen = gz.comp_len; + if (!ParseGZArchive(&gz, src.data(), src.size())) { return 0; } + src = {gz.comp_data, gz.comp_len}; } - if (0 == cpu_inflate(dstBytes, &dstLen, srcBytes, srcLen)) { - return dstLen; + size_t uncomp_size = dst.size(); + if (0 == cpu_inflate(dst.data(), &uncomp_size, src.data(), src.size())) { + return uncomp_size; } else { - return 0; + return 0; // Throw? } } @@ -471,16 +468,12 @@ class HostDecompressor_ZLIB : public HostDecompressor { class HostDecompressor_SNAPPY : public HostDecompressor { public: HostDecompressor_SNAPPY() {} - size_t Decompress(uint8_t* dstBytes, - size_t dstLen, - const uint8_t* srcBytes, - size_t srcLen) override + size_t decompress(host_span src, host_span dst) override { + if (dst.empty() || src.size() < 1) { return 0; } uint32_t uncompressed_size, bytes_left, dst_pos; - const uint8_t* cur = srcBytes; - const uint8_t* end = srcBytes + srcLen; - - if (!dstBytes || srcLen < 1) { return 0; } + auto cur = src.begin(); + auto const end = src.end(); // Read uncompressed length (varint) { uint32_t l = 0, c; @@ -493,7 +486,7 @@ class HostDecompressor_SNAPPY : public HostDecompressor { uncompressed_size |= lo7 << l; l += 7; } while (c > 0x7f && cur < end); - if (!uncompressed_size || uncompressed_size > dstLen || cur >= end) { + if (!uncompressed_size || uncompressed_size > dst.size() || cur >= end) { // Destination buffer too small or zero size return 0; } @@ -528,7 +521,7 @@ class HostDecompressor_SNAPPY : public HostDecompressor { if (offset - 1u >= dst_pos || blen > bytes_left) break; bytes_left -= blen; do { - dstBytes[dst_pos] = dstBytes[dst_pos - offset]; + dst[dst_pos] = dst[dst_pos - offset]; dst_pos++; } while (--blen); } else { @@ -549,7 +542,7 @@ class HostDecompressor_SNAPPY : public HostDecompressor { } blen++; if (cur + blen > end || blen > bytes_left) break; - memcpy(dstBytes + dst_pos, cur, blen); + memcpy(dst.data() + dst_pos, cur, blen); cur += blen; dst_pos += blen; bytes_left -= blen; diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index f51fd28676e..bd7f0caac03 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -437,7 +437,7 @@ const uint8_t* OrcDecompressor::Decompress(const uint8_t* srcBytes, size_t srcLe } else { // Compressed block dst_length += - m_decompressor->Decompress(dst + dst_length, m_blockSize, srcBytes + i, block_len); + m_decompressor->decompress({srcBytes + i, block_len}, {dst + dst_length, m_blockSize}); } i += block_len; } From ec2654ecad16322742f03f26d8e0277b16bdc733 Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 2 May 2022 00:51:27 -0700 Subject: [PATCH 02/17] remove IO_UNCOMP_STREAM_TYPE --- cpp/include/cudf/io/types.hpp | 6 +++- cpp/src/io/comp/io_uncomp.h | 17 +-------- cpp/src/io/comp/nvcomp_adapter.cpp | 18 +++++----- cpp/src/io/comp/nvcomp_adapter.hpp | 2 +- cpp/src/io/comp/uncomp.cpp | 56 +++++++++++++----------------- cpp/src/io/orc/orc.cpp | 12 +++---- 6 files changed, 46 insertions(+), 65 deletions(-) diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index 23ed0153f3f..f4be64aaee7 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -60,7 +60,11 @@ enum class compression_type { BZIP2, ///< BZIP2 format, using Burrows-Wheeler transform BROTLI, ///< BROTLI format, using LZ77 + Huffman + 2nd order context modeling ZIP, ///< ZIP format, using DEFLATE algorithm - XZ ///< XZ format, using LZMA(2) algorithm + XZ, ///< XZ format, using LZMA(2) algorithm + ZLIB, ///< ZIP format, using DEFLATE algorithm + LZ4, + LZO, + ZSTD }; /** diff --git a/cpp/src/io/comp/io_uncomp.h b/cpp/src/io/comp/io_uncomp.h index cfd3d7a0a5c..416d5c6a78d 100644 --- a/cpp/src/io/comp/io_uncomp.h +++ b/cpp/src/io/comp/io_uncomp.h @@ -27,21 +27,6 @@ using cudf::host_span; namespace cudf { namespace io { -enum { - IO_UNCOMP_STREAM_TYPE_INFER = 0, - IO_UNCOMP_STREAM_TYPE_GZIP = 1, - IO_UNCOMP_STREAM_TYPE_ZIP = 2, - IO_UNCOMP_STREAM_TYPE_BZIP2 = 3, - IO_UNCOMP_STREAM_TYPE_XZ = 4, - IO_UNCOMP_STREAM_TYPE_INFLATE = 5, - IO_UNCOMP_STREAM_TYPE_SNAPPY = 6, - IO_UNCOMP_STREAM_TYPE_BROTLI = 7, - IO_UNCOMP_STREAM_TYPE_LZ4 = 8, - IO_UNCOMP_STREAM_TYPE_LZO = 9, - IO_UNCOMP_STREAM_TYPE_ZSTD = 10, -}; - -std::vector io_uncompress_single_h2d(void const* src, size_t src_size, int stream_type); std::vector get_uncompressed_data(host_span data, compression_type compression); @@ -51,7 +36,7 @@ class HostDecompressor { virtual ~HostDecompressor() {} public: - static std::unique_ptr Create(int stream_type); + static std::unique_ptr Create(compression_type compression); }; /** diff --git a/cpp/src/io/comp/nvcomp_adapter.cpp b/cpp/src/io/comp/nvcomp_adapter.cpp index b2e6f07b80b..0fde4e1a5c4 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cpp +++ b/cpp/src/io/comp/nvcomp_adapter.cpp @@ -23,9 +23,9 @@ namespace cudf::io::nvcomp { template -auto batched_decompress_get_temp_size(compression_type type, Args&&... args) +auto batched_decompress_get_temp_size(compression_type compression, Args&&... args) { - switch (type) { + switch (compression) { case compression_type::SNAPPY: return nvcompBatchedSnappyDecompressGetTempSize(std::forward(args)...); default: CUDF_FAIL("Unsupported compression type"); @@ -33,27 +33,27 @@ auto batched_decompress_get_temp_size(compression_type type, Args&&... args) }; template -auto batched_decompress_async(compression_type type, Args&&... args) +auto batched_decompress_async(compression_type compression, Args&&... args) { - switch (type) { + switch (compression) { case compression_type::SNAPPY: return nvcompBatchedSnappyDecompressAsync(std::forward(args)...); default: CUDF_FAIL("Unsupported compression type"); } }; -size_t get_temp_size(compression_type type, size_t num_chunks, size_t max_uncomp_chunk_size) +size_t get_temp_size(compression_type compression, size_t num_chunks, size_t max_uncomp_chunk_size) { size_t temp_size = 0; nvcompStatus_t nvcomp_status = - batched_decompress_get_temp_size(type, num_chunks, max_uncomp_chunk_size, &temp_size); + batched_decompress_get_temp_size(compression, num_chunks, max_uncomp_chunk_size, &temp_size); CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Unable to get scratch size for decompression"); return temp_size; } -void batched_decompress(compression_type type, +void batched_decompress(compression_type compression, device_span const> inputs, device_span const> outputs, device_span statuses, @@ -67,8 +67,8 @@ void batched_decompress(compression_type type, rmm::device_uvector actual_uncompressed_data_sizes(num_chunks, stream); rmm::device_uvector nvcomp_statuses(num_chunks, stream); // Temporary space required for decompression - rmm::device_buffer scratch(get_temp_size(type, num_chunks, max_uncomp_chunk_size), stream); - auto const nvcomp_status = batched_decompress_async(type, + rmm::device_buffer scratch(get_temp_size(compression, num_chunks, max_uncomp_chunk_size), stream); + auto const nvcomp_status = batched_decompress_async(compression, nvcomp_args.compressed_data_ptrs.data(), nvcomp_args.compressed_data_sizes.data(), nvcomp_args.uncompressed_data_sizes.data(), diff --git a/cpp/src/io/comp/nvcomp_adapter.hpp b/cpp/src/io/comp/nvcomp_adapter.hpp index a0eb6bc4fbf..c289e2d2ade 100644 --- a/cpp/src/io/comp/nvcomp_adapter.hpp +++ b/cpp/src/io/comp/nvcomp_adapter.hpp @@ -36,7 +36,7 @@ enum class compression_type { SNAPPY }; * @param[in] max_uncomp_page_size maximum size of uncompressed block * @param[in] stream CUDA stream to use */ -void batched_decompress(compression_type type, +void batched_decompress(compression_type compression, device_span const> inputs, device_span const> outputs, device_span statuses, diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 8fa93115d5c..368a6dfd8fe 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -292,7 +292,9 @@ int cpu_inflate_vector(std::vector& dst, const uint8_t* comp_data, size_t * * @return Vector containing the uncompressed output */ -std::vector io_uncompress_single_h2d(const void* src, size_t src_size, int stream_type) +std::vector io_uncompress_single_h2d(const void* src, + size_t src_size, + compression_type compression) { const uint8_t* raw = static_cast(src); const uint8_t* comp_data = nullptr; @@ -302,19 +304,19 @@ std::vector io_uncompress_single_h2d(const void* src, size_t src_size, int CUDF_EXPECTS(src != nullptr, "Decompression: Source cannot be nullptr"); CUDF_EXPECTS(src_size != 0, "Decompression: Source size cannot be 0"); - switch (stream_type) { - case IO_UNCOMP_STREAM_TYPE_INFER: - case IO_UNCOMP_STREAM_TYPE_GZIP: { + switch (compression) { + case compression_type::AUTO: + case compression_type::GZIP: { gz_archive_s gz; if (ParseGZArchive(&gz, raw, src_size)) { - stream_type = IO_UNCOMP_STREAM_TYPE_GZIP; + compression = compression_type::GZIP; comp_data = gz.comp_data; comp_len = gz.comp_len; uncomp_len = gz.isize; } - if (stream_type != IO_UNCOMP_STREAM_TYPE_INFER) break; // Fall through for INFER + if (compression != compression_type::AUTO) break; // Fall through for INFER } - case IO_UNCOMP_STREAM_TYPE_ZIP: { + case compression_type::ZIP: { zip_archive_s za; if (OpenZipArchive(&za, raw, src_size)) { size_t cdfh_ofs = 0; @@ -337,7 +339,7 @@ std::vector io_uncompress_single_h2d(const void* src, size_t src_size, int size_t file_end = file_start + lfh->comp_size; if (file_end <= src_size) { // Pick the first valid file of non-zero size (only 1 file expected in archive) - stream_type = IO_UNCOMP_STREAM_TYPE_ZIP; + compression = compression_type::ZIP; comp_data = raw + file_start; comp_len = lfh->comp_size; uncomp_len = lfh->uncomp_size; @@ -350,20 +352,20 @@ std::vector io_uncompress_single_h2d(const void* src, size_t src_size, int } } } - if (stream_type != IO_UNCOMP_STREAM_TYPE_INFER) break; // Fall through for INFER - case IO_UNCOMP_STREAM_TYPE_BZIP2: + if (compression != compression_type::AUTO) break; // Fall through for INFER + case compression_type::BZIP2: if (src_size > 4) { const bz2_file_header_s* fhdr = reinterpret_cast(raw); // Check for BZIP2 file signature "BZh1" to "BZh9" if (fhdr->sig[0] == 'B' && fhdr->sig[1] == 'Z' && fhdr->sig[2] == 'h' && fhdr->blksz >= '1' && fhdr->blksz <= '9') { - stream_type = IO_UNCOMP_STREAM_TYPE_BZIP2; + compression = compression_type::BZIP2; comp_data = raw; comp_len = src_size; uncomp_len = 0; } } - if (stream_type != IO_UNCOMP_STREAM_TYPE_INFER) break; // Fall through for INFER + if (compression != compression_type::AUTO) break; // Fall through for INFER default: // Unsupported format break; @@ -377,14 +379,14 @@ std::vector io_uncompress_single_h2d(const void* src, size_t src_size, int // ~4:1 compression for initial size } - if (stream_type == IO_UNCOMP_STREAM_TYPE_GZIP || stream_type == IO_UNCOMP_STREAM_TYPE_ZIP) { + if (compression == compression_type::GZIP || compression == compression_type::ZIP) { // INFLATE std::vector dst(uncomp_len); CUDF_EXPECTS(cpu_inflate_vector(dst, comp_data, comp_len) == 0, "Decompression: error in stream"); return dst; } - if (stream_type == IO_UNCOMP_STREAM_TYPE_BZIP2) { + if (compression == compression_type::BZIP2) { size_t src_ofs = 0; size_t dst_ofs = 0; int bz_err = 0; @@ -424,17 +426,7 @@ std::vector io_uncompress_single_h2d(const void* src, size_t src_size, int std::vector get_uncompressed_data(host_span const data, compression_type compression) { - auto const comp_type = [compression]() { - switch (compression) { - case compression_type::GZIP: return IO_UNCOMP_STREAM_TYPE_GZIP; - case compression_type::ZIP: return IO_UNCOMP_STREAM_TYPE_ZIP; - case compression_type::BZIP2: return IO_UNCOMP_STREAM_TYPE_BZIP2; - case compression_type::XZ: return IO_UNCOMP_STREAM_TYPE_XZ; - default: return IO_UNCOMP_STREAM_TYPE_INFER; - } - }(); - - return io_uncompress_single_h2d(data.data(), data.size(), comp_type); + return io_uncompress_single_h2d(data.data(), data.size(), compression); } /** @@ -555,18 +547,18 @@ class HostDecompressor_SNAPPY : public HostDecompressor { /** * @brief CPU decompression class * - * @param[in] stream_type compression method (IO_UNCOMP_STREAM_TYPE_XXX) + * @param[in] type compression type * * @returns corresponding HostDecompressor class, nullptr if failure */ -std::unique_ptr HostDecompressor::Create(int stream_type) +std::unique_ptr HostDecompressor::Create(compression_type compression) { - switch (stream_type) { - case IO_UNCOMP_STREAM_TYPE_GZIP: return std::make_unique(true); - case IO_UNCOMP_STREAM_TYPE_INFLATE: return std::make_unique(false); - case IO_UNCOMP_STREAM_TYPE_SNAPPY: return std::make_unique(); + switch (compression) { + case compression_type::GZIP: return std::make_unique(true); + case compression_type::ZLIB: return std::make_unique(false); + case compression_type::SNAPPY: return std::make_unique(); + default: CUDF_FAIL("Unsupported compression type"); } - CUDF_FAIL("Unsupported compression type"); } } // namespace io diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index bd7f0caac03..3c7c4848901 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -360,20 +360,20 @@ OrcDecompressor::OrcDecompressor(CompressionKind kind, uint32_t blockSize) : m_kind(kind), m_blockSize(blockSize) { if (kind != NONE) { - int stream_type = IO_UNCOMP_STREAM_TYPE_INFER; // Will be treated as invalid + auto stream_type = compression_type::AUTO; // Will be treated as invalid switch (kind) { case NONE: break; case ZLIB: - stream_type = IO_UNCOMP_STREAM_TYPE_INFLATE; + stream_type = compression_type::ZLIB; m_log2MaxRatio = 11; // < 2048:1 break; case SNAPPY: - stream_type = IO_UNCOMP_STREAM_TYPE_SNAPPY; + stream_type = compression_type::SNAPPY; m_log2MaxRatio = 5; // < 32:1 break; - case LZO: stream_type = IO_UNCOMP_STREAM_TYPE_LZO; break; - case LZ4: stream_type = IO_UNCOMP_STREAM_TYPE_LZ4; break; - case ZSTD: stream_type = IO_UNCOMP_STREAM_TYPE_ZSTD; break; + case LZO: stream_type = compression_type::LZO; break; + case LZ4: stream_type = compression_type::LZ4; break; + case ZSTD: stream_type = compression_type::ZSTD; break; } m_decompressor = HostDecompressor::Create(stream_type); } else { From 172912adde8d0e3dfef82ea1a37e8a24d85b5ac2 Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 2 May 2022 11:53:05 -0700 Subject: [PATCH 03/17] copyright year --- cpp/src/io/comp/io_uncomp.h | 2 +- cpp/src/io/comp/uncomp.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/comp/io_uncomp.h b/cpp/src/io/comp/io_uncomp.h index 416d5c6a78d..1cce99610f7 100644 --- a/cpp/src/io/comp/io_uncomp.h +++ b/cpp/src/io/comp/io_uncomp.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2021, NVIDIA CORPORATION. + * Copyright (c) 2018-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 368a6dfd8fe..0983aa0d191 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2021, NVIDIA CORPORATION. + * Copyright (c) 2018-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From e617c621aaf687057a5865feec05022c4067b4c8 Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 2 May 2022 12:56:17 -0700 Subject: [PATCH 04/17] start HostDecompressor removal --- cpp/src/io/comp/io_uncomp.h | 5 +++- cpp/src/io/comp/uncomp.cpp | 18 ++++++++++++-- cpp/src/io/csv/reader_impl.cu | 2 +- cpp/src/io/json/reader_impl.cu | 2 +- cpp/src/io/orc/orc.cpp | 44 ++++++++++++++++------------------ cpp/src/io/orc/orc.h | 7 +++--- cpp/src/io/orc/reader_impl.cu | 6 ++--- 7 files changed, 49 insertions(+), 35 deletions(-) diff --git a/cpp/src/io/comp/io_uncomp.h b/cpp/src/io/comp/io_uncomp.h index 1cce99610f7..d0e4c55197a 100644 --- a/cpp/src/io/comp/io_uncomp.h +++ b/cpp/src/io/comp/io_uncomp.h @@ -28,8 +28,11 @@ using cudf::host_span; namespace cudf { namespace io { -std::vector get_uncompressed_data(host_span data, compression_type compression); +std::vector get_uncompressed_data(compression_type compression, host_span data); +size_t decompress(compression_type compression, + host_span src, + host_span dst); class HostDecompressor { public: virtual size_t decompress(host_span src, host_span dst) = 0; diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 0983aa0d191..b10fe4ee47c 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -423,8 +423,8 @@ std::vector io_uncompress_single_h2d(const void* src, * * @return Vector containing the output uncompressed data */ -std::vector get_uncompressed_data(host_span const data, - compression_type compression) +std::vector get_uncompressed_data(compression_type compression, + host_span const data) { return io_uncompress_single_h2d(data.data(), data.size(), compression); } @@ -543,6 +543,20 @@ class HostDecompressor_SNAPPY : public HostDecompressor { return (bytes_left) ? 0 : uncompressed_size; } }; +size_t decompress(compression_type compression, + host_span src, + host_span dst) +{ + auto const decomp = [compression]() -> std::unique_ptr { + switch (compression) { + case compression_type::GZIP: return std::make_unique(true); + case compression_type::ZLIB: return std::make_unique(false); + case compression_type::SNAPPY: return std::make_unique(); + default: CUDF_FAIL("Unsupported compression type"); + } + }(); + return decomp->decompress(src, dst); +} /** * @brief CPU decompression class diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index cd070d28f38..228c409c970 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -422,7 +422,7 @@ std::pair, selected_rows_offsets> select_data_and_row_ std::vector h_uncomp_data_owner; if (reader_opts.get_compression() != compression_type::NONE) { - h_uncomp_data_owner = get_uncompressed_data(h_data, reader_opts.get_compression()); + h_uncomp_data_owner = get_uncompressed_data(reader_opts.get_compression(), h_data); h_data = h_uncomp_data_owner; } // None of the parameters for row selection is used, we are parsing the entire file diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu index 20eeec267b1..508283b230d 100644 --- a/cpp/src/io/json/reader_impl.cu +++ b/cpp/src/io/json/reader_impl.cu @@ -241,7 +241,7 @@ std::vector ingest_raw_input(std::vector> cons if (compression == compression_type::NONE) { return buffer; } else { - return get_uncompressed_data(buffer, compression); + return get_uncompressed_data(compression, buffer); } } diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index 3c7c4848901..ef65ec3e002 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -357,27 +357,25 @@ size_t ProtobufWriter::write(const Metadata& s) } OrcDecompressor::OrcDecompressor(CompressionKind kind, uint32_t blockSize) - : m_kind(kind), m_blockSize(blockSize) -{ - if (kind != NONE) { - auto stream_type = compression_type::AUTO; // Will be treated as invalid - switch (kind) { - case NONE: break; - case ZLIB: - stream_type = compression_type::ZLIB; - m_log2MaxRatio = 11; // < 2048:1 - break; - case SNAPPY: - stream_type = compression_type::SNAPPY; - m_log2MaxRatio = 5; // < 32:1 - break; - case LZO: stream_type = compression_type::LZO; break; - case LZ4: stream_type = compression_type::LZ4; break; - case ZSTD: stream_type = compression_type::ZSTD; break; - } - m_decompressor = HostDecompressor::Create(stream_type); - } else { - m_log2MaxRatio = 0; + : m_blockSize(blockSize) +{ + switch (kind) { + case NONE: + _compression = compression_type::NONE; + m_log2MaxRatio = 0; + break; + case ZLIB: + _compression = compression_type::ZLIB; + m_log2MaxRatio = 11; // < 2048:1 + break; + case SNAPPY: + _compression = compression_type::SNAPPY; + m_log2MaxRatio = 5; // < 32:1 + break; + case LZO: _compression = compression_type::LZO; break; + case LZ4: _compression = compression_type::LZ4; break; + case ZSTD: _compression = compression_type::ZSTD; break; + default: CUDF_FAIL("Invalid compression type"); } } @@ -393,7 +391,7 @@ OrcDecompressor::OrcDecompressor(CompressionKind kind, uint32_t blockSize) const uint8_t* OrcDecompressor::Decompress(const uint8_t* srcBytes, size_t srcLen, size_t* dstLen) { // If uncompressed, just pass-through the input - if (m_kind == NONE) { + if (_compression == compression_type::NONE) { *dstLen = srcLen; return srcBytes; } @@ -437,7 +435,7 @@ const uint8_t* OrcDecompressor::Decompress(const uint8_t* srcBytes, size_t srcLe } else { // Compressed block dst_length += - m_decompressor->decompress({srcBytes + i, block_len}, {dst + dst_length, m_blockSize}); + decompress(_compression, {srcBytes + i, block_len}, {dst + dst_length, m_blockSize}); } i += block_len; } diff --git a/cpp/src/io/orc/orc.h b/cpp/src/io/orc/orc.h index 73eb8b382db..37a86960d57 100644 --- a/cpp/src/io/orc/orc.h +++ b/cpp/src/io/orc/orc.h @@ -540,14 +540,13 @@ class OrcDecompressor { return (block_len < (m_blockSize >> m_log2MaxRatio)) ? block_len << m_log2MaxRatio : m_blockSize; } - [[nodiscard]] CompressionKind GetKind() const { return m_kind; } + [[nodiscard]] compression_type compression() const { return _compression; } [[nodiscard]] uint32_t GetBlockSize() const { return m_blockSize; } protected: - CompressionKind const m_kind; + compression_type _compression; uint32_t m_log2MaxRatio = 24; // log2 of maximum compression ratio - uint32_t const m_blockSize; - std::unique_ptr m_decompressor; + uint32_t m_blockSize; std::vector m_buf; }; diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index 139eb28d1a1..06ace09116b 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -366,12 +366,12 @@ rmm::device_buffer reader::impl::decompress_stripe_data( device_span> inflate_in_view{inflate_in.data(), num_compressed_blocks}; device_span> inflate_out_view{inflate_out.data(), num_compressed_blocks}; - switch (decompressor->GetKind()) { - case orc::ZLIB: + switch (decompressor->compression()) { + case compression_type::ZLIB: gpuinflate( inflate_in_view, inflate_out_view, inflate_stats, gzip_header_included::NO, stream); break; - case orc::SNAPPY: + case compression_type::SNAPPY: if (nvcomp_integration::is_stable_enabled()) { nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY, inflate_in_view, From 5c34e8c32c3040d04371874becd435e9d6ed18dd Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 2 May 2022 12:56:59 -0700 Subject: [PATCH 05/17] style --- cpp/src/io/orc/orc.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index ef65ec3e002..3cade69862f 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -356,8 +356,7 @@ size_t ProtobufWriter::write(const Metadata& s) return w.value(); } -OrcDecompressor::OrcDecompressor(CompressionKind kind, uint32_t blockSize) - : m_blockSize(blockSize) +OrcDecompressor::OrcDecompressor(CompressionKind kind, uint32_t blockSize) : m_blockSize(blockSize) { switch (kind) { case NONE: From 21ddaf3b69f8f34e63bc4d82764c10e6364db2c7 Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 2 May 2022 23:43:26 -0700 Subject: [PATCH 06/17] remove hostdecompressor --- cpp/src/io/comp/io_uncomp.h | 8 -- cpp/src/io/comp/uncomp.cpp | 217 ++++++++++++++++-------------------- 2 files changed, 96 insertions(+), 129 deletions(-) diff --git a/cpp/src/io/comp/io_uncomp.h b/cpp/src/io/comp/io_uncomp.h index d0e4c55197a..37b48706dca 100644 --- a/cpp/src/io/comp/io_uncomp.h +++ b/cpp/src/io/comp/io_uncomp.h @@ -33,14 +33,6 @@ std::vector get_uncompressed_data(compression_type compression, host_span< size_t decompress(compression_type compression, host_span src, host_span dst); -class HostDecompressor { - public: - virtual size_t decompress(host_span src, host_span dst) = 0; - virtual ~HostDecompressor() {} - - public: - static std::unique_ptr Create(compression_type compression); -}; /** * @brief GZIP header flags diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index b10fe4ee47c..cc5dda2e4e1 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -430,147 +430,122 @@ std::vector get_uncompressed_data(compression_type compression, } /** - * @brief ZLIB host decompressor class + * @brief ZLIB host decompressor (no header) */ -class HostDecompressor_ZLIB : public HostDecompressor { - public: - HostDecompressor_ZLIB(bool gz_hdr_) : gz_hdr(gz_hdr_) {} - size_t decompress(host_span src, host_span dst) override - { - if (gz_hdr) { - gz_archive_s gz; - if (!ParseGZArchive(&gz, src.data(), src.size())) { return 0; } - src = {gz.comp_data, gz.comp_len}; - } - size_t uncomp_size = dst.size(); - if (0 == cpu_inflate(dst.data(), &uncomp_size, src.data(), src.size())) { - return uncomp_size; - } else { - return 0; // Throw? - } +size_t decompress_zlib(host_span src, host_span dst) +{ + size_t uncomp_size = dst.size(); + if (0 == cpu_inflate(dst.data(), &uncomp_size, src.data(), src.size())) { + return uncomp_size; + } else { + return 0; // Throw? } +} - protected: - const bool gz_hdr; -}; +/** + * @brief GZIP host decompressor (includes header) + */ +size_t decompress_gzip(host_span src, host_span dst) +{ + gz_archive_s gz; + if (!ParseGZArchive(&gz, src.data(), src.size())) { return 0; } + return decompress_zlib({gz.comp_data, gz.comp_len}, dst); +} /** - * @brief SNAPPY host decompressor class + * @brief SNAPPY host decompressor */ -class HostDecompressor_SNAPPY : public HostDecompressor { - public: - HostDecompressor_SNAPPY() {} - size_t decompress(host_span src, host_span dst) override +size_t decompress_snappy(host_span src, host_span dst) +{ + if (dst.empty() || src.size() < 1) { return 0; } + uint32_t uncompressed_size, bytes_left, dst_pos; + auto cur = src.begin(); + auto const end = src.end(); + // Read uncompressed length (varint) { - if (dst.empty() || src.size() < 1) { return 0; } - uint32_t uncompressed_size, bytes_left, dst_pos; - auto cur = src.begin(); - auto const end = src.end(); - // Read uncompressed length (varint) - { - uint32_t l = 0, c; - uncompressed_size = 0; - do { - uint32_t lo7; - c = *cur++; - lo7 = c & 0x7f; - if (l >= 28 && c > 0xf) { return 0; } - uncompressed_size |= lo7 << l; - l += 7; - } while (c > 0x7f && cur < end); - if (!uncompressed_size || uncompressed_size > dst.size() || cur >= end) { - // Destination buffer too small or zero size - return 0; - } - } - // Decode lz77 - dst_pos = 0; - bytes_left = uncompressed_size; + uint32_t l = 0, c; + uncompressed_size = 0; do { - uint32_t blen = *cur++; - - if (blen & 3) { - // Copy - uint32_t offset; - if (blen & 2) { - // xxxxxx1x: copy with 6-bit length, 2-byte or 4-byte offset + uint32_t lo7; + c = *cur++; + lo7 = c & 0x7f; + if (l >= 28 && c > 0xf) { return 0; } + uncompressed_size |= lo7 << l; + l += 7; + } while (c > 0x7f && cur < end); + if (!uncompressed_size || uncompressed_size > dst.size() || cur >= end) { + // Destination buffer too small or zero size + return 0; + } + } + // Decode lz77 + dst_pos = 0; + bytes_left = uncompressed_size; + do { + uint32_t blen = *cur++; + + if (blen & 3) { + // Copy + uint32_t offset; + if (blen & 2) { + // xxxxxx1x: copy with 6-bit length, 2-byte or 4-byte offset + if (cur + 2 > end) break; + offset = *reinterpret_cast(cur); + cur += 2; + if (blen & 1) // 4-byte offset + { if (cur + 2 > end) break; - offset = *reinterpret_cast(cur); + offset |= (*reinterpret_cast(cur)) << 16; cur += 2; - if (blen & 1) // 4-byte offset - { - if (cur + 2 > end) break; - offset |= (*reinterpret_cast(cur)) << 16; - cur += 2; - } - blen = (blen >> 2) + 1; - } else { - // xxxxxx01.oooooooo: copy with 3-bit length, 11-bit offset - if (cur >= end) break; - offset = ((blen & 0xe0) << 3) | (*cur++); - blen = ((blen >> 2) & 7) + 4; } - if (offset - 1u >= dst_pos || blen > bytes_left) break; - bytes_left -= blen; - do { - dst[dst_pos] = dst[dst_pos - offset]; - dst_pos++; - } while (--blen); + blen = (blen >> 2) + 1; } else { - // xxxxxx00: literal - blen >>= 2; - if (blen >= 60) { - uint32_t num_bytes = blen - 59; - if (cur + num_bytes >= end) break; - blen = cur[0]; - if (num_bytes > 1) { - blen |= cur[1] << 8; - if (num_bytes > 2) { - blen |= cur[2] << 16; - if (num_bytes > 3) { blen |= cur[3] << 24; } - } + // xxxxxx01.oooooooo: copy with 3-bit length, 11-bit offset + if (cur >= end) break; + offset = ((blen & 0xe0) << 3) | (*cur++); + blen = ((blen >> 2) & 7) + 4; + } + if (offset - 1u >= dst_pos || blen > bytes_left) break; + bytes_left -= blen; + do { + dst[dst_pos] = dst[dst_pos - offset]; + dst_pos++; + } while (--blen); + } else { + // xxxxxx00: literal + blen >>= 2; + if (blen >= 60) { + uint32_t num_bytes = blen - 59; + if (cur + num_bytes >= end) break; + blen = cur[0]; + if (num_bytes > 1) { + blen |= cur[1] << 8; + if (num_bytes > 2) { + blen |= cur[2] << 16; + if (num_bytes > 3) { blen |= cur[3] << 24; } } - cur += num_bytes; } - blen++; - if (cur + blen > end || blen > bytes_left) break; - memcpy(dst.data() + dst_pos, cur, blen); - cur += blen; - dst_pos += blen; - bytes_left -= blen; + cur += num_bytes; } - } while (bytes_left && cur < end); - return (bytes_left) ? 0 : uncompressed_size; - } -}; -size_t decompress(compression_type compression, - host_span src, - host_span dst) -{ - auto const decomp = [compression]() -> std::unique_ptr { - switch (compression) { - case compression_type::GZIP: return std::make_unique(true); - case compression_type::ZLIB: return std::make_unique(false); - case compression_type::SNAPPY: return std::make_unique(); - default: CUDF_FAIL("Unsupported compression type"); + blen++; + if (cur + blen > end || blen > bytes_left) break; + memcpy(dst.data() + dst_pos, cur, blen); + cur += blen; + dst_pos += blen; + bytes_left -= blen; } - }(); - return decomp->decompress(src, dst); + } while (bytes_left && cur < end); + return (bytes_left) ? 0 : uncompressed_size; } -/** - * @brief CPU decompression class - * - * @param[in] type compression type - * - * @returns corresponding HostDecompressor class, nullptr if failure - */ -std::unique_ptr HostDecompressor::Create(compression_type compression) +size_t decompress(compression_type compression, + host_span src, + host_span dst) { switch (compression) { - case compression_type::GZIP: return std::make_unique(true); - case compression_type::ZLIB: return std::make_unique(false); - case compression_type::SNAPPY: return std::make_unique(); + case compression_type::GZIP: return decompress_gzip(src, dst); + case compression_type::ZLIB: return decompress_zlib(src, dst); + case compression_type::SNAPPY: return decompress_snappy(src, dst); default: CUDF_FAIL("Unsupported compression type"); } } From 49d3eab2ede7840e12c8ad31356d57a92a603b47 Mon Sep 17 00:00:00 2001 From: vuule Date: Tue, 3 May 2022 00:22:45 -0700 Subject: [PATCH 07/17] merge io_uncompress_single_h2d and get_uncompressed_data --- cpp/src/io/comp/io_uncomp.h | 13 ++++++++- cpp/src/io/comp/uncomp.cpp | 53 ++++++++-------------------------- cpp/src/io/csv/reader_impl.cu | 2 +- cpp/src/io/json/reader_impl.cu | 2 +- 4 files changed, 26 insertions(+), 44 deletions(-) diff --git a/cpp/src/io/comp/io_uncomp.h b/cpp/src/io/comp/io_uncomp.h index 37b48706dca..b2d98a3b606 100644 --- a/cpp/src/io/comp/io_uncomp.h +++ b/cpp/src/io/comp/io_uncomp.h @@ -28,7 +28,18 @@ using cudf::host_span; namespace cudf { namespace io { -std::vector get_uncompressed_data(compression_type compression, host_span data); +/** + * @brief Decompresses a gzip/zip/bzip2/xz file stored in system memory. + * + * The result is allocated and stored in a vector. + * If the function call fails, the output vector is empty. + * + * @param compression Type of compression of the input data + * @param src Compressed buffer + * + * @return Vector containing the Decompressed output + */ +std::vector decompress(compression_type compression, host_span const src); size_t decompress(compression_type compression, host_span src, diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index cc5dda2e4e1..4dac6a45dd7 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -280,35 +280,21 @@ int cpu_inflate_vector(std::vector& dst, const uint8_t* comp_data, size_t return (zerr == Z_STREAM_END) ? Z_OK : zerr; } -/** - * @brief Uncompresses a gzip/zip/bzip2/xz file stored in system memory. - * - * The result is allocated and stored in a vector. - * If the function call fails, the output vector is empty. - * - * @param[in] src Pointer to the compressed data in system memory - * @param[in] src_size The size of the compressed data, in bytes - * @param[in] stream_type Type of compression of the input data - * - * @return Vector containing the uncompressed output - */ -std::vector io_uncompress_single_h2d(const void* src, - size_t src_size, - compression_type compression) +std::vector decompress(compression_type compression, host_span const src) { - const uint8_t* raw = static_cast(src); + CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr"); + CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0"); + + auto raw = reinterpret_cast(src.data()); const uint8_t* comp_data = nullptr; size_t comp_len = 0; size_t uncomp_len = 0; - CUDF_EXPECTS(src != nullptr, "Decompression: Source cannot be nullptr"); - CUDF_EXPECTS(src_size != 0, "Decompression: Source size cannot be 0"); - switch (compression) { case compression_type::AUTO: case compression_type::GZIP: { gz_archive_s gz; - if (ParseGZArchive(&gz, raw, src_size)) { + if (ParseGZArchive(&gz, raw, src.size())) { compression = compression_type::GZIP; comp_data = gz.comp_data; comp_len = gz.comp_len; @@ -318,7 +304,7 @@ std::vector io_uncompress_single_h2d(const void* src, } case compression_type::ZIP: { zip_archive_s za; - if (OpenZipArchive(&za, raw, src_size)) { + if (OpenZipArchive(&za, raw, src.size())) { size_t cdfh_ofs = 0; for (int i = 0; i < za.eocd->num_entries; i++) { const zip_cdfh_s* cdfh = reinterpret_cast( @@ -332,12 +318,12 @@ std::vector io_uncompress_single_h2d(const void* src, if (cdfh->comp_method == 8 && cdfh->comp_size > 0 && cdfh->uncomp_size > 0) { size_t lfh_ofs = cdfh->hdr_ofs; const zip_lfh_s* lfh = reinterpret_cast(raw + lfh_ofs); - if (lfh_ofs + sizeof(zip_lfh_s) <= src_size && lfh->sig == 0x04034b50 && - lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src_size) { + if (lfh_ofs + sizeof(zip_lfh_s) <= src.size() && lfh->sig == 0x04034b50 && + lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src.size()) { if (lfh->comp_method == 8 && lfh->comp_size > 0 && lfh->uncomp_size > 0) { size_t file_start = lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len; size_t file_end = file_start + lfh->comp_size; - if (file_end <= src_size) { + if (file_end <= src.size()) { // Pick the first valid file of non-zero size (only 1 file expected in archive) compression = compression_type::ZIP; comp_data = raw + file_start; @@ -354,14 +340,14 @@ std::vector io_uncompress_single_h2d(const void* src, } if (compression != compression_type::AUTO) break; // Fall through for INFER case compression_type::BZIP2: - if (src_size > 4) { + if (src.size() > 4) { const bz2_file_header_s* fhdr = reinterpret_cast(raw); // Check for BZIP2 file signature "BZh1" to "BZh9" if (fhdr->sig[0] == 'B' && fhdr->sig[1] == 'Z' && fhdr->sig[2] == 'h' && fhdr->blksz >= '1' && fhdr->blksz <= '9') { compression = compression_type::BZIP2; comp_data = raw; - comp_len = src_size; + comp_len = src.size(); uncomp_len = 0; } } @@ -414,21 +400,6 @@ std::vector io_uncompress_single_h2d(const void* src, CUDF_FAIL("Unsupported compressed stream type"); } -/** - * @brief Uncompresses the input data and stores the allocated result into - * a vector. - * - * @param[in] data Pointer to the csv data in host memory - * @param[in] compression String describing the compression type - * - * @return Vector containing the output uncompressed data - */ -std::vector get_uncompressed_data(compression_type compression, - host_span const data) -{ - return io_uncompress_single_h2d(data.data(), data.size(), compression); -} - /** * @brief ZLIB host decompressor (no header) */ diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index 228c409c970..f2e42fc46ce 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -422,7 +422,7 @@ std::pair, selected_rows_offsets> select_data_and_row_ std::vector h_uncomp_data_owner; if (reader_opts.get_compression() != compression_type::NONE) { - h_uncomp_data_owner = get_uncompressed_data(reader_opts.get_compression(), h_data); + h_uncomp_data_owner = decompress(reader_opts.get_compression(), h_data); h_data = h_uncomp_data_owner; } // None of the parameters for row selection is used, we are parsing the entire file diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu index 508283b230d..1c8e164c9b0 100644 --- a/cpp/src/io/json/reader_impl.cu +++ b/cpp/src/io/json/reader_impl.cu @@ -241,7 +241,7 @@ std::vector ingest_raw_input(std::vector> cons if (compression == compression_type::NONE) { return buffer; } else { - return get_uncompressed_data(compression, buffer); + return decompress(compression, buffer); } } From 0163b1c7c2344ca3a10f940ae069b3b449e4996b Mon Sep 17 00:00:00 2001 From: vuule Date: Tue, 3 May 2022 00:50:57 -0700 Subject: [PATCH 08/17] decompress to vector char ->uint8_t --- cpp/src/io/comp/io_uncomp.h | 2 +- cpp/src/io/comp/uncomp.cpp | 15 +++++++-------- cpp/src/io/csv/reader_impl.cu | 8 +++++--- cpp/src/io/json/reader_impl.cu | 17 +++++++++-------- 4 files changed, 22 insertions(+), 20 deletions(-) diff --git a/cpp/src/io/comp/io_uncomp.h b/cpp/src/io/comp/io_uncomp.h index b2d98a3b606..93672828f73 100644 --- a/cpp/src/io/comp/io_uncomp.h +++ b/cpp/src/io/comp/io_uncomp.h @@ -39,7 +39,7 @@ namespace io { * * @return Vector containing the Decompressed output */ -std::vector decompress(compression_type compression, host_span const src); +std::vector decompress(compression_type compression, host_span src); size_t decompress(compression_type compression, host_span src, diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 4dac6a45dd7..0512cdc1ceb 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -249,7 +249,7 @@ int cpu_inflate(uint8_t* uncomp_data, size_t* destLen, const uint8_t* comp_data, * @param[in] comp_data Raw compressed data * @param[in] comp_len Compressed data size */ -int cpu_inflate_vector(std::vector& dst, const uint8_t* comp_data, size_t comp_len) +int cpu_inflate_vector(std::vector& dst, const uint8_t* comp_data, size_t comp_len) { int zerr; z_stream strm; @@ -258,7 +258,7 @@ int cpu_inflate_vector(std::vector& dst, const uint8_t* comp_data, size_t strm.next_in = const_cast(reinterpret_cast(comp_data)); strm.avail_in = comp_len; strm.total_in = 0; - strm.next_out = reinterpret_cast(dst.data()); + strm.next_out = dst.data(); strm.avail_out = dst.size(); strm.total_out = 0; zerr = inflateInit2(&strm, -15); // -15 for raw data without GZIP headers @@ -280,12 +280,12 @@ int cpu_inflate_vector(std::vector& dst, const uint8_t* comp_data, size_t return (zerr == Z_STREAM_END) ? Z_OK : zerr; } -std::vector decompress(compression_type compression, host_span const src) +std::vector decompress(compression_type compression, host_span src) { CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr"); CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0"); - auto raw = reinterpret_cast(src.data()); + auto raw = src.data(); const uint8_t* comp_data = nullptr; size_t comp_len = 0; size_t uncomp_len = 0; @@ -367,7 +367,7 @@ std::vector decompress(compression_type compression, host_span if (compression == compression_type::GZIP || compression == compression_type::ZIP) { // INFLATE - std::vector dst(uncomp_len); + std::vector dst(uncomp_len); CUDF_EXPECTS(cpu_inflate_vector(dst, comp_data, comp_len) == 0, "Decompression: error in stream"); return dst; @@ -376,11 +376,10 @@ std::vector decompress(compression_type compression, host_span size_t src_ofs = 0; size_t dst_ofs = 0; int bz_err = 0; - std::vector dst(uncomp_len); + std::vector dst(uncomp_len); do { size_t dst_len = uncomp_len - dst_ofs; - bz_err = cpu_bz2_uncompress( - comp_data, comp_len, reinterpret_cast(dst.data()) + dst_ofs, &dst_len, &src_ofs); + bz_err = cpu_bz2_uncompress(comp_data, comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs); if (bz_err == BZ_OUTBUFF_FULL) { // TBD: We could infer the compression ratio based on produced/consumed byte counts // in order to minimize realloc events and over-allocation diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index f2e42fc46ce..f61d9daa702 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -419,11 +419,13 @@ std::pair, selected_rows_offsets> select_data_and_row_ reinterpret_cast(buffer->data()), buffer->size()); - std::vector h_uncomp_data_owner; + std::vector h_uncomp_data_owner; if (reader_opts.get_compression() != compression_type::NONE) { - h_uncomp_data_owner = decompress(reader_opts.get_compression(), h_data); - h_data = h_uncomp_data_owner; + h_uncomp_data_owner = + decompress(reader_opts.get_compression(), {buffer->data(), buffer->size()}); + h_data = {reinterpret_cast(h_uncomp_data_owner.data()), + h_uncomp_data_owner.size()}; } // None of the parameters for row selection is used, we are parsing the entire file const bool load_whole_file = range_offset == 0 && range_size == 0 && skip_rows <= 0 && diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu index 1c8e164c9b0..9c2b1f02d4e 100644 --- a/cpp/src/io/json/reader_impl.cu +++ b/cpp/src/io/json/reader_impl.cu @@ -214,11 +214,11 @@ std::pair, col_map_ptr_type> get_json_object_keys_hashe create_col_names_hash_map(sorted_info->get_column(2).view(), stream)}; } -std::vector ingest_raw_input(std::vector> const& sources, - compression_type compression, - size_t range_offset, - size_t range_size, - size_t range_size_padded) +std::vector ingest_raw_input(std::vector> const& sources, + compression_type compression, + size_t range_offset, + size_t range_size, + size_t range_size_padded) { // Iterate through the user defined sources and read the contents into the local buffer size_t total_source_size = 0; @@ -227,13 +227,13 @@ std::vector ingest_raw_input(std::vector> cons } total_source_size = total_source_size - (range_offset * sources.size()); - auto buffer = std::vector(total_source_size); + auto buffer = std::vector(total_source_size); size_t bytes_read = 0; for (const auto& source : sources) { if (!source->is_empty()) { auto data_size = (range_size_padded != 0) ? range_size_padded : source->size(); - auto destination = reinterpret_cast(buffer.data()) + bytes_read; + auto destination = buffer.data() + bytes_read; bytes_read += source->host_read(range_offset, data_size, destination); } } @@ -587,8 +587,9 @@ table_with_metadata read_json(std::vector>& sources, auto range_size = reader_opts.get_byte_range_size(); auto range_size_padded = reader_opts.get_byte_range_size_with_padding(); - auto h_data = ingest_raw_input( + auto h_raw_data = ingest_raw_input( sources, reader_opts.get_compression(), range_offset, range_size, range_size_padded); + host_span h_data{reinterpret_cast(h_raw_data.data()), h_raw_data.size()}; CUDF_EXPECTS(h_data.size() != 0, "Ingest failed: uncompressed input data has zero size.\n"); From 7d9283e56d56c23afb0e32e2a0a108a4d737c660 Mon Sep 17 00:00:00 2001 From: vuule Date: Tue, 3 May 2022 15:47:19 -0700 Subject: [PATCH 09/17] throw instead of returning empty output --- cpp/src/io/comp/uncomp.cpp | 48 ++++++++++++++------------------------ 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 0512cdc1ceb..ff5b2155a62 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -249,23 +249,17 @@ int cpu_inflate(uint8_t* uncomp_data, size_t* destLen, const uint8_t* comp_data, * @param[in] comp_data Raw compressed data * @param[in] comp_len Compressed data size */ -int cpu_inflate_vector(std::vector& dst, const uint8_t* comp_data, size_t comp_len) +void cpu_inflate_vector(std::vector& dst, const uint8_t* comp_data, size_t comp_len) { - int zerr; - z_stream strm; - - memset(&strm, 0, sizeof(strm)); + z_stream strm{}; strm.next_in = const_cast(reinterpret_cast(comp_data)); strm.avail_in = comp_len; strm.total_in = 0; strm.next_out = dst.data(); strm.avail_out = dst.size(); strm.total_out = 0; - zerr = inflateInit2(&strm, -15); // -15 for raw data without GZIP headers - if (zerr != 0) { - dst.resize(0); - return zerr; - } + auto zerr = inflateInit2(&strm, -15); // -15 for raw data without GZIP headers + CUDF_EXPECTS(zerr == 0, "Error in DEFLATE stream"); do { if (strm.avail_out == 0) { dst.resize(strm.total_out + (1 << 30)); @@ -277,7 +271,7 @@ int cpu_inflate_vector(std::vector& dst, const uint8_t* comp_data, size strm.total_out == dst.size()); dst.resize(strm.total_out); inflateEnd(&strm); - return (zerr == Z_STREAM_END) ? Z_OK : zerr; + CUDF_EXPECTS(zerr == Z_STREAM_END, "Error in DEFLATE stream"); } std::vector decompress(compression_type compression, host_span src) @@ -352,13 +346,10 @@ std::vector decompress(compression_type compression, host_span 0, "Unsupported compressed stream type"); + CUDF_EXPECTS(comp_data != nullptr and comp_len > 0, "Unsupported compressed stream type"); if (uncomp_len <= 0) { uncomp_len = comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume @@ -368,8 +359,7 @@ std::vector decompress(compression_type compression, host_span dst(uncomp_len); - CUDF_EXPECTS(cpu_inflate_vector(dst, comp_data, comp_len) == 0, - "Decompression: error in stream"); + cpu_inflate_vector(dst, comp_data, comp_len); return dst; } if (compression == compression_type::BZIP2) { @@ -405,11 +395,9 @@ std::vector decompress(compression_type compression, host_span src, host_span dst) { size_t uncomp_size = dst.size(); - if (0 == cpu_inflate(dst.data(), &uncomp_size, src.data(), src.size())) { - return uncomp_size; - } else { - return 0; // Throw? - } + CUDF_EXPECTS(0 == cpu_inflate(dst.data(), &uncomp_size, src.data(), src.size()), + "ZLIB decompression failed"); + return uncomp_size; } /** @@ -418,7 +406,8 @@ size_t decompress_zlib(host_span src, host_span dst) size_t decompress_gzip(host_span src, host_span dst) { gz_archive_s gz; - if (!ParseGZArchive(&gz, src.data(), src.size())) { return 0; } + auto const parse_succeeded = ParseGZArchive(&gz, src.data(), src.size()); + CUDF_EXPECTS(parse_succeeded, "Failed to parse GZIP header"); return decompress_zlib({gz.comp_data, gz.comp_len}, dst); } @@ -427,7 +416,7 @@ size_t decompress_gzip(host_span src, host_span dst) */ size_t decompress_snappy(host_span src, host_span dst) { - if (dst.empty() || src.size() < 1) { return 0; } + CUDF_EXPECTS(not dst.empty() and src.size() >= 1, "invalid Snappy decompress inputs"); uint32_t uncompressed_size, bytes_left, dst_pos; auto cur = src.begin(); auto const end = src.end(); @@ -443,10 +432,8 @@ size_t decompress_snappy(host_span src, host_span dst) uncompressed_size |= lo7 << l; l += 7; } while (c > 0x7f && cur < end); - if (!uncompressed_size || uncompressed_size > dst.size() || cur >= end) { - // Destination buffer too small or zero size - return 0; - } + CUDF_EXPECTS(uncompressed_size != 0 and uncompressed_size <= dst.size() and cur < end, + "Destination buffer too small"); } // Decode lz77 dst_pos = 0; @@ -505,7 +492,8 @@ size_t decompress_snappy(host_span src, host_span dst) bytes_left -= blen; } } while (bytes_left && cur < end); - return (bytes_left) ? 0 : uncompressed_size; + CUDF_EXPECTS(bytes_left == 0, "Snappy Decompression failed"); + return uncompressed_size; } size_t decompress(compression_type compression, From a61e241bbd4bb31585691055f436e08732dc8231 Mon Sep 17 00:00:00 2001 From: vuule Date: Wed, 4 May 2022 19:30:04 -0700 Subject: [PATCH 10/17] more spans --- cpp/src/io/orc/aggregate_orc_metadata.cpp | 7 +-- cpp/src/io/orc/orc.cpp | 68 +++++++++-------------- cpp/src/io/orc/orc.h | 10 +++- 3 files changed, 38 insertions(+), 47 deletions(-) diff --git a/cpp/src/io/orc/aggregate_orc_metadata.cpp b/cpp/src/io/orc/aggregate_orc_metadata.cpp index 47244279599..6bbc033a9ba 100644 --- a/cpp/src/io/orc/aggregate_orc_metadata.cpp +++ b/cpp/src/io/orc/aggregate_orc_metadata.cpp @@ -233,10 +233,9 @@ std::vector aggregate_orc_metadata::select_stri "Invalid stripe information"); const auto buffer = per_file_metadata[mapping.source_idx].source->host_read(sf_comp_offset, sf_comp_length); - size_t sf_length = 0; - auto sf_data = per_file_metadata[mapping.source_idx].decompressor->Decompress( - buffer->data(), sf_comp_length, &sf_length); - ProtobufReader(sf_data, sf_length) + auto sf_data = per_file_metadata[mapping.source_idx].decompressor->decompress_blocks( + {buffer->data(), buffer->size()}); + ProtobufReader(sf_data.data(), sf_data.size()) .read(per_file_metadata[mapping.source_idx].stripefooters[i]); mapping.stripe_info[i].second = &per_file_metadata[mapping.source_idx].stripefooters[i]; if (stripe->indexLength == 0) { row_grp_idx_present = false; } diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index 3cade69862f..da7fc928681 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -378,28 +378,20 @@ OrcDecompressor::OrcDecompressor(CompressionKind kind, uint32_t blockSize) : m_b } } -/** - * @brief ORC block decompression - * - * @param[in] srcBytes compressed data - * @param[in] srcLen length of compressed data - * @param[out] dstLen length of uncompressed data - * - * @returns pointer to uncompressed data, nullptr if error - */ -const uint8_t* OrcDecompressor::Decompress(const uint8_t* srcBytes, size_t srcLen, size_t* dstLen) +host_span OrcDecompressor::decompress_blocks(host_span src) { // If uncompressed, just pass-through the input - if (_compression == compression_type::NONE) { - *dstLen = srcLen; - return srcBytes; - } + if (src.empty() or _compression == compression_type::NONE) { return src; } + + constexpr int header_size = 3; + CUDF_EXPECTS(src.size() >= header_size, "Total size is less than the 3-byte header"); + // First, scan the input for the number of blocks and worst-case output size size_t max_dst_length = 0; - for (size_t i = 0; i + 3 < srcLen;) { - uint32_t block_len = srcBytes[i] | (srcBytes[i + 1] << 8) | (srcBytes[i + 2] << 16); + for (size_t i = 0; i + header_size < src.size();) { + uint32_t block_len = src[i] | (src[i + 1] << 8) | (src[i + 2] << 16); uint32_t is_uncompressed = block_len & 1; - i += 3; + i += header_size; block_len >>= 1; if (is_uncompressed) { // Uncompressed block @@ -408,38 +400,32 @@ const uint8_t* OrcDecompressor::Decompress(const uint8_t* srcBytes, size_t srcLe max_dst_length += m_blockSize; } i += block_len; - if (i > srcLen || block_len > m_blockSize) { return nullptr; } + CUDF_EXPECTS(i <= src.size() and block_len <= m_blockSize, "Error in decompression"); } // Check if we have a single uncompressed block, or no blocks - if (max_dst_length < m_blockSize) { - if (srcLen < 3) { - // Total size is less than the 3-byte header - return nullptr; - } - *dstLen = srcLen - 3; - return srcBytes + 3; - } + if (max_dst_length < m_blockSize) { return src.subspan(header_size, src.size() - header_size); } + m_buf.resize(max_dst_length); - auto dst = m_buf.data(); size_t dst_length = 0; - for (size_t i = 0; i + 3 < srcLen;) { - uint32_t block_len = srcBytes[i] | (srcBytes[i + 1] << 8) | (srcBytes[i + 2] << 16); + for (size_t i = 0; i + header_size < src.size();) { + uint32_t block_len = src[i] | (src[i + 1] << 8) | (src[i + 2] << 16); uint32_t is_uncompressed = block_len & 1; - i += 3; + i += header_size; block_len >>= 1; if (is_uncompressed) { // Uncompressed block - memcpy(dst + dst_length, srcBytes + i, block_len); + memcpy(m_buf.data() + dst_length, src.data() + i, block_len); dst_length += block_len; } else { // Compressed block - dst_length += - decompress(_compression, {srcBytes + i, block_len}, {dst + dst_length, m_blockSize}); + dst_length += decompress( + _compression, src.subspan(i, block_len), {m_buf.data() + dst_length, m_blockSize}); } i += block_len; } - *dstLen = dst_length; - return m_buf.data(); + + m_buf.resize(dst_length); + return m_buf; } metadata::metadata(datasource* const src) : source(src) @@ -459,18 +445,16 @@ metadata::metadata(datasource* const src) : source(src) decompressor = std::make_unique(ps.compression, ps.compressionBlockSize); // Read compressed filefooter section - buffer = source->host_read(len - ps_length - 1 - ps.footerLength, ps.footerLength); - size_t ff_length = 0; - auto ff_data = decompressor->Decompress(buffer->data(), ps.footerLength, &ff_length); - ProtobufReader(ff_data, ff_length).read(ff); + buffer = source->host_read(len - ps_length - 1 - ps.footerLength, ps.footerLength); + auto const ff_data = decompressor->decompress_blocks({buffer->data(), buffer->size()}); + ProtobufReader(ff_data.data(), ff_data.size()).read(ff); CUDF_EXPECTS(get_num_columns() > 0, "No columns found"); // Read compressed metadata section buffer = source->host_read(len - ps_length - 1 - ps.footerLength - ps.metadataLength, ps.metadataLength); - size_t md_length = 0; - auto md_data = decompressor->Decompress(buffer->data(), ps.metadataLength, &md_length); - orc::ProtobufReader(md_data, md_length).read(md); + auto const md_data = decompressor->decompress_blocks({buffer->data(), buffer->size()}); + orc::ProtobufReader(md_data.data(), md_data.size()).read(md); init_parent_descriptors(); init_column_names(); diff --git a/cpp/src/io/orc/orc.h b/cpp/src/io/orc/orc.h index 37a86960d57..6d97abb61c9 100644 --- a/cpp/src/io/orc/orc.h +++ b/cpp/src/io/orc/orc.h @@ -533,7 +533,15 @@ class ProtobufWriter { class OrcDecompressor { public: OrcDecompressor(CompressionKind kind, uint32_t blockSize); - const uint8_t* Decompress(const uint8_t* srcBytes, size_t srcLen, size_t* dstLen); + + /** + * @brief ORC block decompression + * + * @param src compressed data + * + * @return decompressed data + */ + host_span decompress_blocks(host_span src); [[nodiscard]] uint32_t GetLog2MaxCompressionRatio() const { return m_log2MaxRatio; } [[nodiscard]] uint32_t GetMaxUncompressedBlockSize(uint32_t block_len) const { From 2bdb3157b2490ce9e4c35ca91e7b1389c030f477 Mon Sep 17 00:00:00 2001 From: vuule Date: Wed, 4 May 2022 23:18:18 -0700 Subject: [PATCH 11/17] reference --- cpp/src/io/orc/reader_impl.cu | 14 +++++++------- cpp/src/io/orc/reader_impl.hpp | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index 06ace09116b..af91fb6aa4e 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -286,7 +286,7 @@ void decompress_check(device_span stats, rmm::device_buffer reader::impl::decompress_stripe_data( cudf::detail::hostdevice_2dvector& chunks, const std::vector& stripe_data, - const OrcDecompressor* decompressor, + OrcDecompressor const& decompressor, std::vector& stream_info, size_t num_stripes, cudf::detail::hostdevice_2dvector& row_groups, @@ -310,8 +310,8 @@ rmm::device_buffer reader::impl::decompress_stripe_data( gpu::ParseCompressedStripeData(compinfo.device_ptr(), compinfo.size(), - decompressor->GetBlockSize(), - decompressor->GetLog2MaxCompressionRatio(), + decompressor.GetBlockSize(), + decompressor.GetLog2MaxCompressionRatio(), stream); compinfo.device_to_host(stream, true); @@ -357,8 +357,8 @@ rmm::device_buffer reader::impl::decompress_stripe_data( compinfo.host_to_device(stream); gpu::ParseCompressedStripeData(compinfo.device_ptr(), compinfo.size(), - decompressor->GetBlockSize(), - decompressor->GetLog2MaxCompressionRatio(), + decompressor.GetBlockSize(), + decompressor.GetLog2MaxCompressionRatio(), stream); // Dispatch batches of blocks to decompress @@ -366,7 +366,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data( device_span> inflate_in_view{inflate_in.data(), num_compressed_blocks}; device_span> inflate_out_view{inflate_out.data(), num_compressed_blocks}; - switch (decompressor->compression()) { + switch (decompressor.compression()) { case compression_type::ZLIB: gpuinflate( inflate_in_view, inflate_out_view, inflate_stats, gzip_header_included::NO, stream); @@ -1167,7 +1167,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, auto decomp_data = decompress_stripe_data(chunks, stripe_data, - _metadata.per_file_metadata[0].decompressor.get(), + *_metadata.per_file_metadata[0].decompressor, stream_info, total_num_stripes, row_groups, diff --git a/cpp/src/io/orc/reader_impl.hpp b/cpp/src/io/orc/reader_impl.hpp index 103093f055f..9c87a7c5e12 100644 --- a/cpp/src/io/orc/reader_impl.hpp +++ b/cpp/src/io/orc/reader_impl.hpp @@ -107,7 +107,7 @@ class reader::impl { * * @param chunks Vector of list of column chunk descriptors * @param stripe_data List of source stripe column data - * @param decompressor Originally host decompressor + * @param decompressor Block decompressor * @param stream_info List of stream to column mappings * @param num_stripes Number of stripes making up column chunks * @param row_groups Vector of list of row index descriptors @@ -120,7 +120,7 @@ class reader::impl { rmm::device_buffer decompress_stripe_data( cudf::detail::hostdevice_2dvector& chunks, const std::vector& stripe_data, - const OrcDecompressor* decompressor, + OrcDecompressor const& decompressor, std::vector& stream_info, size_t num_stripes, cudf::detail::hostdevice_2dvector& row_groups, From 71f5f4abd9184ab253da7abe0ed24f35d12d484b Mon Sep 17 00:00:00 2001 From: vuule Date: Thu, 5 May 2022 13:49:25 -0700 Subject: [PATCH 12/17] min --- cpp/src/io/orc/orc.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/io/orc/orc.h b/cpp/src/io/orc/orc.h index 6d97abb61c9..cd49e371a0b 100644 --- a/cpp/src/io/orc/orc.h +++ b/cpp/src/io/orc/orc.h @@ -545,8 +545,7 @@ class OrcDecompressor { [[nodiscard]] uint32_t GetLog2MaxCompressionRatio() const { return m_log2MaxRatio; } [[nodiscard]] uint32_t GetMaxUncompressedBlockSize(uint32_t block_len) const { - return (block_len < (m_blockSize >> m_log2MaxRatio)) ? block_len << m_log2MaxRatio - : m_blockSize; + return std::min(block_len << m_log2MaxRatio, m_blockSize); } [[nodiscard]] compression_type compression() const { return _compression; } [[nodiscard]] uint32_t GetBlockSize() const { return m_blockSize; } From c819249c85a19ce4dce36dc2fc4e86865b8cc4e0 Mon Sep 17 00:00:00 2001 From: vuule Date: Thu, 5 May 2022 14:10:38 -0700 Subject: [PATCH 13/17] docs and style --- cpp/include/cudf/io/types.hpp | 8 ++++---- cpp/src/io/comp/io_uncomp.h | 7 ++----- cpp/src/io/orc/reader_impl.cu | 19 +++++++++---------- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index f4be64aaee7..64312234360 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -61,10 +61,10 @@ enum class compression_type { BROTLI, ///< BROTLI format, using LZ77 + Huffman + 2nd order context modeling ZIP, ///< ZIP format, using DEFLATE algorithm XZ, ///< XZ format, using LZMA(2) algorithm - ZLIB, ///< ZIP format, using DEFLATE algorithm - LZ4, - LZO, - ZSTD + ZLIB, ///< ZLIB format, using DEFLATE algorithm + LZ4, ///< LZ4 format, using LZ77 + LZO, ///< Lempel–Ziv–Oberhumer format + ZSTD ///< Zstandard format }; /** diff --git a/cpp/src/io/comp/io_uncomp.h b/cpp/src/io/comp/io_uncomp.h index 93672828f73..6f1c8a61e8a 100644 --- a/cpp/src/io/comp/io_uncomp.h +++ b/cpp/src/io/comp/io_uncomp.h @@ -29,13 +29,10 @@ namespace cudf { namespace io { /** - * @brief Decompresses a gzip/zip/bzip2/xz file stored in system memory. - * - * The result is allocated and stored in a vector. - * If the function call fails, the output vector is empty. + * @brief Decompresses a system memory buffer. * * @param compression Type of compression of the input data - * @param src Compressed buffer + * @param src Compressed host buffer * * @return Vector containing the Decompressed output */ diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index af91fb6aa4e..21f4d461cf9 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -1164,16 +1164,15 @@ table_with_metadata reader::impl::read(size_type skip_rows, } // Setup row group descriptors if using indexes if (_metadata.per_file_metadata[0].ps.compression != orc::NONE and not is_data_empty) { - auto decomp_data = - decompress_stripe_data(chunks, - stripe_data, - *_metadata.per_file_metadata[0].decompressor, - stream_info, - total_num_stripes, - row_groups, - _metadata.get_row_index_stride(), - level == 0, - stream); + auto decomp_data = decompress_stripe_data(chunks, + stripe_data, + *_metadata.per_file_metadata[0].decompressor, + stream_info, + total_num_stripes, + row_groups, + _metadata.get_row_index_stride(), + level == 0, + stream); stripe_data.clear(); stripe_data.push_back(std::move(decomp_data)); } else { From feffa9b5359a090d76f5eae45b2a545e93805df5 Mon Sep 17 00:00:00 2001 From: vuule Date: Thu, 12 May 2022 20:14:51 -0700 Subject: [PATCH 14/17] fallthrough --- cpp/src/io/comp/uncomp.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index ff5b2155a62..44055acb1f0 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -294,7 +294,8 @@ std::vector decompress(compression_type compression, host_span decompress(compression_type compression, host_span 4) { const bz2_file_header_s* fhdr = reinterpret_cast(raw); @@ -345,7 +347,8 @@ std::vector decompress(compression_type compression, host_span Date: Thu, 12 May 2022 20:40:16 -0700 Subject: [PATCH 15/17] code review --- cpp/src/io/json/reader_impl.cu | 2 +- cpp/src/io/orc/orc.cpp | 10 +++++----- cpp/src/io/orc/stripe_init.cu | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu index 9c2b1f02d4e..b965745c9cf 100644 --- a/cpp/src/io/json/reader_impl.cu +++ b/cpp/src/io/json/reader_impl.cu @@ -587,7 +587,7 @@ table_with_metadata read_json(std::vector>& sources, auto range_size = reader_opts.get_byte_range_size(); auto range_size_padded = reader_opts.get_byte_range_size_with_padding(); - auto h_raw_data = ingest_raw_input( + auto const h_raw_data = ingest_raw_input( sources, reader_opts.get_compression(), range_offset, range_size, range_size_padded); host_span h_data{reinterpret_cast(h_raw_data.data()), h_raw_data.size()}; diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index da7fc928681..7d0f96719e5 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -383,14 +383,14 @@ host_span OrcDecompressor::decompress_blocks(host_span= header_size, "Total size is less than the 3-byte header"); // First, scan the input for the number of blocks and worst-case output size size_t max_dst_length = 0; for (size_t i = 0; i + header_size < src.size();) { - uint32_t block_len = src[i] | (src[i + 1] << 8) | (src[i + 2] << 16); - uint32_t is_uncompressed = block_len & 1; + uint32_t block_len = src[i] | (src[i + 1] << 8) | (src[i + 2] << 16); + auto const is_uncompressed = static_cast(block_len & 1); i += header_size; block_len >>= 1; if (is_uncompressed) { @@ -408,8 +408,8 @@ host_span OrcDecompressor::decompress_blocks(host_span(block_len & 1); i += header_size; block_len >>= 1; if (is_uncompressed) { diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index e44ca10922f..fe5d74d4b4c 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -62,7 +62,7 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat uint32_t num_uncompressed_blocks = 0; while (cur + BLOCK_HEADER_SIZE < end) { uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0); - uint32_t is_uncompressed = block_len & 1; + auto const is_uncompressed = static_cast(block_len & 1); uint32_t uncompressed_size; device_span* init_in_ctl = nullptr; device_span* init_out_ctl = nullptr; @@ -163,7 +163,7 @@ extern "C" __global__ void __launch_bounds__(128, 8) while (cur + BLOCK_HEADER_SIZE < end) { uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0); - uint32_t is_uncompressed = block_len & 1; + auto const is_uncompressed = static_cast(block_len & 1); uint32_t uncompressed_size_est, uncompressed_size_actual; block_len >>= 1; cur += BLOCK_HEADER_SIZE; @@ -381,14 +381,14 @@ static __device__ void gpuMapRowIndexToUncompressed(rowindex_state_s* s, auto decstatus = s->strm_info[ci_id].decstatus.data(); uint32_t uncomp_offset = 0; for (;;) { - uint32_t block_len, is_uncompressed; + uint32_t block_len; if (cur + BLOCK_HEADER_SIZE > end || cur + BLOCK_HEADER_SIZE >= start + compressed_offset) { break; } block_len = cur[0] | (cur[1] << 8) | (cur[2] << 16); cur += BLOCK_HEADER_SIZE; - is_uncompressed = block_len & 1; + auto const is_uncompressed = static_cast(block_len & 1); block_len >>= 1; cur += block_len; if (cur > end) { break; } From 4258c786731b9effb885d2db629e56a5c08bf8dc Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 12 May 2022 20:43:11 -0700 Subject: [PATCH 16/17] Apply suggestions from code review Co-authored-by: nvdbaranec <56695930+nvdbaranec@users.noreply.github.com> --- cpp/src/io/comp/uncomp.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index ff5b2155a62..34e32cf51ab 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -425,9 +425,8 @@ size_t decompress_snappy(host_span src, host_span dst) uint32_t l = 0, c; uncompressed_size = 0; do { - uint32_t lo7; c = *cur++; - lo7 = c & 0x7f; + uint32_t const lo7 = c & 0x7f; if (l >= 28 && c > 0xf) { return 0; } uncompressed_size |= lo7 << l; l += 7; @@ -472,7 +471,7 @@ size_t decompress_snappy(host_span src, host_span dst) // xxxxxx00: literal blen >>= 2; if (blen >= 60) { - uint32_t num_bytes = blen - 59; + uint32_t const num_bytes = blen - 59; if (cur + num_bytes >= end) break; blen = cur[0]; if (num_bytes > 1) { From 14952a1722ea12aaebd4d7056de3b84e7598bf20 Mon Sep 17 00:00:00 2001 From: vuule Date: Thu, 12 May 2022 20:47:40 -0700 Subject: [PATCH 17/17] style --- cpp/src/io/comp/uncomp.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 4e7d51f355e..ebf7bfafb14 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -428,8 +428,8 @@ size_t decompress_snappy(host_span src, host_span dst) uint32_t l = 0, c; uncompressed_size = 0; do { - c = *cur++; - uint32_t const lo7 = c & 0x7f; + c = *cur++; + auto const lo7 = c & 0x7f; if (l >= 28 && c > 0xf) { return 0; } uncompressed_size |= lo7 << l; l += 7;