Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix alignment of compressed blocks in ORC writer #12077

Merged
merged 17 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 144 additions & 83 deletions cpp/src/io/comp/nvcomp_adapter.cpp

Large diffs are not rendered by default.

48 changes: 44 additions & 4 deletions cpp/src/io/comp/nvcomp_adapter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include "gpuinflate.hpp"

#include <io/utilities/config_utils.hpp>

#include <cudf/utilities/error.hpp>
#include <cudf/utilities/span.hpp>

Expand All @@ -30,14 +32,52 @@ namespace cudf::io::nvcomp {
enum class compression_type { SNAPPY, ZSTD, DEFLATE };

/**
* @brief Whether the given compression type is enabled through nvCOMP.
* @brief Set of parameters that impact whether the use of nvCOMP features is enabled.
*/
struct feature_status_parameters {
  // Library version the status query is evaluated against, split into its three components.
  int lib_major_version;
  int lib_minor_version;
  int lib_patch_version;
  // Flags for the "all integrations" and "stable integrations" settings.
  bool are_all_integrations_enabled;
  bool are_stable_integrations_enabled;
  // Major component of the compute capability the query is evaluated against.
  int compute_capability_major;

  // Only declared here; the definition is out of line.
  // NOTE(review): presumably fills the members from the actual runtime configuration -- confirm
  // against the definition.
  feature_status_parameters();

  // Builds a parameter set from explicitly supplied values; arguments map one-to-one onto the
  // members, in declaration order. Intentionally not `explicit`, so a braced list can be passed
  // wherever a `feature_status_parameters` argument is expected.
  feature_status_parameters(int major_version,
                            int minor_version,
                            int patch_version,
                            bool all_integrations,
                            bool stable_integrations,
                            int cc_major_version)
    : lib_major_version(major_version),
      lib_minor_version(minor_version),
      lib_patch_version(patch_version),
      are_all_integrations_enabled(all_integrations),
      are_stable_integrations_enabled(stable_integrations),
      compute_capability_major(cc_major_version)
  {
  }
};

/**
* @brief If a compression type is disabled through nvCOMP, returns the reason as a string.
*
* Result can depend on nvCOMP version and environment variables.
*
* @param compression Compression type
* @param params Optional parameters to query status with different configurations
* @returns Reason for the feature disablement, `std::nullopt` if the feature is enabled
*/
[[nodiscard]] std::optional<std::string> is_compression_disabled(
compression_type compression, feature_status_parameters params = feature_status_parameters());

/**
* @brief If a decompression type is disabled through nvCOMP, returns the reason as a string.
*
* Result depends on nvCOMP version and environment variables.
* Result can depend on nvCOMP version and environment variables.
*
* @param compression Compression type
* @returns true if nvCOMP use is enabled; false otherwise
* @param params Optional parameters to query status with different configurations
* @returns Reason for the feature disablement, `std::nullopt` if the feature is enabled
*/
[[nodiscard]] bool is_compression_enabled(compression_type compression);
[[nodiscard]] std::optional<std::string> is_decompression_disabled(
compression_type compression, feature_status_parameters params = feature_status_parameters());

/**
* @brief Device batch decompression of given type.
Expand Down
23 changes: 13 additions & 10 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -379,34 +379,37 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
device_span<device_span<uint8_t>> inflate_out_view{inflate_out.data(), num_compressed_blocks};
switch (decompressor.compression()) {
case compression_type::ZLIB:
// See https://github.com/rapidsai/cudf/issues/11812
if (false) {
if (nvcomp::is_decompression_disabled(nvcomp::compression_type::DEFLATE)) {
gpuinflate(
inflate_in_view, inflate_out_view, inflate_res, gzip_header_included::NO, stream);
} else {
nvcomp::batched_decompress(nvcomp::compression_type::DEFLATE,
inflate_in_view,
inflate_out_view,
inflate_res,
max_uncomp_block_size,
total_decomp_size,
stream);
} else {
gpuinflate(
inflate_in_view, inflate_out_view, inflate_res, gzip_header_included::NO, stream);
}
break;
case compression_type::SNAPPY:
if (nvcomp_integration::is_stable_enabled()) {
if (nvcomp::is_decompression_disabled(nvcomp::compression_type::SNAPPY)) {
gpu_unsnap(inflate_in_view, inflate_out_view, inflate_res, stream);
} else {
nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY,
inflate_in_view,
inflate_out_view,
inflate_res,
max_uncomp_block_size,
total_decomp_size,
stream);
} else {
gpu_unsnap(inflate_in_view, inflate_out_view, inflate_res, stream);
}
break;
case compression_type::ZSTD:
if (auto const reason = nvcomp::is_decompression_disabled(nvcomp::compression_type::ZSTD);
reason) {
CUDF_FAIL("Decompression error: " + reason.value());
}
nvcomp::batched_decompress(nvcomp::compression_type::ZSTD,
inflate_in_view,
inflate_out_view,
Expand Down Expand Up @@ -522,8 +525,8 @@ void update_null_mask(cudf::detail::hostdevice_2dvector<gpu::ColumnDesc>& chunks
parent_mask_len, mask_state::ALL_NULL, rmm::cuda_stream_view(stream), mr);
auto merged_mask = static_cast<bitmask_type*>(merged_null_mask.data());
uint32_t* dst_idx_ptr = dst_idx.data();
// Copy child valid bits from child column to valid indexes, this will merge both child and
// parent null masks
// Copy child valid bits from child column to valid indexes, this will merge both child
// and parent null masks
thrust::for_each(rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(0) + dst_idx.size(),
Expand Down
20 changes: 13 additions & 7 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1332,11 +1332,11 @@ void CompressOrcDataStreams(uint8_t* compressed_data,

if (compression == SNAPPY) {
try {
if (nvcomp::is_compression_enabled(nvcomp::compression_type::SNAPPY)) {
if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) {
gpu_snap(comp_in, comp_out, comp_res, stream);
} else {
nvcomp::batched_compress(
nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream);
} else {
gpu_snap(comp_in, comp_out, comp_res, stream);
}
} catch (...) {
// There was an error in compressing so set an error status for each block
Expand All @@ -1348,12 +1348,18 @@ void CompressOrcDataStreams(uint8_t* compressed_data,
// Since SNAPPY is the default compression (may not be explicitly requested), fall back to
// writing without compression
}
} else if (compression == ZLIB and
nvcomp::is_compression_enabled(nvcomp::compression_type::DEFLATE)) {
} else if (compression == ZLIB) {
if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::DEFLATE);
reason) {
CUDF_FAIL("Compression error: " + reason.value());
}
nvcomp::batched_compress(
nvcomp::compression_type::DEFLATE, comp_in, comp_out, comp_res, stream);
} else if (compression == ZSTD and
nvcomp::is_compression_enabled(nvcomp::compression_type::ZSTD)) {
} else if (compression == ZSTD) {
if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD);
reason) {
CUDF_FAIL("Compression error: " + reason.value());
}
nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream);
} else if (compression != NONE) {
CUDF_FAIL("Unsupported compression type");
Expand Down
21 changes: 11 additions & 10 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ constexpr size_t compression_block_size(orc::CompressionKind compression)
if (compression == orc::CompressionKind::NONE) { return 0; }

auto const ncomp_type = to_nvcomp_compression_type(compression);
auto const nvcomp_limit = nvcomp::is_compression_enabled(ncomp_type)
? nvcomp::compress_max_allowed_chunk_size(ncomp_type)
: std::nullopt;
auto const nvcomp_limit = nvcomp::is_compression_disabled(ncomp_type)
? std::nullopt
: nvcomp::compress_max_allowed_chunk_size(ncomp_type);

constexpr size_t max_block_size = 256 * 1024;
return std::min(nvcomp_limit.value_or(max_block_size), max_block_size);
Expand Down Expand Up @@ -537,7 +537,7 @@ constexpr size_t RLE_stream_size(TypeKind kind, size_t count)
auto uncomp_block_alignment(CompressionKind compression_kind)
{
if (compression_kind == NONE or
not nvcomp::is_compression_enabled(to_nvcomp_compression_type(compression_kind))) {
nvcomp::is_compression_disabled(to_nvcomp_compression_type(compression_kind))) {
return 1u;
}

Expand All @@ -547,7 +547,7 @@ auto uncomp_block_alignment(CompressionKind compression_kind)
auto comp_block_alignment(CompressionKind compression_kind)
{
if (compression_kind == NONE or
not nvcomp::is_compression_enabled(to_nvcomp_compression_type(compression_kind))) {
nvcomp::is_compression_disabled(to_nvcomp_compression_type(compression_kind))) {
return 1u;
}

Expand Down Expand Up @@ -2161,15 +2161,16 @@ void writer::impl::write(table_view const& table)

auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream);

auto const uncomp_block_align = uncomp_block_alignment(compression_kind_);
auto const uncompressed_block_align = uncomp_block_alignment(compression_kind_);
auto const compressed_block_align = comp_block_alignment(compression_kind_);
auto streams =
create_streams(orc_table.columns, segmentation, decimal_column_sizes(dec_chunk_sizes.rg_sizes));
auto enc_data = encode_columns(orc_table,
std::move(dictionaries),
std::move(dec_chunk_sizes),
segmentation,
streams,
uncomp_block_align,
uncompressed_block_align,
stream);

// Assemble individual disparate column chunks into contiguous data streams
Expand All @@ -2187,9 +2188,9 @@ void writer::impl::write(table_view const& table)
auto const max_compressed_block_size =
max_compression_output_size(compression_kind_, compression_blocksize_);
auto const padded_max_compressed_block_size =
util::round_up_unsafe<size_t>(max_compressed_block_size, uncomp_block_align);
util::round_up_unsafe<size_t>(max_compressed_block_size, compressed_block_align);
auto const padded_block_header_size =
util::round_up_unsafe<size_t>(block_header_size, uncomp_block_align);
util::round_up_unsafe<size_t>(block_header_size, compressed_block_align);
Comment on lines +2191 to +2193
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actual fix for the alignment. Everything else is to safely re-enable DEFLATE compression.


auto stream_output = [&]() {
size_t max_stream_size = 0;
Expand Down Expand Up @@ -2238,7 +2239,7 @@ void writer::impl::write(table_view const& table)
compression_kind_,
compression_blocksize_,
max_compressed_block_size,
comp_block_alignment(compression_kind_),
compressed_block_align,
strm_descs,
enc_data.streams,
comp_results,
Expand Down
25 changes: 14 additions & 11 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ auto to_nvcomp_compression_type(Compression codec)
auto page_alignment(Compression codec)
{
if (codec == Compression::UNCOMPRESSED or
not nvcomp::is_compression_enabled(to_nvcomp_compression_type(codec))) {
nvcomp::is_compression_disabled(to_nvcomp_compression_type(codec))) {
return 1u;
}

Expand Down Expand Up @@ -1171,19 +1171,22 @@ void writer::impl::encode_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks
gpu::EncodePages(batch_pages, comp_in, comp_out, comp_res, stream);
switch (compression_) {
case parquet::Compression::SNAPPY:
if (nvcomp::is_compression_enabled(nvcomp::compression_type::SNAPPY)) {
if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) {
gpu_snap(comp_in, comp_out, comp_res, stream);
} else {
nvcomp::batched_compress(
nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream);
} else {
gpu_snap(comp_in, comp_out, comp_res, stream);
}
break;
case parquet::Compression::ZSTD:
if (nvcomp::is_compression_enabled(nvcomp::compression_type::ZSTD)) {
nvcomp::batched_compress(
nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream);
case parquet::Compression::ZSTD: {
if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD);
reason) {
CUDF_FAIL("Compression error: " + reason.value());
}
nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream);

break;
}
case parquet::Compression::UNCOMPRESSED: break;
default: CUDF_FAIL("invalid compression type");
}
Expand Down Expand Up @@ -1245,9 +1248,9 @@ size_t max_page_bytes(Compression compression, size_t max_page_size_bytes)
if (compression == parquet::Compression::UNCOMPRESSED) { return max_page_size_bytes; }

auto const ncomp_type = to_nvcomp_compression_type(compression);
auto const nvcomp_limit = nvcomp::is_compression_enabled(ncomp_type)
? nvcomp::compress_max_allowed_chunk_size(ncomp_type)
: std::nullopt;
auto const nvcomp_limit = nvcomp::is_compression_disabled(ncomp_type)
? std::nullopt
: nvcomp::compress_max_allowed_chunk_size(ncomp_type);

return std::min(nvcomp_limit.value_or(max_page_size_bytes), max_page_size_bytes);
}
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/io/text/bgzip_data_chunk_source.cu
Original file line number Diff line number Diff line change
Expand Up @@ -144,20 +144,20 @@ class bgzip_data_chunk_reader : public data_chunk_reader {
bgzip_nvcomp_transform_functor{reinterpret_cast<uint8_t const*>(d_compressed_blocks.data()),
reinterpret_cast<uint8_t*>(d_decompressed_blocks.begin())});
if (decompressed_size() > 0) {
if (cudf::io::detail::nvcomp_integration::is_all_enabled()) {
if (nvcomp::is_decompression_disabled(nvcomp::compression_type::DEFLATE)) {
gpuinflate(d_compressed_spans,
d_decompressed_spans,
d_decompression_results,
gzip_header_included::NO,
stream);
} else {
cudf::io::nvcomp::batched_decompress(cudf::io::nvcomp::compression_type::DEFLATE,
d_compressed_spans,
d_decompressed_spans,
d_decompression_results,
max_decompressed_size,
decompressed_size(),
stream);
} else {
gpuinflate(d_compressed_spans,
d_decompressed_spans,
d_decompression_results,
gzip_header_included::NO,
stream);
}
}
is_decompressed = true;
Expand Down
58 changes: 58 additions & 0 deletions cpp/tests/io/comp/decomp_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <io/comp/gpuinflate.hpp>
#include <io/utilities/hostdevice_vector.hpp>
#include <src/io/comp/nvcomp_adapter.hpp>

#include <cudf/utilities/default_stream.hpp>

Expand Down Expand Up @@ -118,6 +119,9 @@ struct BrotliDecompressTest : public DecompressTest<BrotliDecompressTest> {
}
};

// Fixture for the nvCOMP feature-status tests; it adds no state or behavior of its own on top of
// the base fixture.
struct NvcompConfigTest : cudf::test::BaseFixture {};

TEST_F(GzipDecompressTest, HelloWorld)
{
constexpr char uncompressed[] = "hello world";
Expand Down Expand Up @@ -166,4 +170,58 @@ TEST_F(BrotliDecompressTest, HelloWorld)
EXPECT_EQ(output, input);
}

// Checks the status reported by `is_compression_disabled` for each compression type under
// hand-picked configurations. Every braced list is converted to `feature_status_parameters` as
// {major, minor, patch, all_enabled, stable_enabled, cc_major}.
// A returned reason (EXPECT_TRUE) means "disabled"; no reason (EXPECT_FALSE) means "enabled".
TEST_F(NvcompConfigTest, Compression)
{
  namespace nvcomp = cudf::io::nvcomp;

  constexpr auto deflate = nvcomp::compression_type::DEFLATE;
  constexpr auto zstd    = nvcomp::compression_type::ZSTD;
  constexpr auto snappy  = nvcomp::compression_type::SNAPPY;

  auto const is_disabled = &nvcomp::is_compression_disabled;

  // DEFLATE
  EXPECT_FALSE(is_disabled(deflate, {2, 5, 0, true, true, 0}));
  // version 2.5 required
  EXPECT_TRUE(is_disabled(deflate, {2, 4, 0, true, true, 0}));
  // all integrations enabled required
  EXPECT_TRUE(is_disabled(deflate, {2, 5, 0, false, true, 0}));

  // ZSTD
  EXPECT_FALSE(is_disabled(zstd, {2, 4, 0, true, true, 0}));
  EXPECT_FALSE(is_disabled(zstd, {2, 4, 0, false, true, 0}));
  // 2.4 version required
  EXPECT_TRUE(is_disabled(zstd, {2, 3, 1, false, true, 0}));
  // stable integrations enabled required
  EXPECT_TRUE(is_disabled(zstd, {2, 4, 0, false, false, 0}));

  // SNAPPY
  EXPECT_FALSE(is_disabled(snappy, {2, 5, 0, true, true, 0}));
  EXPECT_FALSE(is_disabled(snappy, {2, 4, 0, false, true, 0}));
  // stable integrations enabled required
  EXPECT_TRUE(is_disabled(snappy, {2, 3, 0, false, false, 0}));
}

// Checks the status reported by `is_decompression_disabled` for each compression type under
// hand-picked configurations. Every braced list is converted to `feature_status_parameters` as
// {major, minor, patch, all_enabled, stable_enabled, cc_major}.
// A returned reason (EXPECT_TRUE) means "disabled"; no reason (EXPECT_FALSE) means "enabled".
TEST_F(NvcompConfigTest, Decompression)
{
using cudf::io::nvcomp::compression_type;
// Short alias for the function under test; the braced lists below are passed as its second
// argument.
auto const& decomp_disabled = cudf::io::nvcomp::is_decompression_disabled;

// DEFLATE
EXPECT_FALSE(decomp_disabled(compression_type::DEFLATE, {2, 5, 0, true, true, 7}));
// version 2.5 required
EXPECT_TRUE(decomp_disabled(compression_type::DEFLATE, {2, 4, 0, true, true, 7}));
// all integrations enabled required
EXPECT_TRUE(decomp_disabled(compression_type::DEFLATE, {2, 5, 0, false, true, 7}));

// ZSTD
EXPECT_FALSE(decomp_disabled(compression_type::ZSTD, {2, 4, 0, true, true, 7}));
EXPECT_FALSE(decomp_disabled(compression_type::ZSTD, {2, 3, 2, false, true, 6}));
EXPECT_FALSE(decomp_disabled(compression_type::ZSTD, {2, 3, 0, true, true, 6}));
// 2.3.1 and earlier requires all integrations to be enabled
EXPECT_TRUE(decomp_disabled(compression_type::ZSTD, {2, 3, 1, false, true, 7}));
// 2.3 version required
EXPECT_TRUE(decomp_disabled(compression_type::ZSTD, {2, 2, 0, true, true, 7}));
// stable integrations enabled required
EXPECT_TRUE(decomp_disabled(compression_type::ZSTD, {2, 4, 0, false, false, 7}));
// 2.4.0 disabled on Pascal
// (the only difference from the first ZSTD case above is cc_major: 6 here, 7 there)
EXPECT_TRUE(decomp_disabled(compression_type::ZSTD, {2, 4, 0, true, true, 6}));

// SNAPPY
EXPECT_FALSE(decomp_disabled(compression_type::SNAPPY, {2, 4, 0, true, true, 7}));
EXPECT_FALSE(decomp_disabled(compression_type::SNAPPY, {2, 3, 0, false, true, 7}));
EXPECT_FALSE(decomp_disabled(compression_type::SNAPPY, {2, 2, 0, false, true, 7}));
// stable integrations enabled required
EXPECT_TRUE(decomp_disabled(compression_type::SNAPPY, {2, 2, 0, false, false, 7}));
jbrennan333 marked this conversation as resolved.
Show resolved Hide resolved
}

CUDF_TEST_PROGRAM_MAIN()
Loading