diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index 9d6a83e8730..bd1e3be838b 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -60,7 +60,11 @@ enum class compression_type { BZIP2, ///< BZIP2 format, using Burrows-Wheeler transform BROTLI, ///< BROTLI format, using LZ77 + Huffman + 2nd order context modeling ZIP, ///< ZIP format, using DEFLATE algorithm - XZ ///< XZ format, using LZMA(2) algorithm + XZ, ///< XZ format, using LZMA(2) algorithm + ZLIB, ///< ZLIB format, using DEFLATE algorithm + LZ4, ///< LZ4 format, using LZ77 + LZO, ///< Lempel–Ziv–Oberhumer format + ZSTD ///< Zstandard format }; /** diff --git a/cpp/src/io/comp/io_uncomp.h b/cpp/src/io/comp/io_uncomp.h index 7b1feb84813..6f1c8a61e8a 100644 --- a/cpp/src/io/comp/io_uncomp.h +++ b/cpp/src/io/comp/io_uncomp.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2021, NVIDIA CORPORATION. + * Copyright (c) 2018-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,35 +27,20 @@ using cudf::host_span; namespace cudf { namespace io { -enum { - IO_UNCOMP_STREAM_TYPE_INFER = 0, - IO_UNCOMP_STREAM_TYPE_GZIP = 1, - IO_UNCOMP_STREAM_TYPE_ZIP = 2, - IO_UNCOMP_STREAM_TYPE_BZIP2 = 3, - IO_UNCOMP_STREAM_TYPE_XZ = 4, - IO_UNCOMP_STREAM_TYPE_INFLATE = 5, - IO_UNCOMP_STREAM_TYPE_SNAPPY = 6, - IO_UNCOMP_STREAM_TYPE_BROTLI = 7, - IO_UNCOMP_STREAM_TYPE_LZ4 = 8, - IO_UNCOMP_STREAM_TYPE_LZO = 9, - IO_UNCOMP_STREAM_TYPE_ZSTD = 10, -}; -std::vector io_uncompress_single_h2d(void const* src, size_t src_size, int stream_type); - -std::vector get_uncompressed_data(host_span data, compression_type compression); - -class HostDecompressor { - public: - virtual size_t Decompress(uint8_t* dstBytes, - size_t dstLen, - uint8_t const* srcBytes, - size_t srcLen) = 0; - virtual ~HostDecompressor() {} +/** + * @brief Decompresses a system memory buffer. + * + * @param compression Type of compression of the input data + * @param src Compressed host buffer + * + * @return Vector containing the Decompressed output + */ +std::vector decompress(compression_type compression, host_span src); - public: - static std::unique_ptr Create(int stream_type); -}; +size_t decompress(compression_type compression, + host_span src, + host_span dst); /** * @brief GZIP header flags diff --git a/cpp/src/io/comp/nvcomp_adapter.cpp b/cpp/src/io/comp/nvcomp_adapter.cpp index b2e6f07b80b..0fde4e1a5c4 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cpp +++ b/cpp/src/io/comp/nvcomp_adapter.cpp @@ -23,9 +23,9 @@ namespace cudf::io::nvcomp { template -auto batched_decompress_get_temp_size(compression_type type, Args&&... args) +auto batched_decompress_get_temp_size(compression_type compression, Args&&... args) { - switch (type) { + switch (compression) { case compression_type::SNAPPY: return nvcompBatchedSnappyDecompressGetTempSize(std::forward(args)...); default: CUDF_FAIL("Unsupported compression type"); @@ -33,27 +33,27 @@ auto batched_decompress_get_temp_size(compression_type type, Args&&... args) }; template -auto batched_decompress_async(compression_type type, Args&&... args) +auto batched_decompress_async(compression_type compression, Args&&... args) { - switch (type) { + switch (compression) { case compression_type::SNAPPY: return nvcompBatchedSnappyDecompressAsync(std::forward(args)...); default: CUDF_FAIL("Unsupported compression type"); } }; -size_t get_temp_size(compression_type type, size_t num_chunks, size_t max_uncomp_chunk_size) +size_t get_temp_size(compression_type compression, size_t num_chunks, size_t max_uncomp_chunk_size) { size_t temp_size = 0; nvcompStatus_t nvcomp_status = - batched_decompress_get_temp_size(type, num_chunks, max_uncomp_chunk_size, &temp_size); + batched_decompress_get_temp_size(compression, num_chunks, max_uncomp_chunk_size, &temp_size); CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Unable to get scratch size for decompression"); return temp_size; } -void batched_decompress(compression_type type, +void batched_decompress(compression_type compression, device_span const> inputs, device_span const> outputs, device_span statuses, @@ -67,8 +67,8 @@ void batched_decompress(compression_type type, rmm::device_uvector actual_uncompressed_data_sizes(num_chunks, stream); rmm::device_uvector nvcomp_statuses(num_chunks, stream); // Temporary space required for decompression - rmm::device_buffer scratch(get_temp_size(type, num_chunks, max_uncomp_chunk_size), stream); - auto const nvcomp_status = batched_decompress_async(type, + rmm::device_buffer scratch(get_temp_size(compression, num_chunks, max_uncomp_chunk_size), stream); + auto const nvcomp_status = batched_decompress_async(compression, nvcomp_args.compressed_data_ptrs.data(), nvcomp_args.compressed_data_sizes.data(), nvcomp_args.uncompressed_data_sizes.data(), diff --git a/cpp/src/io/comp/nvcomp_adapter.hpp b/cpp/src/io/comp/nvcomp_adapter.hpp index a0eb6bc4fbf..c289e2d2ade 100644 --- a/cpp/src/io/comp/nvcomp_adapter.hpp +++ b/cpp/src/io/comp/nvcomp_adapter.hpp @@ -36,7 +36,7 @@ enum class compression_type { SNAPPY }; * @param[in] max_uncomp_page_size maximum size of uncompressed block * @param[in] stream CUDA stream to use */ -void batched_decompress(compression_type type, +void batched_decompress(compression_type compression, device_span const> inputs, device_span const> outputs, device_span statuses, diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 66d73074af0..ebf7bfafb14 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2021, NVIDIA CORPORATION. + * Copyright (c) 2018-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -249,23 +249,17 @@ int cpu_inflate(uint8_t* uncomp_data, size_t* destLen, const uint8_t* comp_data, * @param[in] comp_data Raw compressed data * @param[in] comp_len Compressed data size */ -int cpu_inflate_vector(std::vector& dst, const uint8_t* comp_data, size_t comp_len) +void cpu_inflate_vector(std::vector& dst, const uint8_t* comp_data, size_t comp_len) { - int zerr; - z_stream strm; - - memset(&strm, 0, sizeof(strm)); + z_stream strm{}; strm.next_in = const_cast(reinterpret_cast(comp_data)); strm.avail_in = comp_len; strm.total_in = 0; - strm.next_out = reinterpret_cast(dst.data()); + strm.next_out = dst.data(); strm.avail_out = dst.size(); strm.total_out = 0; - zerr = inflateInit2(&strm, -15); // -15 for raw data without GZIP headers - if (zerr != 0) { - dst.resize(0); - return zerr; - } + auto zerr = inflateInit2(&strm, -15); // -15 for raw data without GZIP headers + CUDF_EXPECTS(zerr == 0, "Error in DEFLATE stream"); do { if (strm.avail_out == 0) { dst.resize(strm.total_out + (1 << 30)); @@ -277,46 +271,35 @@ int cpu_inflate_vector(std::vector& dst, const uint8_t* comp_data, size_t strm.total_out == dst.size()); dst.resize(strm.total_out); inflateEnd(&strm); - return (zerr == Z_STREAM_END) ? Z_OK : zerr; + CUDF_EXPECTS(zerr == Z_STREAM_END, "Error in DEFLATE stream"); } -/** - * @brief Uncompresses a gzip/zip/bzip2/xz file stored in system memory. - * - * The result is allocated and stored in a vector. - * If the function call fails, the output vector is empty. - * - * @param[in] src Pointer to the compressed data in system memory - * @param[in] src_size The size of the compressed data, in bytes - * @param[in] stream_type Type of compression of the input data - * - * @return Vector containing the uncompressed output - */ -std::vector io_uncompress_single_h2d(const void* src, size_t src_size, int stream_type) +std::vector decompress(compression_type compression, host_span src) { - const uint8_t* raw = static_cast(src); + CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr"); + CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0"); + + auto raw = src.data(); const uint8_t* comp_data = nullptr; size_t comp_len = 0; size_t uncomp_len = 0; - CUDF_EXPECTS(src != nullptr, "Decompression: Source cannot be nullptr"); - CUDF_EXPECTS(src_size != 0, "Decompression: Source size cannot be 0"); - - switch (stream_type) { - case IO_UNCOMP_STREAM_TYPE_INFER: - case IO_UNCOMP_STREAM_TYPE_GZIP: { + switch (compression) { + case compression_type::AUTO: + case compression_type::GZIP: { gz_archive_s gz; - if (ParseGZArchive(&gz, raw, src_size)) { - stream_type = IO_UNCOMP_STREAM_TYPE_GZIP; + if (ParseGZArchive(&gz, raw, src.size())) { + compression = compression_type::GZIP; comp_data = gz.comp_data; comp_len = gz.comp_len; uncomp_len = gz.isize; } - if (stream_type != IO_UNCOMP_STREAM_TYPE_INFER) break; // Fall through for INFER + if (compression != compression_type::AUTO) break; + [[fallthrough]]; } - case IO_UNCOMP_STREAM_TYPE_ZIP: { + case compression_type::ZIP: { zip_archive_s za; - if (OpenZipArchive(&za, raw, src_size)) { + if (OpenZipArchive(&za, raw, src.size())) { size_t cdfh_ofs = 0; for (int i = 0; i < za.eocd->num_entries; i++) { const zip_cdfh_s* cdfh = reinterpret_cast( @@ -330,14 +313,14 @@ std::vector io_uncompress_single_h2d(const void* src, size_t src_size, int if (cdfh->comp_method == 8 && cdfh->comp_size > 0 && cdfh->uncomp_size > 0) { size_t lfh_ofs = cdfh->hdr_ofs; const zip_lfh_s* lfh = reinterpret_cast(raw + lfh_ofs); - if (lfh_ofs + sizeof(zip_lfh_s) <= src_size && lfh->sig == 0x04034b50 && - lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src_size) { + if (lfh_ofs + sizeof(zip_lfh_s) <= src.size() && lfh->sig == 0x04034b50 && + lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src.size()) { if (lfh->comp_method == 8 && lfh->comp_size > 0 && lfh->uncomp_size > 0) { size_t file_start = lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len; size_t file_end = file_start + lfh->comp_size; - if (file_end <= src_size) { + if (file_end <= src.size()) { // Pick the first valid file of non-zero size (only 1 file expected in archive) - stream_type = IO_UNCOMP_STREAM_TYPE_ZIP; + compression = compression_type::ZIP; comp_data = raw + file_start; comp_len = lfh->comp_size; uncomp_len = lfh->uncomp_size; @@ -350,49 +333,46 @@ std::vector io_uncompress_single_h2d(const void* src, size_t src_size, int } } } - if (stream_type != IO_UNCOMP_STREAM_TYPE_INFER) break; // Fall through for INFER - case IO_UNCOMP_STREAM_TYPE_BZIP2: - if (src_size > 4) { + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + case compression_type::BZIP2: + if (src.size() > 4) { const bz2_file_header_s* fhdr = reinterpret_cast(raw); // Check for BZIP2 file signature "BZh1" to "BZh9" if (fhdr->sig[0] == 'B' && fhdr->sig[1] == 'Z' && fhdr->sig[2] == 'h' && fhdr->blksz >= '1' && fhdr->blksz <= '9') { - stream_type = IO_UNCOMP_STREAM_TYPE_BZIP2; + compression = compression_type::BZIP2; comp_data = raw; - comp_len = src_size; + comp_len = src.size(); uncomp_len = 0; } } - if (stream_type != IO_UNCOMP_STREAM_TYPE_INFER) break; // Fall through for INFER - default: - // Unsupported format - break; + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + default: CUDF_FAIL("Unsupported compressed stream type"); } - CUDF_EXPECTS(comp_data != nullptr, "Unsupported compressed stream type"); - CUDF_EXPECTS(comp_len > 0, "Unsupported compressed stream type"); + CUDF_EXPECTS(comp_data != nullptr and comp_len > 0, "Unsupported compressed stream type"); if (uncomp_len <= 0) { uncomp_len = comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume // ~4:1 compression for initial size } - if (stream_type == IO_UNCOMP_STREAM_TYPE_GZIP || stream_type == IO_UNCOMP_STREAM_TYPE_ZIP) { + if (compression == compression_type::GZIP || compression == compression_type::ZIP) { // INFLATE - std::vector dst(uncomp_len); - CUDF_EXPECTS(cpu_inflate_vector(dst, comp_data, comp_len) == 0, - "Decompression: error in stream"); + std::vector dst(uncomp_len); + cpu_inflate_vector(dst, comp_data, comp_len); return dst; } - if (stream_type == IO_UNCOMP_STREAM_TYPE_BZIP2) { + if (compression == compression_type::BZIP2) { size_t src_ofs = 0; size_t dst_ofs = 0; int bz_err = 0; - std::vector dst(uncomp_len); + std::vector dst(uncomp_len); do { size_t dst_len = uncomp_len - dst_ofs; - bz_err = cpu_bz2_uncompress( - comp_data, comp_len, reinterpret_cast(dst.data()) + dst_ofs, &dst_len, &src_ofs); + bz_err = cpu_bz2_uncompress(comp_data, comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs); if (bz_err == BZ_OUTBUFF_FULL) { // TBD: We could infer the compression ratio based on produced/consumed byte counts // in order to minimize realloc events and over-allocation @@ -413,167 +393,121 @@ std::vector io_uncompress_single_h2d(const void* src, size_t src_size, int } /** - * @brief Uncompresses the input data and stores the allocated result into - * a vector. - * - * @param[in] data Pointer to the csv data in host memory - * @param[in] compression String describing the compression type - * - * @return Vector containing the output uncompressed data + * @brief ZLIB host decompressor (no header) */ -std::vector get_uncompressed_data(host_span const data, - compression_type compression) +size_t decompress_zlib(host_span src, host_span dst) { - auto const comp_type = [compression]() { - switch (compression) { - case compression_type::GZIP: return IO_UNCOMP_STREAM_TYPE_GZIP; - case compression_type::ZIP: return IO_UNCOMP_STREAM_TYPE_ZIP; - case compression_type::BZIP2: return IO_UNCOMP_STREAM_TYPE_BZIP2; - case compression_type::XZ: return IO_UNCOMP_STREAM_TYPE_XZ; - default: return IO_UNCOMP_STREAM_TYPE_INFER; - } - }(); - - return io_uncompress_single_h2d(data.data(), data.size(), comp_type); + size_t uncomp_size = dst.size(); + CUDF_EXPECTS(0 == cpu_inflate(dst.data(), &uncomp_size, src.data(), src.size()), + "ZLIB decompression failed"); + return uncomp_size; } /** - * @brief ZLIB host decompressor class + * @brief GZIP host decompressor (includes header) */ -class HostDecompressor_ZLIB : public HostDecompressor { - public: - HostDecompressor_ZLIB(bool gz_hdr_) : gz_hdr(gz_hdr_) {} - size_t Decompress(uint8_t* dstBytes, - size_t dstLen, - const uint8_t* srcBytes, - size_t srcLen) override - { - if (gz_hdr) { - gz_archive_s gz; - if (!ParseGZArchive(&gz, srcBytes, srcLen)) { return 0; } - srcBytes = gz.comp_data; - srcLen = gz.comp_len; - } - if (0 == cpu_inflate(dstBytes, &dstLen, srcBytes, srcLen)) { - return dstLen; - } else { - return 0; - } - } - - protected: - const bool gz_hdr; -}; +size_t decompress_gzip(host_span src, host_span dst) +{ + gz_archive_s gz; + auto const parse_succeeded = ParseGZArchive(&gz, src.data(), src.size()); + CUDF_EXPECTS(parse_succeeded, "Failed to parse GZIP header"); + return decompress_zlib({gz.comp_data, gz.comp_len}, dst); +} /** - * @brief SNAPPY host decompressor class + * @brief SNAPPY host decompressor */ -class HostDecompressor_SNAPPY : public HostDecompressor { - public: - HostDecompressor_SNAPPY() {} - size_t Decompress(uint8_t* dstBytes, - size_t dstLen, - const uint8_t* srcBytes, - size_t srcLen) override +size_t decompress_snappy(host_span src, host_span dst) +{ + CUDF_EXPECTS(not dst.empty() and src.size() >= 1, "invalid Snappy decompress inputs"); + uint32_t uncompressed_size, bytes_left, dst_pos; + auto cur = src.begin(); + auto const end = src.end(); + // Read uncompressed length (varint) { - uint32_t uncompressed_size, bytes_left, dst_pos; - const uint8_t* cur = srcBytes; - const uint8_t* end = srcBytes + srcLen; - - if (!dstBytes || srcLen < 1) { return 0; } - // Read uncompressed length (varint) - { - uint32_t l = 0, c; - uncompressed_size = 0; - do { - uint32_t lo7; - c = *cur++; - lo7 = c & 0x7f; - if (l >= 28 && c > 0xf) { return 0; } - uncompressed_size |= lo7 << l; - l += 7; - } while (c > 0x7f && cur < end); - if (!uncompressed_size || uncompressed_size > dstLen || cur >= end) { - // Destination buffer too small or zero size - return 0; - } - } - // Decode lz77 - dst_pos = 0; - bytes_left = uncompressed_size; + uint32_t l = 0, c; + uncompressed_size = 0; do { - uint32_t blen = *cur++; - - if (blen & 3) { - // Copy - uint32_t offset; - if (blen & 2) { - // xxxxxx1x: copy with 6-bit length, 2-byte or 4-byte offset + c = *cur++; + auto const lo7 = c & 0x7f; + if (l >= 28 && c > 0xf) { return 0; } + uncompressed_size |= lo7 << l; + l += 7; + } while (c > 0x7f && cur < end); + CUDF_EXPECTS(uncompressed_size != 0 and uncompressed_size <= dst.size() and cur < end, + "Destination buffer too small"); + } + // Decode lz77 + dst_pos = 0; + bytes_left = uncompressed_size; + do { + uint32_t blen = *cur++; + + if (blen & 3) { + // Copy + uint32_t offset; + if (blen & 2) { + // xxxxxx1x: copy with 6-bit length, 2-byte or 4-byte offset + if (cur + 2 > end) break; + offset = *reinterpret_cast(cur); + cur += 2; + if (blen & 1) // 4-byte offset + { if (cur + 2 > end) break; - offset = *reinterpret_cast(cur); + offset |= (*reinterpret_cast(cur)) << 16; cur += 2; - if (blen & 1) // 4-byte offset - { - if (cur + 2 > end) break; - offset |= (*reinterpret_cast(cur)) << 16; - cur += 2; - } - blen = (blen >> 2) + 1; - } else { - // xxxxxx01.oooooooo: copy with 3-bit length, 11-bit offset - if (cur >= end) break; - offset = ((blen & 0xe0) << 3) | (*cur++); - blen = ((blen >> 2) & 7) + 4; } - if (offset - 1u >= dst_pos || blen > bytes_left) break; - bytes_left -= blen; - do { - dstBytes[dst_pos] = dstBytes[dst_pos - offset]; - dst_pos++; - } while (--blen); + blen = (blen >> 2) + 1; } else { - // xxxxxx00: literal - blen >>= 2; - if (blen >= 60) { - uint32_t num_bytes = blen - 59; - if (cur + num_bytes >= end) break; - blen = cur[0]; - if (num_bytes > 1) { - blen |= cur[1] << 8; - if (num_bytes > 2) { - blen |= cur[2] << 16; - if (num_bytes > 3) { blen |= cur[3] << 24; } - } + // xxxxxx01.oooooooo: copy with 3-bit length, 11-bit offset + if (cur >= end) break; + offset = ((blen & 0xe0) << 3) | (*cur++); + blen = ((blen >> 2) & 7) + 4; + } + if (offset - 1u >= dst_pos || blen > bytes_left) break; + bytes_left -= blen; + do { + dst[dst_pos] = dst[dst_pos - offset]; + dst_pos++; + } while (--blen); + } else { + // xxxxxx00: literal + blen >>= 2; + if (blen >= 60) { + uint32_t const num_bytes = blen - 59; + if (cur + num_bytes >= end) break; + blen = cur[0]; + if (num_bytes > 1) { + blen |= cur[1] << 8; + if (num_bytes > 2) { + blen |= cur[2] << 16; + if (num_bytes > 3) { blen |= cur[3] << 24; } } - cur += num_bytes; } - blen++; - if (cur + blen > end || blen > bytes_left) break; - memcpy(dstBytes + dst_pos, cur, blen); - cur += blen; - dst_pos += blen; - bytes_left -= blen; + cur += num_bytes; } - } while (bytes_left && cur < end); - return (bytes_left) ? 0 : uncompressed_size; - } -}; + blen++; + if (cur + blen > end || blen > bytes_left) break; + memcpy(dst.data() + dst_pos, cur, blen); + cur += blen; + dst_pos += blen; + bytes_left -= blen; + } + } while (bytes_left && cur < end); + CUDF_EXPECTS(bytes_left == 0, "Snappy Decompression failed"); + return uncompressed_size; +} -/** - * @brief CPU decompression class - * - * @param[in] stream_type compression method (IO_UNCOMP_STREAM_TYPE_XXX) - * - * @returns corresponding HostDecompressor class, nullptr if failure - */ -std::unique_ptr HostDecompressor::Create(int stream_type) +size_t decompress(compression_type compression, + host_span src, + host_span dst) { - switch (stream_type) { - case IO_UNCOMP_STREAM_TYPE_GZIP: return std::make_unique(true); - case IO_UNCOMP_STREAM_TYPE_INFLATE: return std::make_unique(false); - case IO_UNCOMP_STREAM_TYPE_SNAPPY: return std::make_unique(); + switch (compression) { + case compression_type::GZIP: return decompress_gzip(src, dst); + case compression_type::ZLIB: return decompress_zlib(src, dst); + case compression_type::SNAPPY: return decompress_snappy(src, dst); + default: CUDF_FAIL("Unsupported compression type"); } - CUDF_FAIL("Unsupported compression type"); } } // namespace io diff --git a/cpp/src/io/csv/reader_impl.cu b/cpp/src/io/csv/reader_impl.cu index d20155b4720..fce9b008374 100644 --- a/cpp/src/io/csv/reader_impl.cu +++ b/cpp/src/io/csv/reader_impl.cu @@ -420,11 +420,13 @@ std::pair, selected_rows_offsets> select_data_and_row_ reinterpret_cast(buffer->data()), buffer->size()); - std::vector h_uncomp_data_owner; + std::vector h_uncomp_data_owner; if (reader_opts.get_compression() != compression_type::NONE) { - h_uncomp_data_owner = get_uncompressed_data(h_data, reader_opts.get_compression()); - h_data = h_uncomp_data_owner; + h_uncomp_data_owner = + decompress(reader_opts.get_compression(), {buffer->data(), buffer->size()}); + h_data = {reinterpret_cast(h_uncomp_data_owner.data()), + h_uncomp_data_owner.size()}; } // None of the parameters for row selection is used, we are parsing the entire file const bool load_whole_file = range_offset == 0 && range_size == 0 && skip_rows <= 0 && diff --git a/cpp/src/io/json/reader_impl.cu b/cpp/src/io/json/reader_impl.cu index 20eeec267b1..b965745c9cf 100644 --- a/cpp/src/io/json/reader_impl.cu +++ b/cpp/src/io/json/reader_impl.cu @@ -214,11 +214,11 @@ std::pair, col_map_ptr_type> get_json_object_keys_hashe create_col_names_hash_map(sorted_info->get_column(2).view(), stream)}; } -std::vector ingest_raw_input(std::vector> const& sources, - compression_type compression, - size_t range_offset, - size_t range_size, - size_t range_size_padded) +std::vector ingest_raw_input(std::vector> const& sources, + compression_type compression, + size_t range_offset, + size_t range_size, + size_t range_size_padded) { // Iterate through the user defined sources and read the contents into the local buffer size_t total_source_size = 0; @@ -227,13 +227,13 @@ std::vector ingest_raw_input(std::vector> cons } total_source_size = total_source_size - (range_offset * sources.size()); - auto buffer = std::vector(total_source_size); + auto buffer = std::vector(total_source_size); size_t bytes_read = 0; for (const auto& source : sources) { if (!source->is_empty()) { auto data_size = (range_size_padded != 0) ? range_size_padded : source->size(); - auto destination = reinterpret_cast(buffer.data()) + bytes_read; + auto destination = buffer.data() + bytes_read; bytes_read += source->host_read(range_offset, data_size, destination); } } @@ -241,7 +241,7 @@ std::vector ingest_raw_input(std::vector> cons if (compression == compression_type::NONE) { return buffer; } else { - return get_uncompressed_data(buffer, compression); + return decompress(compression, buffer); } } @@ -587,8 +587,9 @@ table_with_metadata read_json(std::vector>& sources, auto range_size = reader_opts.get_byte_range_size(); auto range_size_padded = reader_opts.get_byte_range_size_with_padding(); - auto h_data = ingest_raw_input( + auto const h_raw_data = ingest_raw_input( sources, reader_opts.get_compression(), range_offset, range_size, range_size_padded); + host_span h_data{reinterpret_cast(h_raw_data.data()), h_raw_data.size()}; CUDF_EXPECTS(h_data.size() != 0, "Ingest failed: uncompressed input data has zero size.\n"); diff --git a/cpp/src/io/orc/aggregate_orc_metadata.cpp b/cpp/src/io/orc/aggregate_orc_metadata.cpp index 47244279599..6bbc033a9ba 100644 --- a/cpp/src/io/orc/aggregate_orc_metadata.cpp +++ b/cpp/src/io/orc/aggregate_orc_metadata.cpp @@ -233,10 +233,9 @@ std::vector aggregate_orc_metadata::select_stri "Invalid stripe information"); const auto buffer = per_file_metadata[mapping.source_idx].source->host_read(sf_comp_offset, sf_comp_length); - size_t sf_length = 0; - auto sf_data = per_file_metadata[mapping.source_idx].decompressor->Decompress( - buffer->data(), sf_comp_length, &sf_length); - ProtobufReader(sf_data, sf_length) + auto sf_data = per_file_metadata[mapping.source_idx].decompressor->decompress_blocks( + {buffer->data(), buffer->size()}); + ProtobufReader(sf_data.data(), sf_data.size()) .read(per_file_metadata[mapping.source_idx].stripefooters[i]); mapping.stripe_info[i].second = &per_file_metadata[mapping.source_idx].stripefooters[i]; if (stripe->indexLength == 0) { row_grp_idx_present = false; } diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index f51fd28676e..7d0f96719e5 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -356,53 +356,42 @@ size_t ProtobufWriter::write(const Metadata& s) return w.value(); } -OrcDecompressor::OrcDecompressor(CompressionKind kind, uint32_t blockSize) - : m_kind(kind), m_blockSize(blockSize) -{ - if (kind != NONE) { - int stream_type = IO_UNCOMP_STREAM_TYPE_INFER; // Will be treated as invalid - switch (kind) { - case NONE: break; - case ZLIB: - stream_type = IO_UNCOMP_STREAM_TYPE_INFLATE; - m_log2MaxRatio = 11; // < 2048:1 - break; - case SNAPPY: - stream_type = IO_UNCOMP_STREAM_TYPE_SNAPPY; - m_log2MaxRatio = 5; // < 32:1 - break; - case LZO: stream_type = IO_UNCOMP_STREAM_TYPE_LZO; break; - case LZ4: stream_type = IO_UNCOMP_STREAM_TYPE_LZ4; break; - case ZSTD: stream_type = IO_UNCOMP_STREAM_TYPE_ZSTD; break; - } - m_decompressor = HostDecompressor::Create(stream_type); - } else { - m_log2MaxRatio = 0; +OrcDecompressor::OrcDecompressor(CompressionKind kind, uint32_t blockSize) : m_blockSize(blockSize) +{ + switch (kind) { + case NONE: + _compression = compression_type::NONE; + m_log2MaxRatio = 0; + break; + case ZLIB: + _compression = compression_type::ZLIB; + m_log2MaxRatio = 11; // < 2048:1 + break; + case SNAPPY: + _compression = compression_type::SNAPPY; + m_log2MaxRatio = 5; // < 32:1 + break; + case LZO: _compression = compression_type::LZO; break; + case LZ4: _compression = compression_type::LZ4; break; + case ZSTD: _compression = compression_type::ZSTD; break; + default: CUDF_FAIL("Invalid compression type"); } } -/** - * @brief ORC block decompression - * - * @param[in] srcBytes compressed data - * @param[in] srcLen length of compressed data - * @param[out] dstLen length of uncompressed data - * - * @returns pointer to uncompressed data, nullptr if error - */ -const uint8_t* OrcDecompressor::Decompress(const uint8_t* srcBytes, size_t srcLen, size_t* dstLen) +host_span OrcDecompressor::decompress_blocks(host_span src) { // If uncompressed, just pass-through the input - if (m_kind == NONE) { - *dstLen = srcLen; - return srcBytes; - } + if (src.empty() or _compression == compression_type::NONE) { return src; } + + constexpr size_t header_size = 3; + CUDF_EXPECTS(src.size() >= header_size, "Total size is less than the 3-byte header"); + // First, scan the input for the number of blocks and worst-case output size size_t max_dst_length = 0; - for (size_t i = 0; i + 3 < srcLen;) { - uint32_t block_len = srcBytes[i] | (srcBytes[i + 1] << 8) | (srcBytes[i + 2] << 16); - uint32_t is_uncompressed = block_len & 1; - i += 3; + for (size_t i = 0; i + header_size < src.size();) { + uint32_t block_len = src[i] | (src[i + 1] << 8) | (src[i + 2] << 16); + auto const is_uncompressed = static_cast(block_len & 1); + i += header_size; block_len >>= 1; if (is_uncompressed) { // Uncompressed block @@ -411,38 +400,32 @@ const uint8_t* OrcDecompressor::Decompress(const uint8_t* srcBytes, size_t srcLe max_dst_length += m_blockSize; } i += block_len; - if (i > srcLen || block_len > m_blockSize) { return nullptr; } + CUDF_EXPECTS(i <= src.size() and block_len <= m_blockSize, "Error in decompression"); } // Check if we have a single uncompressed block, or no blocks - if (max_dst_length < m_blockSize) { - if (srcLen < 3) { - // Total size is less than the 3-byte header - return nullptr; - } - *dstLen = srcLen - 3; - return srcBytes + 3; - } + if (max_dst_length < m_blockSize) { return src.subspan(header_size, src.size() - header_size); } + m_buf.resize(max_dst_length); - auto dst = m_buf.data(); size_t dst_length = 0; - for (size_t i = 0; i + 3 < srcLen;) { - uint32_t block_len = srcBytes[i] | (srcBytes[i + 1] << 8) | (srcBytes[i + 2] << 16); - uint32_t is_uncompressed = block_len & 1; - i += 3; + for (size_t i = 0; i + header_size < src.size();) { + uint32_t block_len = src[i] | (src[i + 1] << 8) | (src[i + 2] << 16); + auto const is_uncompressed = static_cast(block_len & 1); + i += header_size; block_len >>= 1; if (is_uncompressed) { // Uncompressed block - memcpy(dst + dst_length, srcBytes + i, block_len); + memcpy(m_buf.data() + dst_length, src.data() + i, block_len); dst_length += block_len; } else { // Compressed block - dst_length += - m_decompressor->Decompress(dst + dst_length, m_blockSize, srcBytes + i, block_len); + dst_length += decompress( + _compression, src.subspan(i, block_len), {m_buf.data() + dst_length, m_blockSize}); } i += block_len; } - *dstLen = dst_length; - return m_buf.data(); + + m_buf.resize(dst_length); + return m_buf; } metadata::metadata(datasource* const src) : source(src) @@ -462,18 +445,16 @@ metadata::metadata(datasource* const src) : source(src) decompressor = std::make_unique(ps.compression, ps.compressionBlockSize); // Read compressed filefooter section - buffer = source->host_read(len - ps_length - 1 - ps.footerLength, ps.footerLength); - size_t ff_length = 0; - auto ff_data = decompressor->Decompress(buffer->data(), ps.footerLength, &ff_length); - ProtobufReader(ff_data, ff_length).read(ff); + buffer = source->host_read(len - ps_length - 1 - ps.footerLength, ps.footerLength); + auto const ff_data = decompressor->decompress_blocks({buffer->data(), buffer->size()}); + ProtobufReader(ff_data.data(), ff_data.size()).read(ff); CUDF_EXPECTS(get_num_columns() > 0, "No columns found"); // Read compressed metadata section buffer = source->host_read(len - ps_length - 1 - ps.footerLength - ps.metadataLength, ps.metadataLength); - size_t md_length = 0; - auto md_data = decompressor->Decompress(buffer->data(), ps.metadataLength, &md_length); - orc::ProtobufReader(md_data, md_length).read(md); + auto const md_data = decompressor->decompress_blocks({buffer->data(), buffer->size()}); + orc::ProtobufReader(md_data.data(), md_data.size()).read(md); init_parent_descriptors(); init_column_names(); diff --git a/cpp/src/io/orc/orc.h b/cpp/src/io/orc/orc.h index 73eb8b382db..cd49e371a0b 100644 --- a/cpp/src/io/orc/orc.h +++ b/cpp/src/io/orc/orc.h @@ -533,21 +533,27 @@ class ProtobufWriter { class OrcDecompressor { public: OrcDecompressor(CompressionKind kind, uint32_t blockSize); - const uint8_t* Decompress(const uint8_t* srcBytes, size_t srcLen, size_t* dstLen); + + /** + * @brief ORC block decompression + * + * @param src compressed data + * + * @return decompressed data + */ + host_span decompress_blocks(host_span src); [[nodiscard]] uint32_t GetLog2MaxCompressionRatio() const { return m_log2MaxRatio; } [[nodiscard]] uint32_t GetMaxUncompressedBlockSize(uint32_t block_len) const { - return (block_len < (m_blockSize >> m_log2MaxRatio)) ? block_len << m_log2MaxRatio - : m_blockSize; + return std::min(block_len << m_log2MaxRatio, m_blockSize); } - [[nodiscard]] CompressionKind GetKind() const { return m_kind; } + [[nodiscard]] compression_type compression() const { return _compression; } [[nodiscard]] uint32_t GetBlockSize() const { return m_blockSize; } protected: - CompressionKind const m_kind; + compression_type _compression; uint32_t m_log2MaxRatio = 24; // log2 of maximum compression ratio - uint32_t const m_blockSize; - std::unique_ptr m_decompressor; + uint32_t m_blockSize; std::vector m_buf; }; diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index f64ba6f0566..383a6af78d8 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -286,7 +286,7 @@ void decompress_check(device_span stats, rmm::device_buffer reader::impl::decompress_stripe_data( cudf::detail::hostdevice_2dvector& chunks, const std::vector& stripe_data, - const OrcDecompressor* decompressor, + OrcDecompressor const& decompressor, std::vector& stream_info, size_t num_stripes, cudf::detail::hostdevice_2dvector& row_groups, @@ -310,8 +310,8 @@ rmm::device_buffer reader::impl::decompress_stripe_data( gpu::ParseCompressedStripeData(compinfo.device_ptr(), compinfo.size(), - decompressor->GetBlockSize(), - decompressor->GetLog2MaxCompressionRatio(), + decompressor.GetBlockSize(), + decompressor.GetLog2MaxCompressionRatio(), stream); compinfo.device_to_host(stream, true); @@ -357,8 +357,8 @@ rmm::device_buffer reader::impl::decompress_stripe_data( compinfo.host_to_device(stream); gpu::ParseCompressedStripeData(compinfo.device_ptr(), compinfo.size(), - decompressor->GetBlockSize(), - decompressor->GetLog2MaxCompressionRatio(), + decompressor.GetBlockSize(), + decompressor.GetLog2MaxCompressionRatio(), stream); // Dispatch batches of blocks to decompress @@ -366,12 +366,12 @@ rmm::device_buffer reader::impl::decompress_stripe_data( device_span> inflate_in_view{inflate_in.data(), num_compressed_blocks}; device_span> inflate_out_view{inflate_out.data(), num_compressed_blocks}; - switch (decompressor->GetKind()) { - case orc::ZLIB: + switch (decompressor.compression()) { + case compression_type::ZLIB: gpuinflate( inflate_in_view, inflate_out_view, inflate_stats, gzip_header_included::NO, stream); break; - case orc::SNAPPY: + case compression_type::SNAPPY: if (nvcomp_integration::is_stable_enabled()) { nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY, inflate_in_view, @@ -1164,16 +1164,15 @@ table_with_metadata reader::impl::read(size_type skip_rows, } // Setup row group descriptors if using indexes if (_metadata.per_file_metadata[0].ps.compression != orc::NONE and not is_data_empty) { - auto decomp_data = - decompress_stripe_data(chunks, - stripe_data, - _metadata.per_file_metadata[0].decompressor.get(), - stream_info, - total_num_stripes, - row_groups, - _metadata.get_row_index_stride(), - level == 0, - stream); + auto decomp_data = decompress_stripe_data(chunks, + stripe_data, + *_metadata.per_file_metadata[0].decompressor, + stream_info, + total_num_stripes, + row_groups, + _metadata.get_row_index_stride(), + level == 0, + stream); stripe_data.clear(); stripe_data.push_back(std::move(decomp_data)); } else { diff --git a/cpp/src/io/orc/reader_impl.hpp b/cpp/src/io/orc/reader_impl.hpp index 103093f055f..9c87a7c5e12 100644 --- a/cpp/src/io/orc/reader_impl.hpp +++ b/cpp/src/io/orc/reader_impl.hpp @@ -107,7 +107,7 @@ class reader::impl { * * @param chunks Vector of list of column chunk descriptors * @param stripe_data List of source stripe column data - * @param decompressor Originally host decompressor + * @param decompressor Block decompressor * @param stream_info List of stream to column mappings * @param num_stripes Number of stripes making up column chunks * @param row_groups Vector of list of row index descriptors @@ -120,7 +120,7 @@ class reader::impl { rmm::device_buffer decompress_stripe_data( cudf::detail::hostdevice_2dvector& chunks, const std::vector& stripe_data, - const OrcDecompressor* decompressor, + OrcDecompressor const& decompressor, std::vector& stream_info, size_t num_stripes, cudf::detail::hostdevice_2dvector& row_groups, diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index e44ca10922f..fe5d74d4b4c 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -62,7 +62,7 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat uint32_t num_uncompressed_blocks = 0; while (cur + BLOCK_HEADER_SIZE < end) { uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0); - uint32_t is_uncompressed = block_len & 1; + auto const is_uncompressed = static_cast(block_len & 1); uint32_t uncompressed_size; device_span* init_in_ctl = nullptr; device_span* init_out_ctl = nullptr; @@ -163,7 +163,7 @@ extern "C" __global__ void __launch_bounds__(128, 8) while (cur + BLOCK_HEADER_SIZE < end) { uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0); - uint32_t is_uncompressed = block_len & 1; + auto const is_uncompressed = static_cast(block_len & 1); uint32_t uncompressed_size_est, uncompressed_size_actual; block_len >>= 1; cur += BLOCK_HEADER_SIZE; @@ -381,14 +381,14 @@ static __device__ void gpuMapRowIndexToUncompressed(rowindex_state_s* s, auto decstatus = s->strm_info[ci_id].decstatus.data(); uint32_t uncomp_offset = 0; for (;;) { - uint32_t block_len, is_uncompressed; + uint32_t block_len; if (cur + BLOCK_HEADER_SIZE > end || cur + BLOCK_HEADER_SIZE >= start + compressed_offset) { break; } block_len = cur[0] | (cur[1] << 8) | (cur[2] << 16); cur += BLOCK_HEADER_SIZE; - is_uncompressed = block_len & 1; + auto const is_uncompressed = static_cast(block_len & 1); block_len >>= 1; cur += block_len; if (cur > end) { break; }