From d8feee1f0790b58b1fcac8f56b4d30a5a6675e42 Mon Sep 17 00:00:00 2001 From: Tobias Ribizel Date: Tue, 20 Sep 2022 19:14:29 +0000 Subject: [PATCH 1/7] bgzip refactoring and benchmark --- cpp/benchmarks/CMakeLists.txt | 3 +- cpp/benchmarks/io/text/multibyte_split.cpp | 36 +++- cpp/src/io/text/bgzip_data_chunk_source.cu | 72 +------ cpp/src/io/text/bgzip_utils.hpp | 189 ++++++++++++++++++ cpp/tests/CMakeLists.txt | 1 + cpp/tests/io/text/data_chunk_source_test.cpp | 193 ++++++++++--------- 6 files changed, 329 insertions(+), 165 deletions(-) create mode 100644 cpp/src/io/text/bgzip_utils.hpp diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index d1ff177a25e..f35d0b0b49e 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -301,7 +301,8 @@ ConfigureNVBench(NESTED_JSON_NVBENCH io/json/nested_json.cpp) # ################################################################################################## # * io benchmark --------------------------------------------------------------------- -ConfigureNVBench(MULTIBYTE_SPLIT_BENCHMARK io/text/multibyte_split.cpp) +ConfigureNVBench(MULTIBYTE_SPLIT_NVBENCH io/text/multibyte_split.cpp) +target_link_libraries(MULTIBYTE_SPLIT_NVBENCH PRIVATE ZLIB::ZLIB) add_custom_target( run_benchmarks diff --git a/cpp/benchmarks/io/text/multibyte_split.cpp b/cpp/benchmarks/io/text/multibyte_split.cpp index 4865d11ae8b..3a2c1abfdb6 100644 --- a/cpp/benchmarks/io/text/multibyte_split.cpp +++ b/cpp/benchmarks/io/text/multibyte_split.cpp @@ -14,6 +14,8 @@ * limitations under the License. */ +#include "io/text/bgzip_utils.hpp" + #include #include #include @@ -40,10 +42,11 @@ #include #include #include +#include temp_directory const temp_dir("cudf_nvbench"); -enum class data_chunk_source_type { device, file, host, host_pinned }; +enum class data_chunk_source_type { device, file, host, host_pinned, file_bgzip }; static cudf::string_scalar create_random_input(int32_t num_chars, double delim_factor, @@ -78,6 +81,23 @@ static cudf::string_scalar create_random_input(int32_t num_chars, return cudf::string_scalar(std::move(*chars_buffer)); } +static void write_bgzip_file(cudf::host_span host_data, std::ostream& stream) +{ + // a bit of variability with a decent amount of padding so we don't overflow 16 bit block sizes + std::uniform_int_distribution chunk_size_dist{64000, 65000}; + std::default_random_engine rng{}; + std::size_t pos = 0; + while (pos < host_data.size()) { + auto const remainder = host_data.size() - pos; + auto const chunk_size = std::min(remainder, chunk_size_dist(rng)); + cudf::io::text::detail::bgzip::write_compressed_block(stream, + {host_data.data() + pos, chunk_size}); + pos += chunk_size; + } + // empty block denotes EOF + cudf::io::text::detail::bgzip::write_uncompressed_block(stream, {}); +} + static void bench_multibyte_split(nvbench::state& state) { cudf::rmm_pool_raii pool_raii; @@ -104,7 +124,8 @@ static void bench_multibyte_split(nvbench::state& state) auto host_pinned_input = thrust::host_vector>{}; - if (source_type == data_chunk_source_type::host || source_type == data_chunk_source_type::file) { + if (source_type == data_chunk_source_type::host || source_type == data_chunk_source_type::file || + source_type == data_chunk_source_type::file_bgzip) { host_input = cudf::detail::make_std_vector_sync( {device_input.data(), static_cast(device_input.size())}, cudf::default_stream_value); @@ -131,6 +152,14 @@ static void bench_multibyte_split(nvbench::state& state) return cudf::io::text::make_source(host_pinned_input); case data_chunk_source_type::device: // return cudf::io::text::make_source(device_input); + case data_chunk_source_type::file_bgzip: { + auto const temp_file_name = random_file_in_dir(temp_dir.path()); + { + std::ofstream stream(temp_file_name, std::ofstream::out); + write_bgzip_file(host_input, stream); + } + return cudf::io::text::make_source_from_bgzip_file(temp_file_name); + } default: CUDF_FAIL(); } }(); @@ -158,7 +187,8 @@ NVBENCH_BENCH(bench_multibyte_split) {static_cast(data_chunk_source_type::device), static_cast(data_chunk_source_type::file), static_cast(data_chunk_source_type::host), - static_cast(data_chunk_source_type::host_pinned)}) + static_cast(data_chunk_source_type::host_pinned), + static_cast(data_chunk_source_type::file_bgzip)}) .add_int64_axis("delim_size", {1, 4, 7}) .add_int64_axis("delim_percent", {1, 25}) .add_int64_power_of_two_axis("size_approx", {15, 30}) diff --git a/cpp/src/io/text/bgzip_data_chunk_source.cu b/cpp/src/io/text/bgzip_data_chunk_source.cu index 7715c2ca7e1..ab1f4287b8a 100644 --- a/cpp/src/io/text/bgzip_data_chunk_source.cu +++ b/cpp/src/io/text/bgzip_data_chunk_source.cu @@ -15,6 +15,7 @@ */ #include "io/comp/nvcomp_adapter.hpp" +#include "io/text/bgzip_utils.hpp" #include "io/text/device_data_chunks.hpp" #include "io/utilities/config_utils.hpp" @@ -36,7 +37,6 @@ #include namespace cudf::io::text { - namespace { /** @@ -64,68 +64,6 @@ struct bgzip_nvcomp_transform_functor { class bgzip_data_chunk_reader : public data_chunk_reader { private: - template - static IntType read_int(char* data) - { - IntType result{}; - // we assume little-endian - std::memcpy(&result, &data[0], sizeof(result)); - return result; - } - - struct bgzip_header { - int block_size; - int extra_length; - [[nodiscard]] int data_size() const { return block_size - extra_length - 20; } - }; - - bgzip_header read_header() - { - std::array buffer{}; - _data_stream->read(buffer.data(), sizeof(buffer)); - std::array const expected_header{{31, 139, 8, 4}}; - CUDF_EXPECTS( - std::equal( - expected_header.begin(), expected_header.end(), reinterpret_cast(buffer.data())), - "malformed BGZIP header"); - // we ignore the remaining bytes of the fixed header, since they don't matter to us - auto const extra_length = read_int(&buffer[10]); - uint16_t extra_offset{}; - // read all the extra subfields - while (extra_offset < extra_length) { - auto const remaining_size = extra_length - extra_offset; - CUDF_EXPECTS(remaining_size >= 4, "invalid extra field length"); - // a subfield consists of 2 identifier bytes and a uint16 length - // 66/67 identifies a BGZIP block size field, we skip all other fields - _data_stream->read(buffer.data(), 4); - extra_offset += 4; - auto const subfield_size = read_int(&buffer[2]); - if (buffer[0] == 66 && buffer[1] == 67) { - // the block size subfield contains a single uint16 value, which is block_size - 1 - CUDF_EXPECTS(subfield_size == sizeof(uint16_t), "malformed BGZIP extra subfield"); - _data_stream->read(buffer.data(), sizeof(uint16_t)); - _data_stream->seekg(remaining_size - 6, std::ios_base::cur); - auto const block_size_minus_one = read_int(&buffer[0]); - return {block_size_minus_one + 1, extra_length}; - } else { - _data_stream->seekg(subfield_size, std::ios_base::cur); - extra_offset += subfield_size; - } - } - CUDF_FAIL("missing BGZIP size extra subfield"); - } - - struct bgzip_footer { - uint32_t decompressed_size; - }; - - bgzip_footer read_footer() - { - std::array buffer{}; - _data_stream->read(buffer.data(), sizeof(buffer)); - return {read_int(&buffer[4])}; - } - template using pinned_host_vector = thrust::host_vector>; @@ -258,13 +196,13 @@ class bgzip_data_chunk_reader : public data_chunk_reader { return available_decompressed_size - read_pos; } - void read_block(bgzip_header header, std::istream& stream) + void read_block(detail::bgzip::header header, std::istream& stream) { h_compressed_blocks.resize(h_compressed_blocks.size() + header.data_size()); stream.read(h_compressed_blocks.data() + compressed_size(), header.data_size()); } - void add_block_offsets(bgzip_header header, bgzip_footer footer) + void add_block_offsets(detail::bgzip::header header, detail::bgzip::footer footer) { max_decompressed_size = std::max(footer.decompressed_size, max_decompressed_size); @@ -294,9 +232,9 @@ class bgzip_data_chunk_reader : public data_chunk_reader { // peek is necessary if we are already at the end, but didn't try to read another byte _data_stream->peek(); if (_data_stream->eof() || _compressed_pos > _compressed_end) { break; } - auto header = read_header(); + auto header = detail::bgzip::read_header(*_data_stream); _curr_blocks.read_block(header, *_data_stream); - auto footer = read_footer(); + auto footer = detail::bgzip::read_footer(*_data_stream); _curr_blocks.add_block_offsets(header, footer); // for the last GZIP block, we restrict ourselves to the bytes up to _local_end // but only for the reader, not for decompression! diff --git a/cpp/src/io/text/bgzip_utils.hpp b/cpp/src/io/text/bgzip_utils.hpp new file mode 100644 index 00000000000..54c2dab9ac2 --- /dev/null +++ b/cpp/src/io/text/bgzip_utils.hpp @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include +#include +#include + +#include + +namespace cudf::io::text::detail::bgzip { + +template +static IntType read_int(char* data) +{ + IntType result{}; + // we assume little-endian + std::memcpy(&result, &data[0], sizeof(result)); + return result; +} + +template +void write_int(std::ostream& stream, T val) +{ + std::array bytes; + // we assume little-endian + std::memcpy(&bytes[0], &val, sizeof(T)); + stream.write(bytes.data(), bytes.size()); +} + +struct header { + int block_size; + int extra_length; + [[nodiscard]] int data_size() const { return block_size - extra_length - 20; } +}; + +header read_header(std::istream& stream) +{ + std::array buffer{}; + stream.read(buffer.data(), sizeof(buffer)); + std::array const expected_header{{31, 139, 8, 4}}; + CUDF_EXPECTS( + std::equal( + expected_header.begin(), expected_header.end(), reinterpret_cast(buffer.data())), + "malformed BGZIP header"); + // we ignore the remaining bytes of the fixed header, since they don't matter to us + auto const extra_length = read_int(&buffer[10]); + uint16_t extra_offset{}; + // read all the extra subfields + while (extra_offset < extra_length) { + auto const remaining_size = extra_length - extra_offset; + CUDF_EXPECTS(remaining_size >= 4, "invalid extra field length"); + // a subfield consists of 2 identifier bytes and a uint16 length + // 66/67 identifies a BGZIP block size field, we skip all other fields + stream.read(buffer.data(), 4); + extra_offset += 4; + auto const subfield_size = read_int(&buffer[2]); + if (buffer[0] == 66 && buffer[1] == 67) { + // the block size subfield contains a single uint16 value, which is block_size - 1 + CUDF_EXPECTS(subfield_size == sizeof(uint16_t), "malformed BGZIP extra subfield"); + stream.read(buffer.data(), sizeof(uint16_t)); + stream.seekg(remaining_size - 6, std::ios_base::cur); + auto const block_size_minus_one = read_int(&buffer[0]); + return {block_size_minus_one + 1, extra_length}; + } else { + stream.seekg(subfield_size, std::ios_base::cur); + extra_offset += subfield_size; + } + } + CUDF_FAIL("missing BGZIP size extra subfield"); +} + +struct footer { + uint32_t crc; + uint32_t decompressed_size; +}; + +footer read_footer(std::istream& stream) +{ + std::array buffer{}; + stream.read(buffer.data(), sizeof(buffer)); + return {read_int(&buffer[0]), read_int(&buffer[4])}; +} + +void write_footer(std::ostream& stream, host_span data) +{ + // compute crc32 with zlib, this allows checking the generated files with external tools + write_int(stream, crc32(0, (unsigned char*)data.data(), data.size())); + write_int(stream, data.size()); +} + +void write_header(std::ostream& stream, + uint16_t compressed_size, + host_span pre_size_subfield, + host_span post_size_subfield) +{ + auto uint8_to_char = [](uint8_t val) { + char c{}; + std::memcpy(&c, &val, 1); + return c; + }; + std::array const header_data{{ + 31, // magic number + uint8_to_char(139), // magic number + 8, // compression type: deflate + 4, // flags: extra header + 0, // mtime + 0, // mtime + 0, // mtime + 0, // mtime: irrelevant + 4, // xfl: irrelevant + 3 // OS: irrelevant + }}; + stream.write(header_data.data(), header_data.size()); + std::array extra_blocklen_field{{66, 67, 2, 0}}; + auto const extra_size = pre_size_subfield.size() + extra_blocklen_field.size() + + sizeof(uint16_t) + post_size_subfield.size(); + auto const block_size = + header_data.size() + sizeof(uint16_t) + extra_size + compressed_size + 2 * sizeof(uint32_t); + write_int(stream, extra_size); + stream.write(pre_size_subfield.data(), pre_size_subfield.size()); + stream.write(extra_blocklen_field.data(), extra_blocklen_field.size()); + CUDF_EXPECTS(block_size - 1 <= std::numeric_limits::max(), "block size overflow"); + write_int(stream, block_size - 1); + stream.write(post_size_subfield.data(), post_size_subfield.size()); +} + +void write_uncompressed_block(std::ostream& stream, + host_span data, + host_span extra_garbage_before = {}, + host_span extra_garbage_after = {}) +{ + CUDF_EXPECTS(data.size() <= std::numeric_limits::max(), "data size overflow"); + write_header(stream, data.size() + 5, extra_garbage_before, extra_garbage_after); + write_int(stream, 1); + write_int(stream, data.size()); + write_int(stream, ~static_cast(data.size())); + stream.write(data.data(), data.size()); + write_footer(stream, data); +} + +void write_compressed_block(std::ostream& stream, + host_span data, + host_span extra_garbage_before = {}, + host_span extra_garbage_after = {}) +{ + CUDF_EXPECTS(data.size() <= std::numeric_limits::max(), "data size overflow"); + z_stream deflate_stream{}; + // let's make sure we have enough space to store the data + std::vector compressed_out(data.size() * 2 + 256); + deflate_stream.next_in = (unsigned char*)data.data(); + deflate_stream.avail_in = data.size(); + deflate_stream.next_out = (unsigned char*)compressed_out.data(); + deflate_stream.avail_out = compressed_out.size(); + CUDF_EXPECTS( + deflateInit2(&deflate_stream, // stream + Z_DEFAULT_COMPRESSION, // compression level + Z_DEFLATED, // method + -15, // log2 of window size (negative value means no ZLIB header/footer) + 9, // mem level: best performance/most memory usage for compression + Z_DEFAULT_STRATEGY // strategy + ) == Z_OK, + "deflateInit failed"); + CUDF_EXPECTS(deflate(&deflate_stream, Z_FINISH) == Z_STREAM_END, "deflate failed"); + CUDF_EXPECTS(deflateEnd(&deflate_stream) == Z_OK, "deflateEnd failed"); + write_header(stream, deflate_stream.total_out, extra_garbage_before, extra_garbage_after); + stream.write(compressed_out.data(), deflate_stream.total_out); + write_footer(stream, data); +} + +} // namespace cudf::io::text::detail::bgzip diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index e630e842f4e..8675dc891c1 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -230,6 +230,7 @@ ConfigureTest(NESTED_JSON_TEST io/nested_json_test.cpp io/json_tree.cpp) ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp) ConfigureTest(MULTIBYTE_SPLIT_TEST io/text/multibyte_split_test.cpp) ConfigureTest(DATA_CHUNK_SOURCE_TEST io/text/data_chunk_source_test.cpp) +target_link_libraries(DATA_CHUNK_SOURCE_TEST PRIVATE ZLIB::ZLIB) ConfigureTest(LOGICAL_STACK_TEST io/fst/logical_stack_test.cu) ConfigureTest(FST_TEST io/fst/fst_test.cu) ConfigureTest(TYPE_INFERENCE_TEST io/type_inference_test.cu) diff --git a/cpp/tests/io/text/data_chunk_source_test.cpp b/cpp/tests/io/text/data_chunk_source_test.cpp index 115a66cdd95..e2f099558fa 100644 --- a/cpp/tests/io/text/data_chunk_source_test.cpp +++ b/cpp/tests/io/text/data_chunk_source_test.cpp @@ -17,6 +17,8 @@ #include #include +#include "io/text/bgzip_utils.hpp" + #include #include @@ -125,102 +127,61 @@ TEST_F(DataChunkSourceTest, Host) test_source(content, *source); } -template -void write_int(std::ostream& stream, T val) -{ - std::array bytes; - // we assume little-endian - std::memcpy(&bytes[0], &val, sizeof(T)); - stream.write(bytes.data(), bytes.size()); -} - -void write_bgzip_block(std::ostream& stream, - const std::string& data, - bool add_extra_garbage_before, - bool add_extra_garbage_after) -{ - std::array const header{{ - 31, // magic number - 139, // magic number - 8, // compression type: deflate - 4, // flags: extra header - 0, // mtime - 0, // mtime - 0, // mtime - 0, // mtime: irrelevant - 4, // xfl: irrelevant - 3 // OS: irrelevant - }}; - std::array const extra_blocklen_field{{66, 67, 2, 0}}; - std::array const extra_garbage_field1{{13, // magic number - 37, // magic number - 7, // field length - 0, // field length - 1, - 2, - 3, - 4, - 5, - 6, - 7}}; - std::array const extra_garbage_field2{{12, // magic number - 34, // magic number - 2, // field length - 0, // field length - 1, 2, - 56, // magic number - 78, // magic number - 1, // field length - 0, // field length - 3, // - 90, // magic number - 12, // magic number - 8, // field length - 0, // field length - 1, 2, 3, 4, 5, 6, 7, 8}}; - stream.write(reinterpret_cast(header.data()), header.size()); - uint16_t extra_size = extra_blocklen_field.size() + 2; - if (add_extra_garbage_before) { extra_size += extra_garbage_field1.size(); } - if (add_extra_garbage_after) { extra_size += extra_garbage_field2.size(); } - write_int(stream, extra_size); - if (add_extra_garbage_before) { - stream.write(extra_garbage_field1.data(), extra_garbage_field1.size()); - } - stream.write(extra_blocklen_field.data(), extra_blocklen_field.size()); - auto const compressed_size = data.size() + 5; - uint16_t const block_size_minus_one = compressed_size + 19 + extra_size; - write_int(stream, block_size_minus_one); - if (add_extra_garbage_after) { - stream.write(extra_garbage_field2.data(), extra_garbage_field2.size()); - } - write_int(stream, 1); - write_int(stream, data.size()); - write_int(stream, ~static_cast(data.size())); - stream.write(data.data(), data.size()); - // this does not produce a valid file, since we write 0 as the CRC - // the parser ignores the checksum, so it doesn't matter to the test - // to check output with gzip, plug in the CRC of `data` here. - write_int(stream, 0); - write_int(stream, data.size()); -} - void write_bgzip(std::ostream& stream, const std::string& data, std::default_random_engine& rng, - bool write_eof = true) + bool compress, + bool write_eof) { + std::vector const extra_garbage_field1{{13, // magic number + 37, // magic number + 7, // field length + 0, // field length + 1, + 2, + 3, + 4, + 5, + 6, + 7}}; + std::vector const extra_garbage_field2{{12, // magic number + 34, // magic number + 2, // field length + 0, // field length + 1, 2, + 56, // magic number + 78, // magic number + 1, // field length + 0, // field length + 3, // + 90, // magic number + 12, // magic number + 8, // field length + 0, // field length + 1, 2, 3, 4, 5, 6, 7, 8}}; // make sure the block size with header stays below 65536 std::uniform_int_distribution block_size_dist{1, 65000}; - auto begin = data.begin(); - auto const end = data.end(); + auto begin = data.data(); + auto const end = data.data() + data.size(); int i = 0; while (begin < end) { + using cudf::host_span; auto len = std::min(end - begin, block_size_dist(rng)); - write_bgzip_block(stream, std::string{begin, begin + len}, i & 1, i & 2); + host_span const garbage_before = + i & 1 ? extra_garbage_field1 : host_span{}; + host_span const garbage_after = + i & 2 ? extra_garbage_field2 : host_span{}; + if (compress) { + cudf::io::text::detail::bgzip::write_compressed_block( + stream, {begin, len}, garbage_before, garbage_after); + } else { + cudf::io::text::detail::bgzip::write_uncompressed_block( + stream, {begin, len}, garbage_before, garbage_after); + } begin += len; i++; } - if (write_eof) { write_bgzip_block(stream, {}, false, false); } + if (write_eof) { cudf::io::text::detail::bgzip::write_uncompressed_block(stream, {}); } } TEST_F(DataChunkSourceTest, BgzipSource) @@ -233,7 +194,7 @@ TEST_F(DataChunkSourceTest, BgzipSource) { std::ofstream stream{filename}; std::default_random_engine rng{}; - write_bgzip(stream, input, rng); + write_bgzip(stream, input, rng, false, true); } auto const source = cudf::io::text::make_source_from_bgzip_file(filename); @@ -264,11 +225,12 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsets) stream.write(padding_garbage.data(), padding_garbage.size()); std::default_random_engine rng{}; begin_compressed_offset = stream.tellp(); - write_bgzip_block(stream, data_garbage + begininput, false, false); - write_bgzip(stream, input, rng, false); + cudf::io::text::detail::bgzip::write_uncompressed_block(stream, data_garbage + begininput); + write_bgzip(stream, input, rng, false, false); end_compressed_offset = stream.tellp(); - write_bgzip_block(stream, endinput + data_garbage + data_garbage, false, false); - write_bgzip_block(stream, {}, false, false); + cudf::io::text::detail::bgzip::write_uncompressed_block(stream, + endinput + data_garbage + data_garbage); + cudf::io::text::detail::bgzip::write_uncompressed_block(stream, {}); stream.write(padding_garbage.data(), padding_garbage.size()); } input = begininput + input + endinput; @@ -293,8 +255,9 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsetsSingleGZipBlock) std::size_t const end_local_offset{head_garbage.size() + input.size()}; { std::ofstream stream{filename}; - write_bgzip_block(stream, head_garbage + input + tail_garbage, false, false); - write_bgzip_block(stream, {}, false, false); + cudf::io::text::detail::bgzip::write_uncompressed_block(stream, + head_garbage + input + tail_garbage); + cudf::io::text::detail::bgzip::write_uncompressed_block(stream, {}); } auto const source = @@ -317,10 +280,12 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsetsSingleChunk) std::size_t const end_local_offset{input.size() - 10}; { std::ofstream stream{filename}; - write_bgzip_block(stream, head_garbage + input.substr(0, 10), false, false); + cudf::io::text::detail::bgzip::write_uncompressed_block(stream, + head_garbage + input.substr(0, 10)); end_compressed_offset = stream.tellp(); - write_bgzip_block(stream, input.substr(10) + tail_garbage, false, false); - write_bgzip_block(stream, {}, false, false); + cudf::io::text::detail::bgzip::write_uncompressed_block(stream, + input.substr(10) + tail_garbage); + cudf::io::text::detail::bgzip::write_uncompressed_block(stream, {}); } auto const source = @@ -331,4 +296,44 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsetsSingleChunk) test_source(input, *source); } +TEST_F(DataChunkSourceTest, BgzipCompressedSourceVirtualOffsets) +{ + auto const filename = temp_env->get_temp_filepath("bgzip_source_compressed_offsets"); + std::string input{"bananarama"}; + for (int i = 0; i < 24; i++) { + input = input + input; + } + std::string padding_garbage{"garbage"}; + for (int i = 0; i < 10; i++) { + padding_garbage = padding_garbage + padding_garbage; + } + std::string const data_garbage{"GARBAGE"}; + std::string const begininput{"begin of bananarama"}; + std::string const endinput{"end of bananarama"}; + std::size_t begin_compressed_offset{}; + std::size_t end_compressed_offset{}; + std::size_t const begin_local_offset{data_garbage.size()}; + std::size_t const end_local_offset{endinput.size()}; + { + std::ofstream stream{filename}; + stream.write(padding_garbage.data(), padding_garbage.size()); + std::default_random_engine rng{}; + begin_compressed_offset = stream.tellp(); + cudf::io::text::detail::bgzip::write_compressed_block(stream, data_garbage + begininput); + write_bgzip(stream, input, rng, true, false); + end_compressed_offset = stream.tellp(); + cudf::io::text::detail::bgzip::write_compressed_block(stream, + endinput + data_garbage + data_garbage); + cudf::io::text::detail::bgzip::write_uncompressed_block(stream, {}); + stream.write(padding_garbage.data(), padding_garbage.size()); + } + input = begininput + input + endinput; + + auto source = + cudf::io::text::make_source_from_bgzip_file(filename, + begin_compressed_offset << 16 | begin_local_offset, + end_compressed_offset << 16 | end_local_offset); + test_source(input, *source); +} + CUDF_TEST_PROGRAM_MAIN() From 848fc5e902226368882e7d80e33f8f58f0a0f24c Mon Sep 17 00:00:00 2001 From: Tobias Ribizel Date: Wed, 28 Sep 2022 09:14:01 +0000 Subject: [PATCH 2/7] improve naming --- cpp/src/io/text/bgzip_utils.hpp | 62 ++++++++--------- cpp/tests/io/text/data_chunk_source_test.cpp | 71 ++++++++++---------- 2 files changed, 67 insertions(+), 66 deletions(-) diff --git a/cpp/src/io/text/bgzip_utils.hpp b/cpp/src/io/text/bgzip_utils.hpp index 54c2dab9ac2..fa2c617416e 100644 --- a/cpp/src/io/text/bgzip_utils.hpp +++ b/cpp/src/io/text/bgzip_utils.hpp @@ -38,12 +38,12 @@ static IntType read_int(char* data) } template -void write_int(std::ostream& stream, T val) +void write_int(std::ostream& output_stream, T val) { std::array bytes; // we assume little-endian std::memcpy(&bytes[0], &val, sizeof(T)); - stream.write(bytes.data(), bytes.size()); + output_stream.write(bytes.data(), bytes.size()); } struct header { @@ -52,10 +52,10 @@ struct header { [[nodiscard]] int data_size() const { return block_size - extra_length - 20; } }; -header read_header(std::istream& stream) +header read_header(std::istream& input_stream) { std::array buffer{}; - stream.read(buffer.data(), sizeof(buffer)); + input_stream.read(buffer.data(), sizeof(buffer)); std::array const expected_header{{31, 139, 8, 4}}; CUDF_EXPECTS( std::equal( @@ -70,18 +70,18 @@ header read_header(std::istream& stream) CUDF_EXPECTS(remaining_size >= 4, "invalid extra field length"); // a subfield consists of 2 identifier bytes and a uint16 length // 66/67 identifies a BGZIP block size field, we skip all other fields - stream.read(buffer.data(), 4); + input_stream.read(buffer.data(), 4); extra_offset += 4; auto const subfield_size = read_int(&buffer[2]); if (buffer[0] == 66 && buffer[1] == 67) { // the block size subfield contains a single uint16 value, which is block_size - 1 CUDF_EXPECTS(subfield_size == sizeof(uint16_t), "malformed BGZIP extra subfield"); - stream.read(buffer.data(), sizeof(uint16_t)); - stream.seekg(remaining_size - 6, std::ios_base::cur); + input_stream.read(buffer.data(), sizeof(uint16_t)); + input_stream.seekg(remaining_size - 6, std::ios_base::cur); auto const block_size_minus_one = read_int(&buffer[0]); return {block_size_minus_one + 1, extra_length}; } else { - stream.seekg(subfield_size, std::ios_base::cur); + input_stream.seekg(subfield_size, std::ios_base::cur); extra_offset += subfield_size; } } @@ -93,21 +93,21 @@ struct footer { uint32_t decompressed_size; }; -footer read_footer(std::istream& stream) +footer read_footer(std::istream& input_stream) { std::array buffer{}; - stream.read(buffer.data(), sizeof(buffer)); + input_stream.read(buffer.data(), sizeof(buffer)); return {read_int(&buffer[0]), read_int(&buffer[4])}; } -void write_footer(std::ostream& stream, host_span data) +void write_footer(std::ostream& output_stream, host_span data) { // compute crc32 with zlib, this allows checking the generated files with external tools - write_int(stream, crc32(0, (unsigned char*)data.data(), data.size())); - write_int(stream, data.size()); + write_int(output_stream, crc32(0, (unsigned char*)data.data(), data.size())); + write_int(output_stream, data.size()); } -void write_header(std::ostream& stream, +void write_header(std::ostream& output_stream, uint16_t compressed_size, host_span pre_size_subfield, host_span post_size_subfield) @@ -129,35 +129,35 @@ void write_header(std::ostream& stream, 4, // xfl: irrelevant 3 // OS: irrelevant }}; - stream.write(header_data.data(), header_data.size()); + output_stream.write(header_data.data(), header_data.size()); std::array extra_blocklen_field{{66, 67, 2, 0}}; auto const extra_size = pre_size_subfield.size() + extra_blocklen_field.size() + sizeof(uint16_t) + post_size_subfield.size(); auto const block_size = header_data.size() + sizeof(uint16_t) + extra_size + compressed_size + 2 * sizeof(uint32_t); - write_int(stream, extra_size); - stream.write(pre_size_subfield.data(), pre_size_subfield.size()); - stream.write(extra_blocklen_field.data(), extra_blocklen_field.size()); + write_int(output_stream, extra_size); + output_stream.write(pre_size_subfield.data(), pre_size_subfield.size()); + output_stream.write(extra_blocklen_field.data(), extra_blocklen_field.size()); CUDF_EXPECTS(block_size - 1 <= std::numeric_limits::max(), "block size overflow"); - write_int(stream, block_size - 1); - stream.write(post_size_subfield.data(), post_size_subfield.size()); + write_int(output_stream, block_size - 1); + output_stream.write(post_size_subfield.data(), post_size_subfield.size()); } -void write_uncompressed_block(std::ostream& stream, +void write_uncompressed_block(std::ostream& output_stream, host_span data, host_span extra_garbage_before = {}, host_span extra_garbage_after = {}) { CUDF_EXPECTS(data.size() <= std::numeric_limits::max(), "data size overflow"); - write_header(stream, data.size() + 5, extra_garbage_before, extra_garbage_after); - write_int(stream, 1); - write_int(stream, data.size()); - write_int(stream, ~static_cast(data.size())); - stream.write(data.data(), data.size()); - write_footer(stream, data); + write_header(output_stream, data.size() + 5, extra_garbage_before, extra_garbage_after); + write_int(output_stream, 1); + write_int(output_stream, data.size()); + write_int(output_stream, ~static_cast(data.size())); + output_stream.write(data.data(), data.size()); + write_footer(output_stream, data); } -void write_compressed_block(std::ostream& stream, +void write_compressed_block(std::ostream& output_stream, host_span data, host_span extra_garbage_before = {}, host_span extra_garbage_after = {}) @@ -181,9 +181,9 @@ void write_compressed_block(std::ostream& stream, "deflateInit failed"); CUDF_EXPECTS(deflate(&deflate_stream, Z_FINISH) == Z_STREAM_END, "deflate failed"); CUDF_EXPECTS(deflateEnd(&deflate_stream) == Z_OK, "deflateEnd failed"); - write_header(stream, deflate_stream.total_out, extra_garbage_before, extra_garbage_after); - stream.write(compressed_out.data(), deflate_stream.total_out); - write_footer(stream, data); + write_header(output_stream, deflate_stream.total_out, extra_garbage_before, extra_garbage_after); + output_stream.write(compressed_out.data(), deflate_stream.total_out); + write_footer(output_stream, data); } } // namespace cudf::io::text::detail::bgzip diff --git a/cpp/tests/io/text/data_chunk_source_test.cpp b/cpp/tests/io/text/data_chunk_source_test.cpp index e2f099558fa..9593cdaa31b 100644 --- a/cpp/tests/io/text/data_chunk_source_test.cpp +++ b/cpp/tests/io/text/data_chunk_source_test.cpp @@ -127,7 +127,7 @@ TEST_F(DataChunkSourceTest, Host) test_source(content, *source); } -void write_bgzip(std::ostream& stream, +void write_bgzip(std::ostream& output_stream, const std::string& data, std::default_random_engine& rng, bool compress, @@ -173,15 +173,15 @@ void write_bgzip(std::ostream& stream, i & 2 ? extra_garbage_field2 : host_span{}; if (compress) { cudf::io::text::detail::bgzip::write_compressed_block( - stream, {begin, len}, garbage_before, garbage_after); + output_stream, {begin, len}, garbage_before, garbage_after); } else { cudf::io::text::detail::bgzip::write_uncompressed_block( - stream, {begin, len}, garbage_before, garbage_after); + output_stream, {begin, len}, garbage_before, garbage_after); } begin += len; i++; } - if (write_eof) { cudf::io::text::detail::bgzip::write_uncompressed_block(stream, {}); } + if (write_eof) { cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, {}); } } TEST_F(DataChunkSourceTest, BgzipSource) @@ -192,9 +192,9 @@ TEST_F(DataChunkSourceTest, BgzipSource) input = input + input; } { - std::ofstream stream{filename}; + std::ofstream output_stream{filename}; std::default_random_engine rng{}; - write_bgzip(stream, input, rng, false, true); + write_bgzip(output_stream, input, rng, false, true); } auto const source = cudf::io::text::make_source_from_bgzip_file(filename); @@ -204,7 +204,7 @@ TEST_F(DataChunkSourceTest, BgzipSource) TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsets) { - auto const filename = temp_env->get_temp_filepath("bgzip_source"); + auto const filename = temp_env->get_temp_filepath("bgzip_source_offsets"); std::string input{"bananarama"}; for (int i = 0; i < 24; i++) { input = input + input; @@ -221,17 +221,18 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsets) std::size_t const begin_local_offset{data_garbage.size()}; std::size_t const end_local_offset{endinput.size()}; { - std::ofstream stream{filename}; - stream.write(padding_garbage.data(), padding_garbage.size()); + std::ofstream output_stream{filename}; + output_stream.write(padding_garbage.data(), padding_garbage.size()); std::default_random_engine rng{}; - begin_compressed_offset = stream.tellp(); - cudf::io::text::detail::bgzip::write_uncompressed_block(stream, data_garbage + begininput); - write_bgzip(stream, input, rng, false, false); - end_compressed_offset = stream.tellp(); - cudf::io::text::detail::bgzip::write_uncompressed_block(stream, + begin_compressed_offset = output_stream.tellp(); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, + data_garbage + begininput); + write_bgzip(output_stream, input, rng, false, false); + end_compressed_offset = output_stream.tellp(); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, endinput + data_garbage + data_garbage); - cudf::io::text::detail::bgzip::write_uncompressed_block(stream, {}); - stream.write(padding_garbage.data(), padding_garbage.size()); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, {}); + output_stream.write(padding_garbage.data(), padding_garbage.size()); } input = begininput + input + endinput; @@ -245,7 +246,7 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsets) TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsetsSingleGZipBlock) { - auto const filename = temp_env->get_temp_filepath("bgzip_source"); + auto const filename = temp_env->get_temp_filepath("bgzip_source_offsets_single_block"); std::string const input{"collection unit brings"}; std::string const head_garbage{"garbage"}; std::string const tail_garbage{"GARBAGE"}; @@ -254,10 +255,10 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsetsSingleGZipBlock) std::size_t const begin_local_offset{head_garbage.size()}; std::size_t const end_local_offset{head_garbage.size() + input.size()}; { - std::ofstream stream{filename}; - cudf::io::text::detail::bgzip::write_uncompressed_block(stream, + std::ofstream output_stream{filename}; + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, head_garbage + input + tail_garbage); - cudf::io::text::detail::bgzip::write_uncompressed_block(stream, {}); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, {}); } auto const source = @@ -270,7 +271,7 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsetsSingleGZipBlock) TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsetsSingleChunk) { - auto const filename = temp_env->get_temp_filepath("bgzip_source"); + auto const filename = temp_env->get_temp_filepath("bgzip_source_offsets_single_chunk"); std::string const input{"collection unit brings"}; std::string const head_garbage{"garbage"}; std::string const tail_garbage{"GARBAGE"}; @@ -279,13 +280,13 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsetsSingleChunk) std::size_t const begin_local_offset{head_garbage.size()}; std::size_t const end_local_offset{input.size() - 10}; { - std::ofstream stream{filename}; - cudf::io::text::detail::bgzip::write_uncompressed_block(stream, + std::ofstream output_stream{filename}; + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, head_garbage + input.substr(0, 10)); - end_compressed_offset = stream.tellp(); - cudf::io::text::detail::bgzip::write_uncompressed_block(stream, + end_compressed_offset = output_stream.tellp(); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, input.substr(10) + tail_garbage); - cudf::io::text::detail::bgzip::write_uncompressed_block(stream, {}); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, {}); } auto const source = @@ -315,17 +316,17 @@ TEST_F(DataChunkSourceTest, BgzipCompressedSourceVirtualOffsets) std::size_t const begin_local_offset{data_garbage.size()}; std::size_t const end_local_offset{endinput.size()}; { - std::ofstream stream{filename}; - stream.write(padding_garbage.data(), padding_garbage.size()); + std::ofstream output_stream{filename}; + output_stream.write(padding_garbage.data(), padding_garbage.size()); std::default_random_engine rng{}; - begin_compressed_offset = stream.tellp(); - cudf::io::text::detail::bgzip::write_compressed_block(stream, data_garbage + begininput); - write_bgzip(stream, input, rng, true, false); - end_compressed_offset = stream.tellp(); - cudf::io::text::detail::bgzip::write_compressed_block(stream, + begin_compressed_offset = output_stream.tellp(); + cudf::io::text::detail::bgzip::write_compressed_block(output_stream, data_garbage + begininput); + write_bgzip(output_stream, input, rng, true, false); + end_compressed_offset = output_stream.tellp(); + cudf::io::text::detail::bgzip::write_compressed_block(output_stream, endinput + data_garbage + data_garbage); - cudf::io::text::detail::bgzip::write_uncompressed_block(stream, {}); - stream.write(padding_garbage.data(), padding_garbage.size()); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, {}); + output_stream.write(padding_garbage.data(), padding_garbage.size()); } input = begininput + input + endinput; From b707e1fe54abd4336aec39d9e5d28f7d043ba5e8 Mon Sep 17 00:00:00 2001 From: Tobias Ribizel Date: Tue, 4 Oct 2022 09:17:27 +0000 Subject: [PATCH 3/7] move code to source --- cpp/CMakeLists.txt | 1 + cpp/src/io/text/bgzip_utils.cpp | 181 ++++++++++++++++++++++++++++++ cpp/src/io/text/bgzip_utils.hpp | 191 ++++++++++---------------------- 3 files changed, 239 insertions(+), 134 deletions(-) create mode 100644 cpp/src/io/text/bgzip_utils.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 96fc75adcff..68d4222ac9c 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -356,6 +356,7 @@ add_library( src/io/text/byte_range_info.cpp src/io/text/data_chunk_source_factories.cpp src/io/text/bgzip_data_chunk_source.cu + src/io/text/bgzip_utils.cpp src/io/text/multibyte_split.cu src/io/utilities/column_buffer.cpp src/io/utilities/config_utils.cpp diff --git a/cpp/src/io/text/bgzip_utils.cpp b/cpp/src/io/text/bgzip_utils.cpp new file mode 100644 index 00000000000..bef60119369 --- /dev/null +++ b/cpp/src/io/text/bgzip_utils.cpp @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "io/text/bgzip_utils.hpp" + +#include +#include + +#include +#include +#include +#include + +#include + +namespace cudf::io::text::detail::bgzip { +namespace { + +template +IntType read_int(char* data) +{ + IntType result{}; + // we assume little-endian + std::memcpy(&result, &data[0], sizeof(result)); + return result; +} + +template +void write_int(std::ostream& output_stream, T val) +{ + std::array bytes; + // we assume little-endian + std::memcpy(&bytes[0], &val, sizeof(T)); + output_stream.write(bytes.data(), bytes.size()); +} + +} // namespace + +header read_header(std::istream& input_stream) +{ + std::array buffer{}; + input_stream.read(buffer.data(), sizeof(buffer)); + std::array const expected_header{{31, 139, 8, 4}}; + CUDF_EXPECTS( + std::equal( + expected_header.begin(), expected_header.end(), reinterpret_cast(buffer.data())), + "malformed BGZIP header"); + // we ignore the remaining bytes of the fixed header, since they don't matter to us + auto const extra_length = read_int(&buffer[10]); + uint16_t extra_offset{}; + // read all the extra subfields + while (extra_offset < extra_length) { + auto const remaining_size = extra_length - extra_offset; + CUDF_EXPECTS(remaining_size >= 4, "invalid extra field length"); + // a subfield consists of 2 identifier bytes and a uint16 length + // 66/67 identifies a BGZIP block size field, we skip all other fields + input_stream.read(buffer.data(), 4); + extra_offset += 4; + auto const subfield_size = read_int(&buffer[2]); + if (buffer[0] == 66 && buffer[1] == 67) { + // the block size subfield contains a single uint16 value, which is block_size - 1 + CUDF_EXPECTS(subfield_size == sizeof(uint16_t), "malformed BGZIP extra subfield"); + input_stream.read(buffer.data(), sizeof(uint16_t)); + input_stream.seekg(remaining_size - 6, std::ios_base::cur); + auto const block_size_minus_one = read_int(&buffer[0]); + return {block_size_minus_one + 1, extra_length}; + } else { + input_stream.seekg(subfield_size, std::ios_base::cur); + extra_offset += subfield_size; + } + } + CUDF_FAIL("missing BGZIP size extra subfield"); +} + +footer read_footer(std::istream& input_stream) +{ + std::array buffer{}; + input_stream.read(buffer.data(), sizeof(buffer)); + return {read_int(&buffer[0]), read_int(&buffer[4])}; +} + +void write_footer(std::ostream& output_stream, host_span data) +{ + // compute crc32 with zlib, this allows checking the generated files with external tools + write_int(output_stream, crc32(0, (unsigned char*)data.data(), data.size())); + write_int(output_stream, data.size()); +} + +void write_header(std::ostream& output_stream, + uint16_t compressed_size, + host_span pre_size_subfield, + host_span post_size_subfield) +{ + auto uint8_to_char = [](uint8_t val) { + char c{}; + std::memcpy(&c, &val, 1); + return c; + }; + std::array const header_data{{ + 31, // magic number + uint8_to_char(139), // magic number + 8, // compression type: deflate + 4, // flags: extra header + 0, // mtime + 0, // mtime + 0, // mtime + 0, // mtime: irrelevant + 4, // xfl: irrelevant + 3 // OS: irrelevant + }}; + output_stream.write(header_data.data(), header_data.size()); + std::array extra_blocklen_field{{66, 67, 2, 0}}; + auto const extra_size = pre_size_subfield.size() + extra_blocklen_field.size() + + sizeof(uint16_t) + post_size_subfield.size(); + auto const block_size = + header_data.size() + sizeof(uint16_t) + extra_size + compressed_size + 2 * sizeof(uint32_t); + write_int(output_stream, extra_size); + output_stream.write(pre_size_subfield.data(), pre_size_subfield.size()); + output_stream.write(extra_blocklen_field.data(), extra_blocklen_field.size()); + CUDF_EXPECTS(block_size - 1 <= std::numeric_limits::max(), "block size overflow"); + write_int(output_stream, block_size - 1); + output_stream.write(post_size_subfield.data(), post_size_subfield.size()); +} + +void write_uncompressed_block(std::ostream& output_stream, + host_span data, + host_span extra_garbage_before, + host_span extra_garbage_after) +{ + CUDF_EXPECTS(data.size() <= std::numeric_limits::max(), "data size overflow"); + write_header(output_stream, data.size() + 5, extra_garbage_before, extra_garbage_after); + write_int(output_stream, 1); + write_int(output_stream, data.size()); + write_int(output_stream, ~static_cast(data.size())); + output_stream.write(data.data(), data.size()); + write_footer(output_stream, data); +} + +void write_compressed_block(std::ostream& output_stream, + host_span data, + host_span extra_garbage_before, + host_span extra_garbage_after) +{ + CUDF_EXPECTS(data.size() <= std::numeric_limits::max(), "data size overflow"); + z_stream deflate_stream{}; + // let's make sure we have enough space to store the data + std::vector compressed_out(data.size() * 2 + 256); + deflate_stream.next_in = (unsigned char*)data.data(); + deflate_stream.avail_in = data.size(); + deflate_stream.next_out = (unsigned char*)compressed_out.data(); + deflate_stream.avail_out = compressed_out.size(); + CUDF_EXPECTS( + deflateInit2(&deflate_stream, // stream + Z_DEFAULT_COMPRESSION, // compression level + Z_DEFLATED, // method + -15, // log2 of window size (negative value means no ZLIB header/footer) + 9, // mem level: best performance/most memory usage for compression + Z_DEFAULT_STRATEGY // strategy + ) == Z_OK, + "deflateInit failed"); + CUDF_EXPECTS(deflate(&deflate_stream, Z_FINISH) == Z_STREAM_END, "deflate failed"); + CUDF_EXPECTS(deflateEnd(&deflate_stream) == Z_OK, "deflateEnd failed"); + write_header(output_stream, deflate_stream.total_out, extra_garbage_before, extra_garbage_after); + output_stream.write(compressed_out.data(), deflate_stream.total_out); + write_footer(output_stream, data); +} + +} // namespace cudf::io::text::detail::bgzip diff --git a/cpp/src/io/text/bgzip_utils.hpp b/cpp/src/io/text/bgzip_utils.hpp index fa2c617416e..a54bf73e9d6 100644 --- a/cpp/src/io/text/bgzip_utils.hpp +++ b/cpp/src/io/text/bgzip_utils.hpp @@ -28,162 +28,85 @@ namespace cudf::io::text::detail::bgzip { -template -static IntType read_int(char* data) -{ - IntType result{}; - // we assume little-endian - std::memcpy(&result, &data[0], sizeof(result)); - return result; -} - -template -void write_int(std::ostream& output_stream, T val) -{ - std::array bytes; - // we assume little-endian - std::memcpy(&bytes[0], &val, sizeof(T)); - output_stream.write(bytes.data(), bytes.size()); -} - struct header { int block_size; int extra_length; [[nodiscard]] int data_size() const { return block_size - extra_length - 20; } }; -header read_header(std::istream& input_stream) -{ - std::array buffer{}; - input_stream.read(buffer.data(), sizeof(buffer)); - std::array const expected_header{{31, 139, 8, 4}}; - CUDF_EXPECTS( - std::equal( - expected_header.begin(), expected_header.end(), reinterpret_cast(buffer.data())), - "malformed BGZIP header"); - // we ignore the remaining bytes of the fixed header, since they don't matter to us - auto const extra_length = read_int(&buffer[10]); - uint16_t extra_offset{}; - // read all the extra subfields - while (extra_offset < extra_length) { - auto const remaining_size = extra_length - extra_offset; - CUDF_EXPECTS(remaining_size >= 4, "invalid extra field length"); - // a subfield consists of 2 identifier bytes and a uint16 length - // 66/67 identifies a BGZIP block size field, we skip all other fields - input_stream.read(buffer.data(), 4); - extra_offset += 4; - auto const subfield_size = read_int(&buffer[2]); - if (buffer[0] == 66 && buffer[1] == 67) { - // the block size subfield contains a single uint16 value, which is block_size - 1 - CUDF_EXPECTS(subfield_size == sizeof(uint16_t), "malformed BGZIP extra subfield"); - input_stream.read(buffer.data(), sizeof(uint16_t)); - input_stream.seekg(remaining_size - 6, std::ios_base::cur); - auto const block_size_minus_one = read_int(&buffer[0]); - return {block_size_minus_one + 1, extra_length}; - } else { - input_stream.seekg(subfield_size, std::ios_base::cur); - extra_offset += subfield_size; - } - } - CUDF_FAIL("missing BGZIP size extra subfield"); -} - struct footer { uint32_t crc; uint32_t decompressed_size; }; -footer read_footer(std::istream& input_stream) -{ - std::array buffer{}; - input_stream.read(buffer.data(), sizeof(buffer)); - return {read_int(&buffer[0]), read_int(&buffer[4])}; -} +/** + * @brief Reads the full BGZIP header from the given input stream. Afterwards, the stream position + * is at the first data byte. + * + * @param input_stream The input stream + * @return The header storing the compressed size and extra subfield length + */ +header read_header(std::istream& input_stream); -void write_footer(std::ostream& output_stream, host_span data) -{ - // compute crc32 with zlib, this allows checking the generated files with external tools - write_int(output_stream, crc32(0, (unsigned char*)data.data(), data.size())); - write_int(output_stream, data.size()); -} +/** + * @brief Reads the full BGZIP footer from the given input stream. Afterwards, the stream position + * is after the last footer byte. + * + * @param input_stream The input stream + * @return The footer storing uncompressed size and CRC32 + */ +footer read_footer(std::istream& input_stream); +/** + * @brief Writes a header for data of the given compressed size to the given stream. + * + * @param output_stream The output stream + * @param compressed_size The size of the compressed data + * @param pre_size_subfields Any GZIP extra subfields (need to be valid) to be placed before the + * BGZIP block size subfield + * @param post_size_subfields Any subfields to be placed after the BGZIP block size subfield + */ void write_header(std::ostream& output_stream, uint16_t compressed_size, - host_span pre_size_subfield, - host_span post_size_subfield) -{ - auto uint8_to_char = [](uint8_t val) { - char c{}; - std::memcpy(&c, &val, 1); - return c; - }; - std::array const header_data{{ - 31, // magic number - uint8_to_char(139), // magic number - 8, // compression type: deflate - 4, // flags: extra header - 0, // mtime - 0, // mtime - 0, // mtime - 0, // mtime: irrelevant - 4, // xfl: irrelevant - 3 // OS: irrelevant - }}; - output_stream.write(header_data.data(), header_data.size()); - std::array extra_blocklen_field{{66, 67, 2, 0}}; - auto const extra_size = pre_size_subfield.size() + extra_blocklen_field.size() + - sizeof(uint16_t) + post_size_subfield.size(); - auto const block_size = - header_data.size() + sizeof(uint16_t) + extra_size + compressed_size + 2 * sizeof(uint32_t); - write_int(output_stream, extra_size); - output_stream.write(pre_size_subfield.data(), pre_size_subfield.size()); - output_stream.write(extra_blocklen_field.data(), extra_blocklen_field.size()); - CUDF_EXPECTS(block_size - 1 <= std::numeric_limits::max(), "block size overflow"); - write_int(output_stream, block_size - 1); - output_stream.write(post_size_subfield.data(), post_size_subfield.size()); -} + host_span pre_size_subfields, + host_span post_size_subfields); + +/** + * @brief Writes a footer for the given uncompressed data to the given stream. + * + * @param output_stream The output stream + * @param data The data for which uncompressed size and CRC32 will be computed and written + */ +void write_footer(std::ostream& output_stream, host_span data); +/** + * @brief Writes the given data to the given stream as an uncompressed deflate block with BZGIP + * header and footer. + * + * @param output_stream The output stream + * @param data The uncompressed data + * @param extra_garbage_before Any GZIP extra subfields (need to be valid) to be placed before the + * BGZIP block size subfield + * @param extra_garbage_after Any subfields to be placed after the BGZIP block size subfield + */ void write_uncompressed_block(std::ostream& output_stream, host_span data, host_span extra_garbage_before = {}, - host_span extra_garbage_after = {}) -{ - CUDF_EXPECTS(data.size() <= std::numeric_limits::max(), "data size overflow"); - write_header(output_stream, data.size() + 5, extra_garbage_before, extra_garbage_after); - write_int(output_stream, 1); - write_int(output_stream, data.size()); - write_int(output_stream, ~static_cast(data.size())); - output_stream.write(data.data(), data.size()); - write_footer(output_stream, data); -} + host_span extra_garbage_after = {}); +/** + * @brief Writes the given data to the given stream as a compressed deflate block with BZGIP + * header and footer. + * + * @param output_stream The output stream + * @param data The uncompressed data + * @param extra_garbage_before Any GZIP extra subfields (need to be valid) to be placed before the + * BGZIP block size subfield + * @param extra_garbage_after Any subfields to be placed after the BGZIP block size subfield + */ void write_compressed_block(std::ostream& output_stream, host_span data, host_span extra_garbage_before = {}, - host_span extra_garbage_after = {}) -{ - CUDF_EXPECTS(data.size() <= std::numeric_limits::max(), "data size overflow"); - z_stream deflate_stream{}; - // let's make sure we have enough space to store the data - std::vector compressed_out(data.size() * 2 + 256); - deflate_stream.next_in = (unsigned char*)data.data(); - deflate_stream.avail_in = data.size(); - deflate_stream.next_out = (unsigned char*)compressed_out.data(); - deflate_stream.avail_out = compressed_out.size(); - CUDF_EXPECTS( - deflateInit2(&deflate_stream, // stream - Z_DEFAULT_COMPRESSION, // compression level - Z_DEFLATED, // method - -15, // log2 of window size (negative value means no ZLIB header/footer) - 9, // mem level: best performance/most memory usage for compression - Z_DEFAULT_STRATEGY // strategy - ) == Z_OK, - "deflateInit failed"); - CUDF_EXPECTS(deflate(&deflate_stream, Z_FINISH) == Z_STREAM_END, "deflate failed"); - CUDF_EXPECTS(deflateEnd(&deflate_stream) == Z_OK, "deflateEnd failed"); - write_header(output_stream, deflate_stream.total_out, extra_garbage_before, extra_garbage_after); - output_stream.write(compressed_out.data(), deflate_stream.total_out); - write_footer(output_stream, data); -} + host_span extra_garbage_after = {}); } // namespace cudf::io::text::detail::bgzip From 5b8d35683e309864559075f8b0cfa6f07dbc2b10 Mon Sep 17 00:00:00 2001 From: Tobias Ribizel Date: Tue, 4 Oct 2022 09:17:52 +0000 Subject: [PATCH 4/7] improve test readability --- cpp/tests/io/text/data_chunk_source_test.cpp | 26 ++++++++++++-------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/cpp/tests/io/text/data_chunk_source_test.cpp b/cpp/tests/io/text/data_chunk_source_test.cpp index 9593cdaa31b..c325dbc8a26 100644 --- a/cpp/tests/io/text/data_chunk_source_test.cpp +++ b/cpp/tests/io/text/data_chunk_source_test.cpp @@ -127,11 +127,15 @@ TEST_F(DataChunkSourceTest, Host) test_source(content, *source); } +enum class compression { ENABLED, DISABLED }; + +enum class eof { ADD_EOF_BLOCK, NO_EOF_BLOCK }; + void write_bgzip(std::ostream& output_stream, - const std::string& data, + cudf::host_span data, std::default_random_engine& rng, - bool compress, - bool write_eof) + compression compress, + eof add_eof) { std::vector const extra_garbage_field1{{13, // magic number 37, // magic number @@ -161,8 +165,8 @@ void write_bgzip(std::ostream& output_stream, 1, 2, 3, 4, 5, 6, 7, 8}}; // make sure the block size with header stays below 65536 std::uniform_int_distribution block_size_dist{1, 65000}; - auto begin = data.data(); - auto const end = data.data() + data.size(); + auto begin = data.begin(); + auto const end = data.end(); int i = 0; while (begin < end) { using cudf::host_span; @@ -171,7 +175,7 @@ void write_bgzip(std::ostream& output_stream, i & 1 ? extra_garbage_field1 : host_span{}; host_span const garbage_after = i & 2 ? extra_garbage_field2 : host_span{}; - if (compress) { + if (compress == compression::ENABLED) { cudf::io::text::detail::bgzip::write_compressed_block( output_stream, {begin, len}, garbage_before, garbage_after); } else { @@ -181,7 +185,9 @@ void write_bgzip(std::ostream& output_stream, begin += len; i++; } - if (write_eof) { cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, {}); } + if (add_eof == eof::ADD_EOF_BLOCK) { + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, {}); + } } TEST_F(DataChunkSourceTest, BgzipSource) @@ -194,7 +200,7 @@ TEST_F(DataChunkSourceTest, BgzipSource) { std::ofstream output_stream{filename}; std::default_random_engine rng{}; - write_bgzip(output_stream, input, rng, false, true); + write_bgzip(output_stream, input, rng, compression::DISABLED, eof::ADD_EOF_BLOCK); } auto const source = cudf::io::text::make_source_from_bgzip_file(filename); @@ -227,7 +233,7 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsets) begin_compressed_offset = output_stream.tellp(); cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, data_garbage + begininput); - write_bgzip(output_stream, input, rng, false, false); + write_bgzip(output_stream, input, rng, compression::DISABLED, eof::NO_EOF_BLOCK); end_compressed_offset = output_stream.tellp(); cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, endinput + data_garbage + data_garbage); @@ -321,7 +327,7 @@ TEST_F(DataChunkSourceTest, BgzipCompressedSourceVirtualOffsets) std::default_random_engine rng{}; begin_compressed_offset = output_stream.tellp(); cudf::io::text::detail::bgzip::write_compressed_block(output_stream, data_garbage + begininput); - write_bgzip(output_stream, input, rng, true, false); + write_bgzip(output_stream, input, rng, compression::ENABLED, eof::NO_EOF_BLOCK); end_compressed_offset = output_stream.tellp(); cudf::io::text::detail::bgzip::write_compressed_block(output_stream, endinput + data_garbage + data_garbage); From d176cee82340c53b6bf31ee3e72625c5dd0fc74f Mon Sep 17 00:00:00 2001 From: Tobias Ribizel Date: Tue, 4 Oct 2022 09:18:13 +0000 Subject: [PATCH 5/7] use enum_type for source_type --- cpp/benchmarks/io/text/multibyte_split.cpp | 39 +++++++++++++++------- 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/cpp/benchmarks/io/text/multibyte_split.cpp b/cpp/benchmarks/io/text/multibyte_split.cpp index 3a2c1abfdb6..b0a198ca9e7 100644 --- a/cpp/benchmarks/io/text/multibyte_split.cpp +++ b/cpp/benchmarks/io/text/multibyte_split.cpp @@ -48,6 +48,20 @@ temp_directory const temp_dir("cudf_nvbench"); enum class data_chunk_source_type { device, file, host, host_pinned, file_bgzip }; +NVBENCH_DECLARE_ENUM_TYPE_STRINGS( + data_chunk_source_type, + [](auto value) { + switch (value) { + case data_chunk_source_type::device: return "device"; + case data_chunk_source_type::file: return "file"; + case data_chunk_source_type::host: return "host"; + case data_chunk_source_type::host_pinned: return "host_pinned"; + case data_chunk_source_type::file_bgzip: return "file_bgzip"; + default: return "Unknown"; + } + }, + [](auto) { return std::string{}; }) + static cudf::string_scalar create_random_input(int32_t num_chars, double delim_factor, double deviation, @@ -98,14 +112,15 @@ static void write_bgzip_file(cudf::host_span host_data, std::ostream cudf::io::text::detail::bgzip::write_uncompressed_block(stream, {}); } -static void bench_multibyte_split(nvbench::state& state) +template +static void bench_multibyte_split(nvbench::state& state, + nvbench::type_list>) { cudf::rmm_pool_raii pool_raii; - auto const source_type = static_cast(state.get_int64("source_type")); - auto const delim_size = state.get_int64("delim_size"); - auto const delim_percent = state.get_int64("delim_percent"); - auto const file_size_approx = state.get_int64("size_approx"); + auto const delim_size = state.get_int64("delim_size"); + auto const delim_percent = state.get_int64("delim_percent"); + auto const file_size_approx = state.get_int64("size_approx"); auto const byte_range_percent = state.get_int64("byte_range_percent"); auto const byte_range_factor = static_cast(byte_range_percent) / 100; @@ -181,14 +196,14 @@ static void bench_multibyte_split(nvbench::state& state) state.add_buffer_size(range_size, "efs", "Encoded file size"); } -NVBENCH_BENCH(bench_multibyte_split) +using source_type_list = nvbench::enum_type_list; + +NVBENCH_BENCH_TYPES(bench_multibyte_split, NVBENCH_TYPE_AXES(source_type_list)) .set_name("multibyte_split") - .add_int64_axis("source_type", - {static_cast(data_chunk_source_type::device), - static_cast(data_chunk_source_type::file), - static_cast(data_chunk_source_type::host), - static_cast(data_chunk_source_type::host_pinned), - static_cast(data_chunk_source_type::file_bgzip)}) .add_int64_axis("delim_size", {1, 4, 7}) .add_int64_axis("delim_percent", {1, 25}) .add_int64_power_of_two_axis("size_approx", {15, 30}) From 8632e2711ae7ddfe15a0f1deccf840ee731a952f Mon Sep 17 00:00:00 2001 From: Tobias Ribizel Date: Wed, 5 Oct 2022 15:46:46 +0000 Subject: [PATCH 6/7] review updates * move header to include * more constexpr * rename ostreams to output_stream * fewer magic numbers --- conda/recipes/libcudf/meta.yaml | 1 + cpp/benchmarks/io/text/multibyte_split.cpp | 13 +++-- .../cudf/io/text/detail}/bgzip_utils.hpp | 0 cpp/src/io/text/bgzip_data_chunk_source.cu | 2 +- cpp/src/io/text/bgzip_utils.cpp | 52 +++++++++---------- cpp/tests/io/text/data_chunk_source_test.cpp | 3 +- 6 files changed, 34 insertions(+), 37 deletions(-) rename cpp/{src/io/text => include/cudf/io/text/detail}/bgzip_utils.hpp (100%) diff --git a/conda/recipes/libcudf/meta.yaml b/conda/recipes/libcudf/meta.yaml index a417b407044..ccb0d685062 100644 --- a/conda/recipes/libcudf/meta.yaml +++ b/conda/recipes/libcudf/meta.yaml @@ -152,6 +152,7 @@ outputs: - test -f $PREFIX/include/cudf/io/text/byte_range_info.hpp - test -f $PREFIX/include/cudf/io/text/data_chunk_source.hpp - test -f $PREFIX/include/cudf/io/text/data_chunk_source_factories.hpp + - test -f $PREFIX/include/cudf/io/text/detail/bgzip_utils.hpp - test -f $PREFIX/include/cudf/io/text/detail/multistate.hpp - test -f $PREFIX/include/cudf/io/text/detail/tile_state.hpp - test -f $PREFIX/include/cudf/io/text/detail/trie.hpp diff --git a/cpp/benchmarks/io/text/multibyte_split.cpp b/cpp/benchmarks/io/text/multibyte_split.cpp index b0a198ca9e7..b7e85d8aa7e 100644 --- a/cpp/benchmarks/io/text/multibyte_split.cpp +++ b/cpp/benchmarks/io/text/multibyte_split.cpp @@ -14,8 +14,6 @@ * limitations under the License. */ -#include "io/text/bgzip_utils.hpp" - #include #include #include @@ -27,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -95,7 +94,7 @@ static cudf::string_scalar create_random_input(int32_t num_chars, return cudf::string_scalar(std::move(*chars_buffer)); } -static void write_bgzip_file(cudf::host_span host_data, std::ostream& stream) +static void write_bgzip_file(cudf::host_span host_data, std::ostream& output_stream) { // a bit of variability with a decent amount of padding so we don't overflow 16 bit block sizes std::uniform_int_distribution chunk_size_dist{64000, 65000}; @@ -104,12 +103,12 @@ static void write_bgzip_file(cudf::host_span host_data, std::ostream while (pos < host_data.size()) { auto const remainder = host_data.size() - pos; auto const chunk_size = std::min(remainder, chunk_size_dist(rng)); - cudf::io::text::detail::bgzip::write_compressed_block(stream, + cudf::io::text::detail::bgzip::write_compressed_block(output_stream, {host_data.data() + pos, chunk_size}); pos += chunk_size; } // empty block denotes EOF - cudf::io::text::detail::bgzip::write_uncompressed_block(stream, {}); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, {}); } template @@ -170,8 +169,8 @@ static void bench_multibyte_split(nvbench::state& state, case data_chunk_source_type::file_bgzip: { auto const temp_file_name = random_file_in_dir(temp_dir.path()); { - std::ofstream stream(temp_file_name, std::ofstream::out); - write_bgzip_file(host_input, stream); + std::ofstream output_stream(temp_file_name, std::ofstream::out); + write_bgzip_file(host_input, output_stream); } return cudf::io::text::make_source_from_bgzip_file(temp_file_name); } diff --git a/cpp/src/io/text/bgzip_utils.hpp b/cpp/include/cudf/io/text/detail/bgzip_utils.hpp similarity index 100% rename from cpp/src/io/text/bgzip_utils.hpp rename to cpp/include/cudf/io/text/detail/bgzip_utils.hpp diff --git a/cpp/src/io/text/bgzip_data_chunk_source.cu b/cpp/src/io/text/bgzip_data_chunk_source.cu index ab1f4287b8a..9c4ff218783 100644 --- a/cpp/src/io/text/bgzip_data_chunk_source.cu +++ b/cpp/src/io/text/bgzip_data_chunk_source.cu @@ -15,12 +15,12 @@ */ #include "io/comp/nvcomp_adapter.hpp" -#include "io/text/bgzip_utils.hpp" #include "io/text/device_data_chunks.hpp" #include "io/utilities/config_utils.hpp" #include #include +#include #include #include diff --git a/cpp/src/io/text/bgzip_utils.cpp b/cpp/src/io/text/bgzip_utils.cpp index bef60119369..6047f74b633 100644 --- a/cpp/src/io/text/bgzip_utils.cpp +++ b/cpp/src/io/text/bgzip_utils.cpp @@ -14,8 +14,7 @@ * limitations under the License. */ -#include "io/text/bgzip_utils.hpp" - +#include #include #include @@ -49,11 +48,13 @@ void write_int(std::ostream& output_stream, T val) } // namespace +std::array constexpr extra_blocklen_field_header{{66, 67, 2, 0}}; + header read_header(std::istream& input_stream) { std::array buffer{}; input_stream.read(buffer.data(), sizeof(buffer)); - std::array const expected_header{{31, 139, 8, 4}}; + std::array constexpr expected_header{{31, 139, 8, 4}}; CUDF_EXPECTS( std::equal( expected_header.begin(), expected_header.end(), reinterpret_cast(buffer.data())), @@ -70,9 +71,12 @@ header read_header(std::istream& input_stream) input_stream.read(buffer.data(), 4); extra_offset += 4; auto const subfield_size = read_int(&buffer[2]); - if (buffer[0] == 66 && buffer[1] == 67) { + if (buffer[0] == extra_blocklen_field_header[0] && + buffer[1] == extra_blocklen_field_header[1]) { // the block size subfield contains a single uint16 value, which is block_size - 1 - CUDF_EXPECTS(subfield_size == sizeof(uint16_t), "malformed BGZIP extra subfield"); + CUDF_EXPECTS( + buffer[2] == extra_blocklen_field_header[2] && buffer[3] == extra_blocklen_field_header[3], + "malformed BGZIP extra subfield"); input_stream.read(buffer.data(), sizeof(uint16_t)); input_stream.seekg(remaining_size - 6, std::ios_base::cur); auto const block_size_minus_one = read_int(&buffer[0]); @@ -104,32 +108,26 @@ void write_header(std::ostream& output_stream, host_span pre_size_subfield, host_span post_size_subfield) { - auto uint8_to_char = [](uint8_t val) { - char c{}; - std::memcpy(&c, &val, 1); - return c; - }; - std::array const header_data{{ - 31, // magic number - uint8_to_char(139), // magic number - 8, // compression type: deflate - 4, // flags: extra header - 0, // mtime - 0, // mtime - 0, // mtime - 0, // mtime: irrelevant - 4, // xfl: irrelevant - 3 // OS: irrelevant + std::array constexpr header_data{{ + 31, // magic number + 139, // magic number + 8, // compression type: deflate + 4, // flags: extra header + 0, // mtime + 0, // mtime + 0, // mtime + 0, // mtime: irrelevant + 4, // xfl: irrelevant + 3 // OS: irrelevant }}; - output_stream.write(header_data.data(), header_data.size()); - std::array extra_blocklen_field{{66, 67, 2, 0}}; - auto const extra_size = pre_size_subfield.size() + extra_blocklen_field.size() + + output_stream.write(reinterpret_cast(header_data.data()), header_data.size()); + auto const extra_size = pre_size_subfield.size() + extra_blocklen_field_header.size() + sizeof(uint16_t) + post_size_subfield.size(); auto const block_size = header_data.size() + sizeof(uint16_t) + extra_size + compressed_size + 2 * sizeof(uint32_t); write_int(output_stream, extra_size); output_stream.write(pre_size_subfield.data(), pre_size_subfield.size()); - output_stream.write(extra_blocklen_field.data(), extra_blocklen_field.size()); + output_stream.write(extra_blocklen_field_header.data(), extra_blocklen_field_header.size()); CUDF_EXPECTS(block_size - 1 <= std::numeric_limits::max(), "block size overflow"); write_int(output_stream, block_size - 1); output_stream.write(post_size_subfield.data(), post_size_subfield.size()); @@ -158,9 +156,9 @@ void write_compressed_block(std::ostream& output_stream, z_stream deflate_stream{}; // let's make sure we have enough space to store the data std::vector compressed_out(data.size() * 2 + 256); - deflate_stream.next_in = (unsigned char*)data.data(); + deflate_stream.next_in = reinterpret_cast(const_cast(data.data())); deflate_stream.avail_in = data.size(); - deflate_stream.next_out = (unsigned char*)compressed_out.data(); + deflate_stream.next_out = reinterpret_cast(compressed_out.data()); deflate_stream.avail_out = compressed_out.size(); CUDF_EXPECTS( deflateInit2(&deflate_stream, // stream diff --git a/cpp/tests/io/text/data_chunk_source_test.cpp b/cpp/tests/io/text/data_chunk_source_test.cpp index c325dbc8a26..28a58d37a91 100644 --- a/cpp/tests/io/text/data_chunk_source_test.cpp +++ b/cpp/tests/io/text/data_chunk_source_test.cpp @@ -17,9 +17,8 @@ #include #include -#include "io/text/bgzip_utils.hpp" - #include +#include #include From f6b1d26ab334d657121e7d684233d357e4d9c44e Mon Sep 17 00:00:00 2001 From: Tobias Ribizel Date: Sun, 9 Oct 2022 08:29:58 +0000 Subject: [PATCH 7/7] improve header placement, rename parameters --- .../cudf/io/text/detail/bgzip_utils.hpp | 24 ++++---- cpp/src/io/text/bgzip_utils.cpp | 16 +++--- cpp/tests/io/text/data_chunk_source_test.cpp | 56 +++++++++---------- 3 files changed, 48 insertions(+), 48 deletions(-) diff --git a/cpp/include/cudf/io/text/detail/bgzip_utils.hpp b/cpp/include/cudf/io/text/detail/bgzip_utils.hpp index a54bf73e9d6..627df5f358a 100644 --- a/cpp/include/cudf/io/text/detail/bgzip_utils.hpp +++ b/cpp/include/cudf/io/text/detail/bgzip_utils.hpp @@ -16,6 +16,8 @@ #pragma once +#include + #include #include @@ -24,8 +26,6 @@ #include #include -#include - namespace cudf::io::text::detail::bgzip { struct header { @@ -85,14 +85,14 @@ void write_footer(std::ostream& output_stream, host_span data); * * @param output_stream The output stream * @param data The uncompressed data - * @param extra_garbage_before Any GZIP extra subfields (need to be valid) to be placed before the - * BGZIP block size subfield - * @param extra_garbage_after Any subfields to be placed after the BGZIP block size subfield + * @param pre_size_subfields Any GZIP extra subfields (need to be valid) to be placed before the + * BGZIP block size subfield + * @param post_size_subfields Any subfields to be placed after the BGZIP block size subfield */ void write_uncompressed_block(std::ostream& output_stream, host_span data, - host_span extra_garbage_before = {}, - host_span extra_garbage_after = {}); + host_span pre_size_subfields = {}, + host_span post_size_subfields = {}); /** * @brief Writes the given data to the given stream as a compressed deflate block with BZGIP @@ -100,13 +100,13 @@ void write_uncompressed_block(std::ostream& output_stream, * * @param output_stream The output stream * @param data The uncompressed data - * @param extra_garbage_before Any GZIP extra subfields (need to be valid) to be placed before the - * BGZIP block size subfield - * @param extra_garbage_after Any subfields to be placed after the BGZIP block size subfield + * @param pre_size_subfields Any GZIP extra subfields (need to be valid) to be placed before the + * BGZIP block size subfield + * @param post_size_subfields Any subfields to be placed after the BGZIP block size subfield */ void write_compressed_block(std::ostream& output_stream, host_span data, - host_span extra_garbage_before = {}, - host_span extra_garbage_after = {}); + host_span pre_size_subfields = {}, + host_span post_size_subfields = {}); } // namespace cudf::io::text::detail::bgzip diff --git a/cpp/src/io/text/bgzip_utils.cpp b/cpp/src/io/text/bgzip_utils.cpp index 6047f74b633..dd08387a6b5 100644 --- a/cpp/src/io/text/bgzip_utils.cpp +++ b/cpp/src/io/text/bgzip_utils.cpp @@ -14,6 +14,8 @@ * limitations under the License. */ +#include + #include #include #include @@ -23,8 +25,6 @@ #include #include -#include - namespace cudf::io::text::detail::bgzip { namespace { @@ -135,11 +135,11 @@ void write_header(std::ostream& output_stream, void write_uncompressed_block(std::ostream& output_stream, host_span data, - host_span extra_garbage_before, - host_span extra_garbage_after) + host_span pre_size_subfields, + host_span post_size_subfields) { CUDF_EXPECTS(data.size() <= std::numeric_limits::max(), "data size overflow"); - write_header(output_stream, data.size() + 5, extra_garbage_before, extra_garbage_after); + write_header(output_stream, data.size() + 5, pre_size_subfields, post_size_subfields); write_int(output_stream, 1); write_int(output_stream, data.size()); write_int(output_stream, ~static_cast(data.size())); @@ -149,8 +149,8 @@ void write_uncompressed_block(std::ostream& output_stream, void write_compressed_block(std::ostream& output_stream, host_span data, - host_span extra_garbage_before, - host_span extra_garbage_after) + host_span pre_size_subfields, + host_span post_size_subfields) { CUDF_EXPECTS(data.size() <= std::numeric_limits::max(), "data size overflow"); z_stream deflate_stream{}; @@ -171,7 +171,7 @@ void write_compressed_block(std::ostream& output_stream, "deflateInit failed"); CUDF_EXPECTS(deflate(&deflate_stream, Z_FINISH) == Z_STREAM_END, "deflate failed"); CUDF_EXPECTS(deflateEnd(&deflate_stream) == Z_OK, "deflateEnd failed"); - write_header(output_stream, deflate_stream.total_out, extra_garbage_before, extra_garbage_after); + write_header(output_stream, deflate_stream.total_out, pre_size_subfields, post_size_subfields); output_stream.write(compressed_out.data(), deflate_stream.total_out); write_footer(output_stream, data); } diff --git a/cpp/tests/io/text/data_chunk_source_test.cpp b/cpp/tests/io/text/data_chunk_source_test.cpp index 28a58d37a91..7cb75aea8e2 100644 --- a/cpp/tests/io/text/data_chunk_source_test.cpp +++ b/cpp/tests/io/text/data_chunk_source_test.cpp @@ -136,32 +136,32 @@ void write_bgzip(std::ostream& output_stream, compression compress, eof add_eof) { - std::vector const extra_garbage_field1{{13, // magic number - 37, // magic number - 7, // field length - 0, // field length - 1, - 2, - 3, - 4, - 5, - 6, - 7}}; - std::vector const extra_garbage_field2{{12, // magic number - 34, // magic number - 2, // field length - 0, // field length - 1, 2, - 56, // magic number - 78, // magic number - 1, // field length - 0, // field length - 3, // - 90, // magic number - 12, // magic number - 8, // field length - 0, // field length - 1, 2, 3, 4, 5, 6, 7, 8}}; + std::vector const extra_garbage_fields1{{13, // magic number + 37, // magic number + 7, // field length + 0, // field length + 1, + 2, + 3, + 4, + 5, + 6, + 7}}; + std::vector const extra_garbage_fields2{{12, // magic number + 34, // magic number + 2, // field length + 0, // field length + 1, 2, + 56, // magic number + 78, // magic number + 1, // field length + 0, // field length + 3, // + 90, // magic number + 12, // magic number + 8, // field length + 0, // field length + 1, 2, 3, 4, 5, 6, 7, 8}}; // make sure the block size with header stays below 65536 std::uniform_int_distribution block_size_dist{1, 65000}; auto begin = data.begin(); @@ -171,9 +171,9 @@ void write_bgzip(std::ostream& output_stream, using cudf::host_span; auto len = std::min(end - begin, block_size_dist(rng)); host_span const garbage_before = - i & 1 ? extra_garbage_field1 : host_span{}; + i & 1 ? extra_garbage_fields1 : host_span{}; host_span const garbage_after = - i & 2 ? extra_garbage_field2 : host_span{}; + i & 2 ? extra_garbage_fields2 : host_span{}; if (compress == compression::ENABLED) { cudf::io::text::detail::bgzip::write_compressed_block( output_stream, {begin, len}, garbage_before, garbage_after);