From 03ac8455557838b57255d619bb792da0149e53c8 Mon Sep 17 00:00:00 2001
From: Shruti Shivakumar
Date: Mon, 18 Nov 2024 12:39:05 -0500
Subject: [PATCH] Reading multi-source compressed JSONL files (#17161)

Fixes #17068
Fixes #12299

This PR introduces a new datasource for compressed inputs, which enables batching and byte range reading of multi-source JSONL files using the reallocate-and-retry policy. Moreover, instead of using a 4:1 compression ratio heuristic, the device buffer size is estimated accurately for GZIP, ZIP, and SNAPPY compression types. For the remaining types, the files are first decompressed, then batched.

~~TODO: Reuse existing JSON tests but with an additional compression parameter to verify correctness.~~
~~Handled by #17219, which implements the compressed JSON writer required for the above test.~~
Multi-source compressed input tests added!

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Kyle Edwards (https://github.com/KyleFromNVIDIA)
  - Karthikeyan (https://github.com/karthikeyann)

URL: https://github.com/rapidsai/cudf/pull/17161
---
 cpp/CMakeLists.txt                       |   1 +
 cpp/src/io/comp/comp.cpp                 | 119 +++++++++
 cpp/src/io/comp/comp.hpp                 |  43 ++++
 cpp/src/io/comp/gpuinflate.cu            |   8 +-
 cpp/src/io/comp/io_uncomp.hpp            |  34 ++-
 cpp/src/io/comp/uncomp.cpp               | 315 +++++++++++++----------
 cpp/src/io/json/read_json.cu             | 306 +++++++++++++---------
 cpp/src/io/json/read_json.hpp            |  10 +-
 cpp/src/io/orc/orc.cpp                   |   2 +-
 cpp/tests/io/json/json_chunked_reader.cu | 131 ++++++++--
 cpp/tests/io/json/json_test.cpp          |  58 +++++
 cpp/tests/io/json/json_utils.cuh         |  10 +-
 cpp/tests/large_strings/json_tests.cu    |  47 +++-
 13 files changed, 771 insertions(+), 313 deletions(-)
 create mode 100644 cpp/src/io/comp/comp.cpp
 create mode 100644 cpp/src/io/comp/comp.hpp

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 6752ce12d83..506f6c185f5 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -464,6 +464,7 @@ add_library(
   src/io/avro/avro_gpu.cu
   src/io/avro/reader_impl.cu
   src/io/comp/brotli_dict.cpp
+  src/io/comp/comp.cpp
   src/io/comp/cpu_unbz2.cpp
   src/io/comp/debrotli.cu
   src/io/comp/gpuinflate.cu
diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp
new file mode 100644
index 00000000000..2176dbb2373
--- /dev/null
+++ b/cpp/src/io/comp/comp.cpp
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2018-2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#include "comp.hpp" + +#include "io/utilities/hostdevice_vector.hpp" +#include "nvcomp_adapter.hpp" + +#include +#include +#include +#include +#include +#include + +#include // compress + +namespace cudf::io::detail { + +namespace { + +/** + * @brief GZIP host compressor (includes header) + */ +std::vector compress_gzip(host_span src) +{ + z_stream zs; + zs.zalloc = Z_NULL; + zs.zfree = Z_NULL; + zs.opaque = Z_NULL; + zs.avail_in = src.size(); + zs.next_in = reinterpret_cast(const_cast(src.data())); + + std::vector dst; + zs.avail_out = 0; + zs.next_out = nullptr; + + int windowbits = 15; + int gzip_encoding = 16; + int ret = deflateInit2( + &zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, windowbits | gzip_encoding, 8, Z_DEFAULT_STRATEGY); + CUDF_EXPECTS(ret == Z_OK, "GZIP DEFLATE compression initialization failed."); + + uint32_t estcomplen = deflateBound(&zs, src.size()); + dst.resize(estcomplen); + zs.avail_out = estcomplen; + zs.next_out = dst.data(); + + ret = deflate(&zs, Z_FINISH); + CUDF_EXPECTS(ret == Z_STREAM_END, "GZIP DEFLATE compression failed due to insufficient space!"); + dst.resize(std::distance(dst.data(), zs.next_out)); + + ret = deflateEnd(&zs); + CUDF_EXPECTS(ret == Z_OK, "GZIP DEFLATE compression failed at deallocation"); + + return dst; +} + +/** + * @brief SNAPPY device compressor + */ +std::vector compress_snappy(host_span src, + rmm::cuda_stream_view stream) +{ + auto const d_src = + cudf::detail::make_device_uvector_async(src, stream, cudf::get_current_device_resource_ref()); + rmm::device_uvector d_dst(src.size(), stream); + + cudf::detail::hostdevice_vector> inputs(1, stream); + inputs[0] = d_src; + inputs.host_to_device_async(stream); + + cudf::detail::hostdevice_vector> outputs(1, stream); + outputs[0] = d_dst; + outputs.host_to_device_async(stream); + + cudf::detail::hostdevice_vector hd_status(1, stream); + hd_status[0] = {}; + hd_status.host_to_device_async(stream); + + nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream); + + stream.synchronize(); + hd_status.device_to_host_sync(stream); + CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS, + "snappy compression failed"); + std::vector dst(d_dst.size()); + cudf::detail::cuda_memcpy(host_span{dst}, device_span{d_dst}, stream); + return dst; +} + +} // namespace + +std::vector compress(compression_type compression, + host_span src, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + switch (compression) { + case compression_type::GZIP: return compress_gzip(src); + case compression_type::SNAPPY: return compress_snappy(src, stream); + default: CUDF_FAIL("Unsupported compression type"); + } +} + +} // namespace cudf::io::detail diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp new file mode 100644 index 00000000000..652abbbeda6 --- /dev/null +++ b/cpp/src/io/comp/comp.hpp @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include + +#include +#include +#include + +namespace CUDF_EXPORT cudf { +namespace io::detail { + +/** + * @brief Compresses a system memory buffer. + * + * @param compression Type of compression of the input data + * @param src Decompressed host buffer + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return Vector containing the Compressed output + */ +std::vector compress(compression_type compression, + host_span src, + rmm::cuda_stream_view stream); + +} // namespace io::detail +} // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/comp/gpuinflate.cu b/cpp/src/io/comp/gpuinflate.cu index fff1cf0c96a..090ea1430b5 100644 --- a/cpp/src/io/comp/gpuinflate.cu +++ b/cpp/src/io/comp/gpuinflate.cu @@ -980,27 +980,27 @@ __device__ int parse_gzip_header(uint8_t const* src, size_t src_size) { uint8_t flags = src[3]; hdr_len = 10; - if (flags & GZIPHeaderFlag::fextra) // Extra fields present + if (flags & detail::GZIPHeaderFlag::fextra) // Extra fields present { int xlen = src[hdr_len] | (src[hdr_len + 1] << 8); hdr_len += xlen; if (hdr_len >= src_size) return -1; } - if (flags & GZIPHeaderFlag::fname) // Original file name present + if (flags & detail::GZIPHeaderFlag::fname) // Original file name present { // Skip zero-terminated string do { if (hdr_len >= src_size) return -1; } while (src[hdr_len++] != 0); } - if (flags & GZIPHeaderFlag::fcomment) // Comment present + if (flags & detail::GZIPHeaderFlag::fcomment) // Comment present { // Skip zero-terminated string do { if (hdr_len >= src_size) return -1; } while (src[hdr_len++] != 0); } - if (flags & GZIPHeaderFlag::fhcrc) // Header CRC present + if (flags & detail::GZIPHeaderFlag::fhcrc) // Header CRC present { hdr_len += 2; } diff --git a/cpp/src/io/comp/io_uncomp.hpp b/cpp/src/io/comp/io_uncomp.hpp index 1c9578fa5c0..ca722a9b7ee 100644 --- a/cpp/src/io/comp/io_uncomp.hpp +++ b/cpp/src/io/comp/io_uncomp.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022, NVIDIA CORPORATION. + * Copyright (c) 2018-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,8 +25,8 @@ using cudf::host_span; -namespace cudf { -namespace io { +namespace CUDF_EXPORT cudf { +namespace io::detail { /** * @brief Decompresses a system memory buffer. @@ -36,13 +36,35 @@ namespace io { * * @return Vector containing the Decompressed output */ -std::vector decompress(compression_type compression, host_span src); +[[nodiscard]] std::vector decompress(compression_type compression, + host_span src); +/** + * @brief Decompresses a system memory buffer. + * + * @param compression Type of compression of the input data + * @param src Compressed host buffer + * @param dst Destination host span to place decompressed buffer + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return Size of decompressed output + */ size_t decompress(compression_type compression, host_span src, host_span dst, rmm::cuda_stream_view stream); +/** + * @brief Without actually decompressing the compressed input buffer passed, return the size of + * decompressed output. If the decompressed size cannot be extracted apriori, return zero. 
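+ * GZIP records the decompressed size in its ISIZE footer, ZIP in the local file header, and
+ * SNAPPY in a varint-encoded preamble; formats such as BZIP2 do not store it, so zero is
+ * returned for them.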
+ * + * @param compression Type of compression of the input data + * @param src Compressed host buffer + * + * @return Size of decompressed output + */ +size_t get_uncompressed_size(compression_type compression, host_span src); + /** * @brief GZIP header flags * See https://tools.ietf.org/html/rfc1952 @@ -55,5 +77,5 @@ constexpr uint8_t fname = 0x08; // Original file name present constexpr uint8_t fcomment = 0x10; // Comment present }; // namespace GZIPHeaderFlag -} // namespace io -} // namespace cudf +} // namespace io::detail +} // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index fb8c308065d..b3d43fa786a 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -28,13 +28,12 @@ #include // memset -using cudf::host_span; - -namespace cudf { -namespace io { +namespace cudf::io::detail { #pragma pack(push, 1) +namespace { + struct gz_file_header_s { uint8_t id1; // 0x1f uint8_t id2; // 0x8b @@ -261,7 +260,7 @@ void cpu_inflate_vector(std::vector& dst, uint8_t const* comp_data, siz strm.avail_out = dst.size(); strm.total_out = 0; auto zerr = inflateInit2(&strm, -15); // -15 for raw data without GZIP headers - CUDF_EXPECTS(zerr == 0, "Error in DEFLATE stream"); + CUDF_EXPECTS(zerr == 0, "Error in DEFLATE stream: inflateInit2 failed"); do { if (strm.avail_out == 0) { dst.resize(strm.total_out + (1 << 30)); @@ -273,125 +272,7 @@ void cpu_inflate_vector(std::vector& dst, uint8_t const* comp_data, siz strm.total_out == dst.size()); dst.resize(strm.total_out); inflateEnd(&strm); - CUDF_EXPECTS(zerr == Z_STREAM_END, "Error in DEFLATE stream"); -} - -std::vector decompress(compression_type compression, host_span src) -{ - CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr"); - CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0"); - - auto raw = src.data(); - uint8_t const* comp_data = nullptr; - size_t comp_len = 0; - size_t uncomp_len = 0; - - switch (compression) { - case compression_type::AUTO: - case compression_type::GZIP: { - gz_archive_s gz; - if (ParseGZArchive(&gz, raw, src.size())) { - compression = compression_type::GZIP; - comp_data = gz.comp_data; - comp_len = gz.comp_len; - uncomp_len = gz.isize; - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - } - case compression_type::ZIP: { - zip_archive_s za; - if (OpenZipArchive(&za, raw, src.size())) { - size_t cdfh_ofs = 0; - for (int i = 0; i < za.eocd->num_entries; i++) { - auto const* cdfh = reinterpret_cast( - reinterpret_cast(za.cdfh) + cdfh_ofs); - int cdfh_len = sizeof(zip_cdfh_s) + cdfh->fname_len + cdfh->extra_len + cdfh->comment_len; - if (cdfh_ofs + cdfh_len > za.eocd->cdir_size || cdfh->sig != 0x0201'4b50) { - // Bad cdir - break; - } - // For now, only accept with non-zero file sizes and DEFLATE - if (cdfh->comp_method == 8 && cdfh->comp_size > 0 && cdfh->uncomp_size > 0) { - size_t lfh_ofs = cdfh->hdr_ofs; - auto const* lfh = reinterpret_cast(raw + lfh_ofs); - if (lfh_ofs + sizeof(zip_lfh_s) <= src.size() && lfh->sig == 0x0403'4b50 && - lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src.size()) { - if (lfh->comp_method == 8 && lfh->comp_size > 0 && lfh->uncomp_size > 0) { - size_t file_start = lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len; - size_t file_end = file_start + lfh->comp_size; - if (file_end <= src.size()) { - // Pick the first valid file of non-zero size (only 1 file expected in archive) - compression = compression_type::ZIP; - comp_data = raw 
+ file_start; - comp_len = lfh->comp_size; - uncomp_len = lfh->uncomp_size; - break; - } - } - } - } - cdfh_ofs += cdfh_len; - } - } - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - case compression_type::BZIP2: - if (src.size() > 4) { - auto const* fhdr = reinterpret_cast(raw); - // Check for BZIP2 file signature "BZh1" to "BZh9" - if (fhdr->sig[0] == 'B' && fhdr->sig[1] == 'Z' && fhdr->sig[2] == 'h' && - fhdr->blksz >= '1' && fhdr->blksz <= '9') { - compression = compression_type::BZIP2; - comp_data = raw; - comp_len = src.size(); - uncomp_len = 0; - } - } - if (compression != compression_type::AUTO) break; - [[fallthrough]]; - default: CUDF_FAIL("Unsupported compressed stream type"); - } - - CUDF_EXPECTS(comp_data != nullptr and comp_len > 0, "Unsupported compressed stream type"); - - if (uncomp_len <= 0) { - uncomp_len = comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume - // ~4:1 compression for initial size - } - - if (compression == compression_type::GZIP || compression == compression_type::ZIP) { - // INFLATE - std::vector dst(uncomp_len); - cpu_inflate_vector(dst, comp_data, comp_len); - return dst; - } - if (compression == compression_type::BZIP2) { - size_t src_ofs = 0; - size_t dst_ofs = 0; - int bz_err = 0; - std::vector dst(uncomp_len); - do { - size_t dst_len = uncomp_len - dst_ofs; - bz_err = cpu_bz2_uncompress(comp_data, comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs); - if (bz_err == BZ_OUTBUFF_FULL) { - // TBD: We could infer the compression ratio based on produced/consumed byte counts - // in order to minimize realloc events and over-allocation - dst_ofs = dst_len; - dst_len = uncomp_len + (uncomp_len / 2); - dst.resize(dst_len); - uncomp_len = dst_len; - } else if (bz_err == 0) { - uncomp_len = dst_len; - dst.resize(uncomp_len); - } - } while (bz_err == BZ_OUTBUFF_FULL); - CUDF_EXPECTS(bz_err == 0, "Decompression: error in stream"); - return dst; - } - - CUDF_FAIL("Unsupported compressed stream type"); + CUDF_EXPECTS(zerr == Z_STREAM_END, "Error in DEFLATE stream: Z_STREAM_END not encountered"); } /** @@ -536,14 +417,130 @@ size_t decompress_zstd(host_span src, CUDF_EXPECTS(hd_stats[0].status == compression_status::SUCCESS, "ZSTD decompression failed"); // Copy temporary output to `dst` - cudf::detail::cuda_memcpy_async( - dst.subspan(0, hd_stats[0].bytes_written), - device_span{d_dst.data(), hd_stats[0].bytes_written}, - stream); + cudf::detail::cuda_memcpy(dst.subspan(0, hd_stats[0].bytes_written), + device_span{d_dst.data(), hd_stats[0].bytes_written}, + stream); return hd_stats[0].bytes_written; } +struct source_properties { + compression_type compression = compression_type::NONE; + uint8_t const* comp_data = nullptr; + size_t comp_len = 0; + size_t uncomp_len = 0; +}; + +source_properties get_source_properties(compression_type compression, host_span src) +{ + auto raw = src.data(); + uint8_t const* comp_data = nullptr; + size_t comp_len = 0; + size_t uncomp_len = 0; + + switch (compression) { + case compression_type::AUTO: + case compression_type::GZIP: { + gz_archive_s gz; + auto const parse_succeeded = ParseGZArchive(&gz, src.data(), src.size()); + CUDF_EXPECTS(parse_succeeded, "Failed to parse GZIP header while fetching source properties"); + compression = compression_type::GZIP; + comp_data = gz.comp_data; + comp_len = gz.comp_len; + uncomp_len = gz.isize; + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + } + case compression_type::ZIP: { + zip_archive_s za; + if 
(OpenZipArchive(&za, raw, src.size())) { + size_t cdfh_ofs = 0; + for (int i = 0; i < za.eocd->num_entries; i++) { + auto const* cdfh = reinterpret_cast( + reinterpret_cast(za.cdfh) + cdfh_ofs); + int cdfh_len = sizeof(zip_cdfh_s) + cdfh->fname_len + cdfh->extra_len + cdfh->comment_len; + if (cdfh_ofs + cdfh_len > za.eocd->cdir_size || cdfh->sig != 0x0201'4b50) { + // Bad cdir + break; + } + // For now, only accept with non-zero file sizes and DEFLATE + if (cdfh->comp_method == 8 && cdfh->comp_size > 0 && cdfh->uncomp_size > 0) { + size_t lfh_ofs = cdfh->hdr_ofs; + auto const* lfh = reinterpret_cast(raw + lfh_ofs); + if (lfh_ofs + sizeof(zip_lfh_s) <= src.size() && lfh->sig == 0x0403'4b50 && + lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len <= src.size()) { + if (lfh->comp_method == 8 && lfh->comp_size > 0 && lfh->uncomp_size > 0) { + size_t file_start = lfh_ofs + sizeof(zip_lfh_s) + lfh->fname_len + lfh->extra_len; + size_t file_end = file_start + lfh->comp_size; + if (file_end <= src.size()) { + // Pick the first valid file of non-zero size (only 1 file expected in archive) + compression = compression_type::ZIP; + comp_data = raw + file_start; + comp_len = lfh->comp_size; + uncomp_len = lfh->uncomp_size; + break; + } + } + } + } + cdfh_ofs += cdfh_len; + } + } + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + } + case compression_type::BZIP2: { + if (src.size() > 4) { + auto const* fhdr = reinterpret_cast(raw); + // Check for BZIP2 file signature "BZh1" to "BZh9" + if (fhdr->sig[0] == 'B' && fhdr->sig[1] == 'Z' && fhdr->sig[2] == 'h' && + fhdr->blksz >= '1' && fhdr->blksz <= '9') { + compression = compression_type::BZIP2; + comp_data = raw; + comp_len = src.size(); + uncomp_len = 0; + } + } + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + } + case compression_type::SNAPPY: { + uncomp_len = 0; + auto cur = src.begin(); + auto const end = src.end(); + // Read uncompressed length (varint) + { + uint32_t l = 0, c; + do { + c = *cur++; + auto const lo7 = c & 0x7f; + if (l >= 28 && c > 0xf) { + uncomp_len = 0; + break; + } + uncomp_len |= lo7 << l; + l += 7; + } while (c > 0x7f && cur < end); + CUDF_EXPECTS(uncomp_len != 0 and cur < end, "Error in retrieving SNAPPY source properties"); + } + comp_data = raw; + comp_len = src.size(); + if (compression != compression_type::AUTO) break; + [[fallthrough]]; + } + default: CUDF_FAIL("Unsupported compressed stream type"); + } + + return source_properties{compression, comp_data, comp_len, uncomp_len}; +} + +} // namespace + +size_t get_uncompressed_size(compression_type compression, host_span src) +{ + return get_source_properties(compression, src).uncomp_len; +} + size_t decompress(compression_type compression, host_span src, host_span dst, @@ -558,5 +555,63 @@ size_t decompress(compression_type compression, } } -} // namespace io -} // namespace cudf +std::vector decompress(compression_type compression, host_span src) +{ + CUDF_EXPECTS(src.data() != nullptr, "Decompression: Source cannot be nullptr"); + CUDF_EXPECTS(not src.empty(), "Decompression: Source size cannot be 0"); + + auto srcprops = get_source_properties(compression, src); + CUDF_EXPECTS(srcprops.comp_data != nullptr and srcprops.comp_len > 0, + "Unsupported compressed stream type"); + + if (srcprops.uncomp_len <= 0) { + srcprops.uncomp_len = + srcprops.comp_len * 4 + 4096; // In case uncompressed size isn't known in advance, assume + // ~4:1 compression for initial size + } + + if (compression == compression_type::GZIP) { + // 
INFLATE + std::vector dst(srcprops.uncomp_len); + decompress_gzip(src, dst); + return dst; + } + if (compression == compression_type::ZIP) { + std::vector dst(srcprops.uncomp_len); + cpu_inflate_vector(dst, srcprops.comp_data, srcprops.comp_len); + return dst; + } + if (compression == compression_type::BZIP2) { + size_t src_ofs = 0; + size_t dst_ofs = 0; + int bz_err = 0; + std::vector dst(srcprops.uncomp_len); + do { + size_t dst_len = srcprops.uncomp_len - dst_ofs; + bz_err = cpu_bz2_uncompress( + srcprops.comp_data, srcprops.comp_len, dst.data() + dst_ofs, &dst_len, &src_ofs); + if (bz_err == BZ_OUTBUFF_FULL) { + // TBD: We could infer the compression ratio based on produced/consumed byte counts + // in order to minimize realloc events and over-allocation + dst_ofs = dst_len; + dst_len = srcprops.uncomp_len + (srcprops.uncomp_len / 2); + dst.resize(dst_len); + srcprops.uncomp_len = dst_len; + } else if (bz_err == 0) { + srcprops.uncomp_len = dst_len; + dst.resize(srcprops.uncomp_len); + } + } while (bz_err == BZ_OUTBUFF_FULL); + CUDF_EXPECTS(bz_err == 0, "Decompression: error in stream"); + return dst; + } + if (compression == compression_type::SNAPPY) { + std::vector dst(srcprops.uncomp_len); + decompress_snappy(src, dst); + return dst; + } + + CUDF_FAIL("Unsupported compressed stream type"); +} + +} // namespace cudf::io::detail diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 279f5e71351..82d8152ca1c 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,70 @@ namespace cudf::io::json::detail { namespace { +class compressed_host_buffer_source final : public datasource { + public: + explicit compressed_host_buffer_source(std::unique_ptr const& src, + compression_type comptype) + : _comptype{comptype}, _dbuf_ptr{src->host_read(0, src->size())} + { + auto ch_buffer = host_span(reinterpret_cast(_dbuf_ptr->data()), + _dbuf_ptr->size()); + if (comptype == compression_type::GZIP || comptype == compression_type::ZIP || + comptype == compression_type::SNAPPY) { + _decompressed_ch_buffer_size = cudf::io::detail::get_uncompressed_size(_comptype, ch_buffer); + } else { + _decompressed_buffer = cudf::io::detail::decompress(_comptype, ch_buffer); + _decompressed_ch_buffer_size = _decompressed_buffer.size(); + } + } + + size_t host_read(size_t offset, size_t size, uint8_t* dst) override + { + auto ch_buffer = host_span(reinterpret_cast(_dbuf_ptr->data()), + _dbuf_ptr->size()); + if (_decompressed_buffer.empty()) { + auto decompressed_hbuf = cudf::io::detail::decompress(_comptype, ch_buffer); + auto const count = std::min(size, decompressed_hbuf.size() - offset); + bool partial_read = offset + count < decompressed_hbuf.size(); + if (!partial_read) { + std::memcpy(dst, decompressed_hbuf.data() + offset, count); + return count; + } + _decompressed_buffer = std::move(decompressed_hbuf); + } + auto const count = std::min(size, _decompressed_buffer.size() - offset); + std::memcpy(dst, _decompressed_buffer.data() + offset, count); + return count; + } + + std::unique_ptr host_read(size_t offset, size_t size) override + { + auto ch_buffer = host_span(reinterpret_cast(_dbuf_ptr->data()), + _dbuf_ptr->size()); + if (_decompressed_buffer.empty()) { + auto decompressed_hbuf = cudf::io::detail::decompress(_comptype, ch_buffer); + auto const count = std::min(size, decompressed_hbuf.size() - offset); + bool partial_read = offset + count < decompressed_hbuf.size(); + if 
(!partial_read) + return std::make_unique>>( + std::move(decompressed_hbuf), decompressed_hbuf.data() + offset, count); + _decompressed_buffer = std::move(decompressed_hbuf); + } + auto const count = std::min(size, _decompressed_buffer.size() - offset); + return std::make_unique(_decompressed_buffer.data() + offset, count); + } + + [[nodiscard]] bool supports_device_read() const override { return false; } + + [[nodiscard]] size_t size() const override { return _decompressed_ch_buffer_size; } + + private: + std::unique_ptr _dbuf_ptr; + compression_type _comptype; + size_t _decompressed_ch_buffer_size; + std::vector _decompressed_buffer; +}; + // Return total size of sources enclosing the passed range std::size_t sources_size(host_span> const sources, std::size_t range_offset, @@ -126,13 +191,12 @@ datasource::owning_buffer get_record_range_raw_input( { CUDF_FUNC_RANGE(); - std::size_t const total_source_size = sources_size(sources, 0, 0); - auto constexpr num_delimiter_chars = 1; - auto const delimiter = reader_opts.get_delimiter(); - auto const num_extra_delimiters = num_delimiter_chars * sources.size(); - compression_type const reader_compression = reader_opts.get_compression(); - std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); - std::size_t chunk_size = reader_opts.get_byte_range_size(); + std::size_t const total_source_size = sources_size(sources, 0, 0); + auto constexpr num_delimiter_chars = 1; + auto const delimiter = reader_opts.get_delimiter(); + auto const num_extra_delimiters = num_delimiter_chars * sources.size(); + std::size_t const chunk_offset = reader_opts.get_byte_range_offset(); + std::size_t chunk_size = reader_opts.get_byte_range_size(); CUDF_EXPECTS(total_source_size ? chunk_offset < total_source_size : !chunk_offset, "Invalid offsetting", @@ -143,22 +207,16 @@ datasource::owning_buffer get_record_range_raw_input( int num_subchunks_prealloced = should_load_till_last_source ? 0 : max_subchunks_prealloced; std::size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); - // The allocation for single source compressed input is estimated by assuming a ~4:1 - // compression ratio. For uncompressed inputs, we can getter a better estimate using the idea - // of subchunks. - auto constexpr header_size = 4096; std::size_t buffer_size = - reader_compression != compression_type::NONE - ? 
total_source_size * estimated_compression_ratio + header_size - : std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) + - num_extra_delimiters; + std::min(total_source_size, chunk_size + num_subchunks_prealloced * size_per_subchunk) + + num_extra_delimiters; rmm::device_buffer buffer(buffer_size, stream); device_span bufspan(reinterpret_cast(buffer.data()), buffer.size()); // Offset within buffer indicating first read position std::int64_t buffer_offset = 0; - auto readbufspan = ingest_raw_input( - bufspan, sources, reader_compression, chunk_offset, chunk_size, delimiter, stream); + auto readbufspan = + ingest_raw_input(bufspan, sources, chunk_offset, chunk_size, delimiter, stream); auto const shift_for_nonzero_offset = std::min(chunk_offset, 1); auto const first_delim_pos = @@ -179,7 +237,6 @@ datasource::owning_buffer get_record_range_raw_input( buffer_offset += readbufspan.size(); readbufspan = ingest_raw_input(bufspan.last(buffer_size - buffer_offset), sources, - reader_compression, next_subchunk_start, size_per_subchunk, delimiter, @@ -196,11 +253,9 @@ datasource::owning_buffer get_record_range_raw_input( // Our buffer_size estimate is insufficient to read until the end of the line! We need to // allocate more memory and try again! num_subchunks_prealloced *= 2; - buffer_size = reader_compression != compression_type::NONE - ? 2 * buffer_size - : std::min(total_source_size, - buffer_size + num_subchunks_prealloced * size_per_subchunk) + - num_extra_delimiters; + buffer_size = std::min(total_source_size, + buffer_size + num_subchunks_prealloced * size_per_subchunk) + + num_extra_delimiters; buffer.resize(buffer_size, stream); bufspan = device_span(reinterpret_cast(buffer.data()), buffer.size()); } @@ -258,111 +313,11 @@ table_with_metadata read_batch(host_span> sources, return device_parse_nested_json(buffer, reader_opts, stream, mr); } -} // anonymous namespace - -device_span ingest_raw_input(device_span buffer, - host_span> sources, - compression_type compression, - std::size_t range_offset, - std::size_t range_size, - char delimiter, - rmm::cuda_stream_view stream) +table_with_metadata read_json_impl(host_span> sources, + json_reader_options const& reader_opts, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { - CUDF_FUNC_RANGE(); - // We append a line delimiter between two files to make sure the last line of file i and the first - // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line - // delimiter. - auto constexpr num_delimiter_chars = 1; - - if (compression == compression_type::NONE) { - auto delimiter_map = cudf::detail::make_empty_host_vector(sources.size(), stream); - std::vector prefsum_source_sizes(sources.size()); - std::vector> h_buffers; - std::size_t bytes_read = 0; - std::transform_inclusive_scan(sources.begin(), - sources.end(), - prefsum_source_sizes.begin(), - std::plus{}, - [](std::unique_ptr const& s) { return s->size(); }); - auto upper = - std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); - std::size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); - - auto const total_bytes_to_read = - std::min(range_size, prefsum_source_sizes.back() - range_offset); - range_offset -= start_source ? 
prefsum_source_sizes[start_source - 1] : 0; - for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; - i++) { - if (sources[i]->is_empty()) continue; - auto data_size = - std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read); - auto destination = reinterpret_cast(buffer.data()) + bytes_read + - (num_delimiter_chars * delimiter_map.size()); - if (sources[i]->is_device_read_preferred(data_size)) { - bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); - } else { - h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size)); - auto const& h_buffer = h_buffers.back(); - CUDF_CUDA_TRY(cudaMemcpyAsync( - destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value())); - bytes_read += h_buffer->size(); - } - range_offset = 0; - delimiter_map.push_back(bytes_read + (num_delimiter_chars * delimiter_map.size())); - } - // Removing delimiter inserted after last non-empty source is read - if (!delimiter_map.empty()) { delimiter_map.pop_back(); } - - // If this is a multi-file source, we scatter the JSON line delimiters between files - if (sources.size() > 1) { - static_assert(num_delimiter_chars == 1, - "Currently only single-character delimiters are supported"); - auto const delimiter_source = thrust::make_constant_iterator(delimiter); - auto const d_delimiter_map = cudf::detail::make_device_uvector_async( - delimiter_map, stream, cudf::get_current_device_resource_ref()); - thrust::scatter(rmm::exec_policy_nosync(stream), - delimiter_source, - delimiter_source + d_delimiter_map.size(), - d_delimiter_map.data(), - buffer.data()); - } - stream.synchronize(); - return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars)); - } - // TODO: allow byte range reading from multiple compressed files. - auto remaining_bytes_to_read = std::min(range_size, sources[0]->size() - range_offset); - auto hbuffer = std::vector(remaining_bytes_to_read); - // Single read because only a single compressed source is supported - // Reading to host because decompression of a single block is much faster on the CPU - sources[0]->host_read(range_offset, remaining_bytes_to_read, hbuffer.data()); - auto uncomp_data = decompress(compression, hbuffer); - auto ret_buffer = buffer.first(uncomp_data.size()); - cudf::detail::cuda_memcpy( - ret_buffer, - host_span{reinterpret_cast(uncomp_data.data()), uncomp_data.size()}, - stream); - return ret_buffer; -} - -table_with_metadata read_json(host_span> sources, - json_reader_options const& reader_opts, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - CUDF_FUNC_RANGE(); - - if (reader_opts.get_byte_range_offset() != 0 or reader_opts.get_byte_range_size() != 0) { - CUDF_EXPECTS(reader_opts.is_enabled_lines(), - "Specifying a byte range is supported only for JSON Lines"); - } - - if (sources.size() > 1) { - CUDF_EXPECTS(reader_opts.get_compression() == compression_type::NONE, - "Multiple compressed inputs are not supported"); - CUDF_EXPECTS(reader_opts.is_enabled_lines(), - "Multiple inputs are supported only for JSON Lines format"); - } - /* * The batched JSON reader enforces that the size of each batch is at most INT_MAX * bytes (~2.14GB). 
Batches are defined to be byte range chunks - characterized by @@ -462,4 +417,101 @@ table_with_metadata read_json(host_span> sources, {partial_tables[0].metadata.schema_info}}; } +} // anonymous namespace + +device_span ingest_raw_input(device_span buffer, + host_span> sources, + std::size_t range_offset, + std::size_t range_size, + char delimiter, + rmm::cuda_stream_view stream) +{ + CUDF_FUNC_RANGE(); + // We append a line delimiter between two files to make sure the last line of file i and the first + // line of file i+1 don't end up on the same JSON line, if file i does not already end with a line + // delimiter. + auto constexpr num_delimiter_chars = 1; + + auto delimiter_map = cudf::detail::make_empty_host_vector(sources.size(), stream); + std::vector prefsum_source_sizes(sources.size()); + std::vector> h_buffers; + std::size_t bytes_read = 0; + std::transform_inclusive_scan(sources.begin(), + sources.end(), + prefsum_source_sizes.begin(), + std::plus{}, + [](std::unique_ptr const& s) { return s->size(); }); + auto upper = + std::upper_bound(prefsum_source_sizes.begin(), prefsum_source_sizes.end(), range_offset); + std::size_t start_source = std::distance(prefsum_source_sizes.begin(), upper); + + auto const total_bytes_to_read = std::min(range_size, prefsum_source_sizes.back() - range_offset); + range_offset -= start_source ? prefsum_source_sizes[start_source - 1] : 0; + for (std::size_t i = start_source; i < sources.size() && bytes_read < total_bytes_to_read; i++) { + if (sources[i]->is_empty()) continue; + auto data_size = std::min(sources[i]->size() - range_offset, total_bytes_to_read - bytes_read); + auto destination = reinterpret_cast(buffer.data()) + bytes_read + + (num_delimiter_chars * delimiter_map.size()); + if (sources[i]->is_device_read_preferred(data_size)) { + bytes_read += sources[i]->device_read(range_offset, data_size, destination, stream); + } else { + h_buffers.emplace_back(sources[i]->host_read(range_offset, data_size)); + auto const& h_buffer = h_buffers.back(); + CUDF_CUDA_TRY(cudaMemcpyAsync( + destination, h_buffer->data(), h_buffer->size(), cudaMemcpyHostToDevice, stream.value())); + bytes_read += h_buffer->size(); + } + range_offset = 0; + delimiter_map.push_back(bytes_read + (num_delimiter_chars * delimiter_map.size())); + } + // Removing delimiter inserted after last non-empty source is read + if (!delimiter_map.empty()) { delimiter_map.pop_back(); } + + // If this is a multi-file source, we scatter the JSON line delimiters between files + if (sources.size() > 1 && !delimiter_map.empty()) { + static_assert(num_delimiter_chars == 1, + "Currently only single-character delimiters are supported"); + auto const delimiter_source = thrust::make_constant_iterator(delimiter); + auto const d_delimiter_map = cudf::detail::make_device_uvector_async( + delimiter_map, stream, cudf::get_current_device_resource_ref()); + thrust::scatter(rmm::exec_policy_nosync(stream), + delimiter_source, + delimiter_source + d_delimiter_map.size(), + d_delimiter_map.data(), + buffer.data()); + } + stream.synchronize(); + return buffer.first(bytes_read + (delimiter_map.size() * num_delimiter_chars)); +} + +table_with_metadata read_json(host_span> sources, + json_reader_options const& reader_opts, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + + if (reader_opts.get_byte_range_offset() != 0 or reader_opts.get_byte_range_size() != 0) { + CUDF_EXPECTS(reader_opts.is_enabled_lines(), + "Specifying a byte range is supported only for JSON Lines"); 
+ } + + if (sources.size() > 1) { + CUDF_EXPECTS(reader_opts.is_enabled_lines(), + "Multiple inputs are supported only for JSON Lines format"); + } + + if (reader_opts.get_compression() == compression_type::NONE) + return read_json_impl(sources, reader_opts, stream, mr); + + std::vector> compressed_sources; + for (size_t i = 0; i < sources.size(); i++) { + compressed_sources.emplace_back( + std::make_unique(sources[i], reader_opts.get_compression())); + } + // in read_json_impl, we need the compressed source size to actually be the + // uncompressed source size for correct batching + return read_json_impl(compressed_sources, reader_opts, stream, mr); +} + } // namespace cudf::io::json::detail diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp index 4def69cc629..ac980938522 100644 --- a/cpp/src/io/json/read_json.hpp +++ b/cpp/src/io/json/read_json.hpp @@ -32,10 +32,9 @@ namespace CUDF_EXPORT cudf { namespace io::json::detail { // Some magic numbers -constexpr int num_subchunks = 10; // per chunk_size -constexpr size_t min_subchunk_size = 10000; -constexpr int estimated_compression_ratio = 4; -constexpr int max_subchunks_prealloced = 3; +constexpr int num_subchunks = 10; // per chunk_size +constexpr size_t min_subchunk_size = 10000; +constexpr int max_subchunks_prealloced = 3; /** * @brief Read from array of data sources into RMM buffer. The size of the returned device span @@ -45,15 +44,14 @@ constexpr int max_subchunks_prealloced = 3; * * @param buffer Device span buffer to which data is read * @param sources Array of data sources - * @param compression Compression format of source * @param range_offset Number of bytes to skip from source start * @param range_size Number of bytes to read from source + * @param delimiter Delimiter character for JSONL inputs * @param stream CUDA stream used for device memory operations and kernel launches * @returns A subspan of the input device span containing data read */ device_span ingest_raw_input(device_span buffer, host_span> sources, - compression_type compression, size_t range_offset, size_t range_size, char delimiter, diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index 1fe5e5aa41e..7046b3b3f91 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -460,7 +460,7 @@ host_span OrcDecompressor::decompress_blocks(host_span @@ -31,36 +32,66 @@ /** * @brief Base test fixture for JSON reader tests */ -struct JsonReaderTest : public cudf::test::BaseFixture {}; +struct JsonReaderTest : public cudf::test::BaseFixture, + public testing::WithParamInterface {}; + +// Parametrize qualifying JSON tests for multiple compression types +INSTANTIATE_TEST_SUITE_P(JsonReaderTest, + JsonReaderTest, + ::testing::Values(cudf::io::compression_type::GZIP, + cudf::io::compression_type::SNAPPY, + cudf::io::compression_type::NONE)); cudf::test::TempDirTestEnvironment* const temp_env = static_cast( ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment)); -TEST_F(JsonReaderTest, ByteRange_SingleSource) +TEST_P(JsonReaderTest, ByteRange_SingleSource) { + cudf::io::compression_type const comptype = GetParam(); + std::string const json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } { "a": { "y" : 6}, "b" : [6 ], "c": 13 } { "a": { "y" : 6}, "b" : [7 ], "c": 14 })"; + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(json_string.data()), + 
json_string.size()), + cudf::get_default_stream()); + } else + cdata = std::vector( + reinterpret_cast(json_string.data()), + reinterpret_cast(json_string.data()) + json_string.size()); + // Initialize parsing options (reading json lines) cudf::io::json_reader_options json_lines_options = cudf::io::json_reader_options::builder( cudf::io::source_info{json_string.c_str(), json_string.size()}) .compression(cudf::io::compression_type::NONE) .lines(true); + cudf::io::json_reader_options cjson_lines_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{cudf::host_span(cdata.data(), cdata.size())}) + .compression(comptype) + .lines(true); // Read full test data via existing, nested JSON lines reader - cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options); + cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(cjson_lines_options); - auto datasources = cudf::io::datasource::create(json_lines_options.get_source().host_buffers()); + auto datasources = cudf::io::datasource::create(json_lines_options.get_source().host_buffers()); + auto cdatasources = cudf::io::datasource::create(cjson_lines_options.get_source().host_buffers()); // Test for different chunk sizes for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500}) { auto const tables = split_byte_range_reading(datasources, + cdatasources, json_lines_options, + cjson_lines_options, chunk_size, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); @@ -77,37 +108,54 @@ TEST_F(JsonReaderTest, ByteRange_SingleSource) } } -TEST_F(JsonReaderTest, ReadCompleteFiles) +TEST_P(JsonReaderTest, ReadCompleteFiles) { + cudf::io::compression_type const comptype = GetParam(); + std::string const json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } { "a": { "y" : 6}, "b" : [6 ], "c": 13 } { "a": { "y" : 6}, "b" : [7 ], "c": 14 })"; - auto filename = temp_env->get_temp_dir() + "ParseInRangeIntegers.json"; + + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size()), + cudf::get_default_stream()); + } else + cdata = std::vector( + reinterpret_cast(json_string.data()), + reinterpret_cast(json_string.data()) + json_string.size()); + + auto cfilename = temp_env->get_temp_dir() + "cParseInRangeIntegers.json"; { - std::ofstream outfile(filename, std::ofstream::out); - outfile << json_string; + std::ofstream outfile(cfilename, std::ofstream::out); + std::copy(cdata.begin(), cdata.end(), std::ostreambuf_iterator(outfile)); } constexpr int num_sources = 5; - std::vector filepaths(num_sources, filename); + std::vector cfilepaths(num_sources, cfilename); - cudf::io::json_reader_options in_options = - cudf::io::json_reader_options::builder(cudf::io::source_info{filepaths}) + cudf::io::json_reader_options cin_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{cfilepaths}) .lines(true) + .compression(comptype) .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); - cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + cudf::io::table_with_metadata result = cudf::io::read_json(cin_options); std::vector part_tables; - for (auto filepath : filepaths) { - cudf::io::json_reader_options part_in_options = - cudf::io::json_reader_options::builder(cudf::io::source_info{filepath}) + for (auto cfilepath : cfilepaths) { + cudf::io::json_reader_options 
part_cin_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{cfilepath}) .lines(true) + .compression(comptype) .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); - part_tables.push_back(cudf::io::read_json(part_in_options)); + part_tables.push_back(cudf::io::read_json(part_cin_options)); } auto part_table_views = std::vector(part_tables.size()); @@ -120,42 +168,69 @@ TEST_F(JsonReaderTest, ReadCompleteFiles) CUDF_TEST_EXPECT_TABLES_EQUIVALENT(result.tbl->view(), expected_result->view()); } -TEST_F(JsonReaderTest, ByteRange_MultiSource) +TEST_P(JsonReaderTest, ByteRange_MultiSource) { + cudf::io::compression_type const comptype = GetParam(); + std::string const json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } { "a": { "y" : 6}, "b" : [6 ], "c": 13 } { "a": { "y" : 6}, "b" : [7 ], "c": 14 })"; - auto filename = temp_env->get_temp_dir() + "ParseInRangeIntegers.json"; + + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size()), + cudf::get_default_stream()); + } else + cdata = std::vector( + reinterpret_cast(json_string.data()), + reinterpret_cast(json_string.data()) + json_string.size()); + + auto cfilename = temp_env->get_temp_dir() + "cParseInRangeIntegers.json"; { - std::ofstream outfile(filename, std::ofstream::out); - outfile << json_string; + std::ofstream outfile(cfilename, std::ofstream::out); + std::copy(cdata.begin(), cdata.end(), std::ostreambuf_iterator(outfile)); } constexpr int num_sources = 5; - std::vector filepaths(num_sources, filename); + std::vector cfilepaths(num_sources, cfilename); + std::vector> hostbufs( + num_sources, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size())); // Initialize parsing options (reading json lines) cudf::io::json_reader_options json_lines_options = - cudf::io::json_reader_options::builder(cudf::io::source_info{filepaths}) - .lines(true) + cudf::io::json_reader_options::builder( + cudf::io::source_info{ + cudf::host_span>(hostbufs.data(), hostbufs.size())}) .compression(cudf::io::compression_type::NONE) + .lines(true); + cudf::io::json_reader_options cjson_lines_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{cfilepaths}) + .lines(true) + .compression(comptype) .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); // Read full test data via existing, nested JSON lines reader - cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options); + cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(cjson_lines_options); - auto file_paths = json_lines_options.get_source().filepaths(); - std::vector> datasources; - for (auto& fp : file_paths) { - datasources.emplace_back(cudf::io::datasource::create(fp)); + std::vector> cdatasources; + for (auto& fp : cfilepaths) { + cdatasources.emplace_back(cudf::io::datasource::create(fp)); } + auto datasources = cudf::io::datasource::create(json_lines_options.get_source().host_buffers()); // Test for different chunk sizes for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500, 1000, 2000}) { auto const tables = split_byte_range_reading(datasources, + cdatasources, json_lines_options, + cjson_lines_options, chunk_size, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp index 
26937c9298a..3c8db99c3c7 100644 --- a/cpp/tests/io/json/json_test.cpp +++ b/cpp/tests/io/json/json_test.cpp @@ -14,6 +14,9 @@ * limitations under the License. */ +#include "io/comp/comp.hpp" +#include "io/comp/io_uncomp.hpp" + #include #include #include @@ -3252,4 +3255,59 @@ TEST_F(JsonReaderTest, JsonNestedDtypeFilterWithOrder) } } +struct JsonCompressedIOTest : public cudf::test::BaseFixture, + public testing::WithParamInterface {}; + +// Parametrize qualifying JSON tests for multiple compression types +INSTANTIATE_TEST_SUITE_P(JsonCompressedIOTest, + JsonCompressedIOTest, + ::testing::Values(cudf::io::compression_type::GZIP, + cudf::io::compression_type::SNAPPY, + cudf::io::compression_type::NONE)); + +TEST_P(JsonCompressedIOTest, BasicJsonLines) +{ + cudf::io::compression_type const comptype = GetParam(); + std::string data = to_records_orient( + {{{"0", "1"}, {"1", "1.1"}}, {{"0", "2"}, {"1", "2.2"}}, {{"0", "3"}, {"1", "3.3"}}}, "\n"); + + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(data.data()), data.size()), + cudf::get_default_stream()); + auto decomp_out_buffer = cudf::io::detail::decompress( + comptype, cudf::host_span(cdata.data(), cdata.size())); + std::string const expected = R"({"0":1, "1":1.1} +{"0":2, "1":2.2} +{"0":3, "1":3.3})"; + EXPECT_EQ( + expected, + std::string(reinterpret_cast(decomp_out_buffer.data()), decomp_out_buffer.size())); + } else + cdata = std::vector(reinterpret_cast(data.data()), + reinterpret_cast(data.data()) + data.size()); + + cudf::io::json_reader_options in_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{cudf::host_span(cdata.data(), cdata.size())}) + .dtypes(std::vector{dtype(), dtype()}) + .compression(comptype) + .lines(true); + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + + EXPECT_EQ(result.tbl->num_columns(), 2); + EXPECT_EQ(result.tbl->num_rows(), 3); + + EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::INT32); + EXPECT_EQ(result.tbl->get_column(1).type().id(), cudf::type_id::FLOAT64); + + EXPECT_EQ(result.metadata.schema_info[0].name, "0"); + EXPECT_EQ(result.metadata.schema_info[1].name, "1"); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), int_wrapper{{1, 2, 3}}); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1), float64_wrapper{{1.1, 2.2, 3.3}}); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/cpp/tests/io/json/json_utils.cuh b/cpp/tests/io/json/json_utils.cuh index c31bb2d24e0..629a89fa777 100644 --- a/cpp/tests/io/json/json_utils.cuh +++ b/cpp/tests/io/json/json_utils.cuh @@ -32,7 +32,9 @@ template std::vector split_byte_range_reading( cudf::host_span> sources, + cudf::host_span> csources, cudf::io::json_reader_options const& reader_opts, + cudf::io::json_reader_options const& creader_opts, IndexType chunk_size, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) @@ -49,7 +51,6 @@ std::vector split_byte_range_reading( rmm::device_uvector buffer(total_source_size, stream); auto readbufspan = cudf::io::json::detail::ingest_raw_input(buffer, sources, - reader_opts.get_compression(), reader_opts.get_byte_range_offset(), reader_opts.get_byte_range_size(), reader_opts.get_delimiter(), @@ -95,10 +96,11 @@ std::vector split_byte_range_reading( record_ranges.emplace_back(prev, total_source_size); std::vector tables; + auto creader_opts_chunk = creader_opts; for (auto const& [chunk_start, chunk_end] : record_ranges) { - 
reader_opts_chunk.set_byte_range_offset(chunk_start); - reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start); - tables.push_back(cudf::io::json::detail::read_json(sources, reader_opts_chunk, stream, mr)); + creader_opts_chunk.set_byte_range_offset(chunk_start); + creader_opts_chunk.set_byte_range_size(chunk_end - chunk_start); + tables.push_back(cudf::io::json::detail::read_json(csources, creader_opts_chunk, stream, mr)); } // assume all records have same number of columns, and inferred same type. (or schema is passed) // TODO a step before to merge all columns, types and infer final schema. diff --git a/cpp/tests/large_strings/json_tests.cu b/cpp/tests/large_strings/json_tests.cu index a212d7d654a..0703fa72f67 100644 --- a/cpp/tests/large_strings/json_tests.cu +++ b/cpp/tests/large_strings/json_tests.cu @@ -15,6 +15,7 @@ */ #include "../io/json/json_utils.cuh" +#include "io/comp/comp.hpp" #include "large_strings_fixture.hpp" #include @@ -25,10 +26,19 @@ #include #include -struct JsonLargeReaderTest : public cudf::test::StringsLargeTest {}; +struct JsonLargeReaderTest : public cudf::test::StringsLargeTest, + public testing::WithParamInterface {}; -TEST_F(JsonLargeReaderTest, MultiBatch) +// Parametrize qualifying JSON tests for multiple compression types +INSTANTIATE_TEST_SUITE_P(JsonLargeReaderTest, + JsonLargeReaderTest, + ::testing::Values(cudf::io::compression_type::GZIP, + cudf::io::compression_type::NONE)); + +TEST_P(JsonLargeReaderTest, MultiBatch) { + cudf::io::compression_type const comptype = GetParam(); + std::string json_string = R"( { "a": { "y" : 6}, "b" : [1, 2, 3], "c": 11 } { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } @@ -48,11 +58,26 @@ TEST_F(JsonLargeReaderTest, MultiBatch) json_string += json_string; } + std::vector cdata; + if (comptype != cudf::io::compression_type::NONE) { + cdata = cudf::io::detail::compress( + comptype, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size()), + cudf::get_default_stream()); + } else + cdata = std::vector( + reinterpret_cast(json_string.data()), + reinterpret_cast(json_string.data()) + json_string.size()); + constexpr int num_sources = 2; std::vector> hostbufs( num_sources, cudf::host_span(reinterpret_cast(json_string.data()), json_string.size())); + std::vector> chostbufs( + num_sources, + cudf::host_span(reinterpret_cast(cdata.data()), cdata.size())); // Initialize parsing options (reading json lines) cudf::io::json_reader_options json_lines_options = @@ -62,14 +87,20 @@ TEST_F(JsonLargeReaderTest, MultiBatch) .lines(true) .compression(cudf::io::compression_type::NONE) .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); + cudf::io::json_reader_options cjson_lines_options = + cudf::io::json_reader_options::builder( + cudf::io::source_info{ + cudf::host_span>(chostbufs.data(), chostbufs.size())}) + .lines(true) + .compression(comptype) + .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); // Read full test data via existing, nested JSON lines reader - cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options); + cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(cjson_lines_options); + + auto datasources = cudf::io::datasource::create(json_lines_options.get_source().host_buffers()); + auto cdatasources = cudf::io::datasource::create(cjson_lines_options.get_source().host_buffers()); - std::vector> datasources; - for (auto& hb : hostbufs) { - datasources.emplace_back(cudf::io::datasource::create(hb)); - } // Test for different chunk sizes 
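   // (chunk sizes are chosen around batch_size_upper_bound so that reads spanning a single
   // batch as well as multiple batches are exercised)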
std::vector chunk_sizes{batch_size_upper_bound / 4, batch_size_upper_bound / 2, @@ -79,7 +110,9 @@ TEST_F(JsonLargeReaderTest, MultiBatch) for (auto chunk_size : chunk_sizes) { auto const tables = split_byte_range_reading(datasources, + cdatasources, json_lines_options, + cjson_lines_options, chunk_size, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
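
For illustration, here is a minimal sketch of the read path this patch enables: several identically compressed JSONL sources read through the public API, with the byte range interpreted against the decompressed bytes. The file names are hypothetical and error handling is omitted; the builder calls shown are the existing `json_reader_options` API.

```cpp
// Sketch only: "part-0.jsonl.gz" / "part-1.jsonl.gz" are placeholder paths.
#include <cudf/io/json.hpp>

#include <string>
#include <vector>

int main()
{
  std::vector<std::string> const files{"part-0.jsonl.gz", "part-1.jsonl.gz"};

  auto const opts =
    cudf::io::json_reader_options::builder(cudf::io::source_info{files})
      .lines(true)                                    // byte ranges require JSON Lines
      .compression(cudf::io::compression_type::GZIP)  // all sources share one compression type
      .byte_range_offset(0)      // offset into the decompressed stream
      .byte_range_size(1 << 20)  // records overlapping the first 1 MiB of decompressed bytes
      .build();

  auto const result = cudf::io::read_json(opts);  // cudf::io::table_with_metadata
  return result.tbl->num_rows() > 0 ? 0 : 1;
}
```

Internally, `read_json` wraps each source in the new `compressed_host_buffer_source`, whose `size()` reports the decompressed size, so the existing batching and byte-range logic works unchanged on compressed inputs.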