Skip to content

Commit

Permalink
Reading multi-source compressed JSONL files (#17161)
Browse files Browse the repository at this point in the history
Fixes #17068 
Fixes #12299

This PR introduces a new datasource for compressed inputs, which enables batching and byte-range reading of multi-source JSONL files using the reallocate-and-retry policy. Moreover, instead of using a 4:1 compression ratio heuristic, the device buffer size is estimated accurately for the GZIP, ZIP, and SNAPPY compression types. For the remaining types, the files are first decompressed and then batched.

~~TODO: Reuse existing JSON tests but with an additional compression parameter to verify correctness.~~
~~Handled by #17219, which implements compressed JSON writer required for the above test.~~
Multi-source compressed input tests added!

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Kyle Edwards (https://github.com/KyleFromNVIDIA)
  - Karthikeyan (https://github.com/karthikeyann)

URL: #17161
  • Loading branch information
shrshi authored Nov 18, 2024
1 parent aeb6a30 commit 03ac845
Show file tree
Hide file tree
Showing 13 changed files with 771 additions and 313 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ add_library(
src/io/avro/avro_gpu.cu
src/io/avro/reader_impl.cu
src/io/comp/brotli_dict.cpp
src/io/comp/comp.cpp
src/io/comp/cpu_unbz2.cpp
src/io/comp/debrotli.cu
src/io/comp/gpuinflate.cu
Expand Down
119 changes: 119 additions & 0 deletions cpp/src/io/comp/comp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "comp.hpp"

#include "io/utilities/hostdevice_vector.hpp"
#include "nvcomp_adapter.hpp"

#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <zlib.h> // compress

namespace cudf::io::detail {

namespace {

/**
 * @brief GZIP host compressor (includes header)
 *
 * Compresses `src` with a single `deflate(Z_FINISH)` call into a buffer sized by
 * `deflateBound`, then trims the buffer to the number of bytes zlib actually wrote.
 * Every failure is reported through `CUDF_EXPECTS`; the zlib stream state is released
 * on both the success and the failure paths.
 *
 * @param src Uncompressed host buffer
 *
 * @return Vector containing the GZIP-compressed output
 */
std::vector<std::uint8_t> compress_gzip(host_span<uint8_t const> src)
{
  // Zero-initialize so no zlib field is ever read while indeterminate.
  z_stream zs{};
  zs.zalloc = Z_NULL;
  zs.zfree  = Z_NULL;
  zs.opaque = Z_NULL;

  // zlib tracks the available input in a `uInt`; reject inputs that would be silently
  // truncated by the narrowing conversion instead of compressing only part of them.
  zs.avail_in = static_cast<uInt>(src.size());
  CUDF_EXPECTS(zs.avail_in == src.size(),
               "GZIP DEFLATE compression failed: input is too large for a single deflate call.");
  // zlib's stream struct takes a non-const input pointer; the cast only drops constness.
  zs.next_in = const_cast<unsigned char*>(src.data());

  zs.avail_out = 0;
  zs.next_out  = nullptr;

  constexpr int windowbits    = 15;  // base-2 log of the window size
  constexpr int gzip_encoding = 16;  // added to windowBits to request a gzip header/trailer
  constexpr int mem_level     = 8;
  int ret                     = deflateInit2(&zs,
                         Z_DEFAULT_COMPRESSION,
                         Z_DEFLATED,
                         windowbits | gzip_encoding,
                         mem_level,
                         Z_DEFAULT_STRATEGY);
  CUDF_EXPECTS(ret == Z_OK, "GZIP DEFLATE compression initialization failed.");

  // From here on the stream owns internal state: make sure it is released even when one of
  // the checks below throws.
  struct deflate_guard {
    z_stream* strm;
    ~deflate_guard()
    {
      if (strm != nullptr) { deflateEnd(strm); }
    }
  };
  deflate_guard guard{&zs};

  // Upper bound on the compressed size for this stream configuration.
  auto const estcomplen = deflateBound(&zs, static_cast<uLong>(src.size()));
  zs.avail_out          = static_cast<uInt>(estcomplen);
  CUDF_EXPECTS(zs.avail_out == estcomplen,
               "GZIP DEFLATE compression failed: output bound is too large for a single deflate "
               "call.");
  std::vector<uint8_t> dst(estcomplen);
  zs.next_out = dst.data();

  ret = deflate(&zs, Z_FINISH);
  CUDF_EXPECTS(ret == Z_STREAM_END, "GZIP DEFLATE compression failed due to insufficient space!");
  // Keep only the bytes zlib produced.
  dst.resize(zs.total_out);

  // Success path: release the stream explicitly so the return code can be checked, and
  // disarm the guard to avoid a second deflateEnd.
  guard.strm = nullptr;
  ret        = deflateEnd(&zs);
  CUDF_EXPECTS(ret == Z_OK, "GZIP DEFLATE compression failed at deallocation");

  return dst;
}

/**
 * @brief SNAPPY device compressor
 *
 * Copies `src` to the device, compresses it as a single chunk through
 * `nvcomp::batched_compress`, and copies the compressed bytes back to the host.
 * Only the bytes reported as written by the compressor are returned.
 *
 * @param src Uncompressed host buffer
 * @param stream CUDA stream used for device memory operations and kernel launches
 *
 * @return Vector containing the SNAPPY-compressed output
 */
std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,
                                          rmm::cuda_stream_view stream)
{
  auto const d_src =
    cudf::detail::make_device_uvector_async(src, stream, cudf::get_current_device_resource_ref());
  // NOTE(review): the output buffer is only as large as the input. Incompressible data can
  // need more room than that; presumably the compressor then reports a non-SUCCESS status,
  // which is caught below - confirm, and consider sizing this buffer with the compressor's
  // maximum-output-size query instead.
  rmm::device_uvector<uint8_t> d_dst(src.size(), stream);

  // Single-element batch: one input chunk, one output chunk, one result slot.
  cudf::detail::hostdevice_vector<device_span<uint8_t const>> inputs(1, stream);
  inputs[0] = d_src;
  inputs.host_to_device_async(stream);

  cudf::detail::hostdevice_vector<device_span<uint8_t>> outputs(1, stream);
  outputs[0] = d_dst;
  outputs.host_to_device_async(stream);

  cudf::detail::hostdevice_vector<cudf::io::compression_result> hd_status(1, stream);
  hd_status[0] = {};
  hd_status.host_to_device_async(stream);

  nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream);

  stream.synchronize();
  hd_status.device_to_host_sync(stream);
  CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS,
               "snappy compression failed");

  // Return exactly the compressed bytes rather than the whole (input-sized) device buffer,
  // whose tail past the compressed data is uninitialized.
  auto const compressed_size = hd_status[0].bytes_written;
  CUDF_EXPECTS(compressed_size <= d_dst.size(),
               "snappy compression reported an invalid output size");
  std::vector<uint8_t> dst(compressed_size);
  cudf::detail::cuda_memcpy(host_span<uint8_t>{dst},
                            device_span<uint8_t const>{d_dst.data(), compressed_size},
                            stream);
  return dst;
}

} // namespace

// Dispatches to the compressor matching `compression`: GZIP is compressed on the host,
// SNAPPY goes through the device path and therefore needs `stream`. Any other type is
// rejected via CUDF_FAIL.
std::vector<std::uint8_t> compress(compression_type compression,
                                   host_span<uint8_t const> src,
                                   rmm::cuda_stream_view stream)
{
  CUDF_FUNC_RANGE();
  if (compression == compression_type::GZIP) { return compress_gzip(src); }
  if (compression == compression_type::SNAPPY) { return compress_snappy(src, stream); }
  CUDF_FAIL("Unsupported compression type");
}

} // namespace cudf::io::detail
43 changes: 43 additions & 0 deletions cpp/src/io/comp/comp.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/io/types.hpp>
#include <cudf/utilities/span.hpp>

#include <memory>
#include <string>
#include <vector>

namespace CUDF_EXPORT cudf {
namespace io::detail {

/**
 * @brief Compresses a system memory buffer.
 *
 * @param compression Type of compression to apply to the input data
 * @param src Uncompressed host buffer
 * @param stream CUDA stream used for device memory operations and kernel launches
 *
 * @return Vector containing the compressed output
 */
[[nodiscard]] std::vector<uint8_t> compress(compression_type compression,
                                            host_span<uint8_t const> src,
                                            rmm::cuda_stream_view stream);

} // namespace io::detail
} // namespace CUDF_EXPORT cudf
8 changes: 4 additions & 4 deletions cpp/src/io/comp/gpuinflate.cu
Original file line number Diff line number Diff line change
Expand Up @@ -980,27 +980,27 @@ __device__ int parse_gzip_header(uint8_t const* src, size_t src_size)
{
uint8_t flags = src[3];
hdr_len = 10;
if (flags & GZIPHeaderFlag::fextra) // Extra fields present
if (flags & detail::GZIPHeaderFlag::fextra) // Extra fields present
{
int xlen = src[hdr_len] | (src[hdr_len + 1] << 8);
hdr_len += xlen;
if (hdr_len >= src_size) return -1;
}
if (flags & GZIPHeaderFlag::fname) // Original file name present
if (flags & detail::GZIPHeaderFlag::fname) // Original file name present
{
// Skip zero-terminated string
do {
if (hdr_len >= src_size) return -1;
} while (src[hdr_len++] != 0);
}
if (flags & GZIPHeaderFlag::fcomment) // Comment present
if (flags & detail::GZIPHeaderFlag::fcomment) // Comment present
{
// Skip zero-terminated string
do {
if (hdr_len >= src_size) return -1;
} while (src[hdr_len++] != 0);
}
if (flags & GZIPHeaderFlag::fhcrc) // Header CRC present
if (flags & detail::GZIPHeaderFlag::fhcrc) // Header CRC present
{
hdr_len += 2;
}
Expand Down
34 changes: 28 additions & 6 deletions cpp/src/io/comp/io_uncomp.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022, NVIDIA CORPORATION.
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,8 +25,8 @@

using cudf::host_span;

namespace cudf {
namespace io {
namespace CUDF_EXPORT cudf {
namespace io::detail {

/**
* @brief Decompresses a system memory buffer.
Expand All @@ -36,13 +36,35 @@ namespace io {
*
* @return Vector containing the Decompressed output
*/
std::vector<uint8_t> decompress(compression_type compression, host_span<uint8_t const> src);
[[nodiscard]] std::vector<uint8_t> decompress(compression_type compression,
host_span<uint8_t const> src);

/**
 * @brief Decompresses a system memory buffer into a caller-provided host buffer.
 *
 * @param compression Type of compression of the input data
 * @param src Compressed host buffer
 * @param dst Destination host span that receives the decompressed data
 * @param stream CUDA stream used for device memory operations and kernel launches
 *
 * @return Size of decompressed output
 */
size_t decompress(compression_type compression,
host_span<uint8_t const> src,
host_span<uint8_t> dst,
rmm::cuda_stream_view stream);

/**
 * @brief Returns the size of the decompressed output without actually decompressing the
 * compressed input buffer. If the decompressed size cannot be determined a priori, returns
 * zero.
 *
 * @param compression Type of compression of the input data
 * @param src Compressed host buffer
 *
 * @return Size of decompressed output, or zero if it cannot be determined in advance
 */
[[nodiscard]] size_t get_uncompressed_size(compression_type compression,
                                           host_span<uint8_t const> src);

/**
* @brief GZIP header flags
* See https://tools.ietf.org/html/rfc1952
Expand All @@ -55,5 +77,5 @@ constexpr uint8_t fname = 0x08; // Original file name present
constexpr uint8_t fcomment = 0x10; // Comment present
}; // namespace GZIPHeaderFlag

} // namespace io
} // namespace cudf
} // namespace io::detail
} // namespace CUDF_EXPORT cudf
Loading

0 comments on commit 03ac845

Please sign in to comment.