diff --git a/conda/recipes/libcudf/meta.yaml b/conda/recipes/libcudf/meta.yaml index a417b407044..ccb0d685062 100644 --- a/conda/recipes/libcudf/meta.yaml +++ b/conda/recipes/libcudf/meta.yaml @@ -152,6 +152,7 @@ outputs: - test -f $PREFIX/include/cudf/io/text/byte_range_info.hpp - test -f $PREFIX/include/cudf/io/text/data_chunk_source.hpp - test -f $PREFIX/include/cudf/io/text/data_chunk_source_factories.hpp + - test -f $PREFIX/include/cudf/io/text/detail/bgzip_utils.hpp - test -f $PREFIX/include/cudf/io/text/detail/multistate.hpp - test -f $PREFIX/include/cudf/io/text/detail/tile_state.hpp - test -f $PREFIX/include/cudf/io/text/detail/trie.hpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 60e914f07d3..8bde0bcfb9b 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -356,6 +356,7 @@ add_library( src/io/text/byte_range_info.cpp src/io/text/data_chunk_source_factories.cpp src/io/text/bgzip_data_chunk_source.cu + src/io/text/bgzip_utils.cpp src/io/text/multibyte_split.cu src/io/utilities/column_buffer.cpp src/io/utilities/config_utils.cpp diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index d1ff177a25e..f35d0b0b49e 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -301,7 +301,8 @@ ConfigureNVBench(NESTED_JSON_NVBENCH io/json/nested_json.cpp) # ################################################################################################## # * io benchmark --------------------------------------------------------------------- -ConfigureNVBench(MULTIBYTE_SPLIT_BENCHMARK io/text/multibyte_split.cpp) +ConfigureNVBench(MULTIBYTE_SPLIT_NVBENCH io/text/multibyte_split.cpp) +target_link_libraries(MULTIBYTE_SPLIT_NVBENCH PRIVATE ZLIB::ZLIB) add_custom_target( run_benchmarks diff --git a/cpp/benchmarks/io/text/multibyte_split.cpp b/cpp/benchmarks/io/text/multibyte_split.cpp index 4865d11ae8b..b7e85d8aa7e 100644 --- a/cpp/benchmarks/io/text/multibyte_split.cpp +++ b/cpp/benchmarks/io/text/multibyte_split.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -40,10 +41,25 @@ #include #include #include +#include temp_directory const temp_dir("cudf_nvbench"); -enum class data_chunk_source_type { device, file, host, host_pinned }; +enum class data_chunk_source_type { device, file, host, host_pinned, file_bgzip }; + +NVBENCH_DECLARE_ENUM_TYPE_STRINGS( + data_chunk_source_type, + [](auto value) { + switch (value) { + case data_chunk_source_type::device: return "device"; + case data_chunk_source_type::file: return "file"; + case data_chunk_source_type::host: return "host"; + case data_chunk_source_type::host_pinned: return "host_pinned"; + case data_chunk_source_type::file_bgzip: return "file_bgzip"; + default: return "Unknown"; + } + }, + [](auto) { return std::string{}; }) static cudf::string_scalar create_random_input(int32_t num_chars, double delim_factor, @@ -78,14 +94,32 @@ static cudf::string_scalar create_random_input(int32_t num_chars, return cudf::string_scalar(std::move(*chars_buffer)); } -static void bench_multibyte_split(nvbench::state& state) +static void write_bgzip_file(cudf::host_span host_data, std::ostream& output_stream) +{ + // a bit of variability with a decent amount of padding so we don't overflow 16 bit block sizes + std::uniform_int_distribution chunk_size_dist{64000, 65000}; + std::default_random_engine rng{}; + std::size_t pos = 0; + while (pos < host_data.size()) { + auto const remainder = host_data.size() - pos; + auto const chunk_size = std::min(remainder, chunk_size_dist(rng)); + cudf::io::text::detail::bgzip::write_compressed_block(output_stream, + {host_data.data() + pos, chunk_size}); + pos += chunk_size; + } + // empty block denotes EOF + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, {}); +} + +template +static void bench_multibyte_split(nvbench::state& state, + nvbench::type_list>) { cudf::rmm_pool_raii pool_raii; - auto const source_type = static_cast(state.get_int64("source_type")); - auto const delim_size = state.get_int64("delim_size"); - auto const delim_percent = state.get_int64("delim_percent"); - auto const file_size_approx = state.get_int64("size_approx"); + auto const delim_size = state.get_int64("delim_size"); + auto const delim_percent = state.get_int64("delim_percent"); + auto const file_size_approx = state.get_int64("size_approx"); auto const byte_range_percent = state.get_int64("byte_range_percent"); auto const byte_range_factor = static_cast(byte_range_percent) / 100; @@ -104,7 +138,8 @@ static void bench_multibyte_split(nvbench::state& state) auto host_pinned_input = thrust::host_vector>{}; - if (source_type == data_chunk_source_type::host || source_type == data_chunk_source_type::file) { + if (source_type == data_chunk_source_type::host || source_type == data_chunk_source_type::file || + source_type == data_chunk_source_type::file_bgzip) { host_input = cudf::detail::make_std_vector_sync( {device_input.data(), static_cast(device_input.size())}, cudf::default_stream_value); @@ -131,6 +166,14 @@ static void bench_multibyte_split(nvbench::state& state) return cudf::io::text::make_source(host_pinned_input); case data_chunk_source_type::device: // return cudf::io::text::make_source(device_input); + case data_chunk_source_type::file_bgzip: { + auto const temp_file_name = random_file_in_dir(temp_dir.path()); + { + std::ofstream output_stream(temp_file_name, std::ofstream::out); + write_bgzip_file(host_input, output_stream); + } + return cudf::io::text::make_source_from_bgzip_file(temp_file_name); + } default: CUDF_FAIL(); } }(); @@ -152,13 +195,14 @@ static void bench_multibyte_split(nvbench::state& state) state.add_buffer_size(range_size, "efs", "Encoded file size"); } -NVBENCH_BENCH(bench_multibyte_split) +using source_type_list = nvbench::enum_type_list; + +NVBENCH_BENCH_TYPES(bench_multibyte_split, NVBENCH_TYPE_AXES(source_type_list)) .set_name("multibyte_split") - .add_int64_axis("source_type", - {static_cast(data_chunk_source_type::device), - static_cast(data_chunk_source_type::file), - static_cast(data_chunk_source_type::host), - static_cast(data_chunk_source_type::host_pinned)}) .add_int64_axis("delim_size", {1, 4, 7}) .add_int64_axis("delim_percent", {1, 25}) .add_int64_power_of_two_axis("size_approx", {15, 30}) diff --git a/cpp/include/cudf/io/text/detail/bgzip_utils.hpp b/cpp/include/cudf/io/text/detail/bgzip_utils.hpp new file mode 100644 index 00000000000..627df5f358a --- /dev/null +++ b/cpp/include/cudf/io/text/detail/bgzip_utils.hpp @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include + +#include +#include +#include +#include + +namespace cudf::io::text::detail::bgzip { + +struct header { + int block_size; + int extra_length; + [[nodiscard]] int data_size() const { return block_size - extra_length - 20; } +}; + +struct footer { + uint32_t crc; + uint32_t decompressed_size; +}; + +/** + * @brief Reads the full BGZIP header from the given input stream. Afterwards, the stream position + * is at the first data byte. + * + * @param input_stream The input stream + * @return The header storing the compressed size and extra subfield length + */ +header read_header(std::istream& input_stream); + +/** + * @brief Reads the full BGZIP footer from the given input stream. Afterwards, the stream position + * is after the last footer byte. + * + * @param input_stream The input stream + * @return The footer storing uncompressed size and CRC32 + */ +footer read_footer(std::istream& input_stream); + +/** + * @brief Writes a header for data of the given compressed size to the given stream. + * + * @param output_stream The output stream + * @param compressed_size The size of the compressed data + * @param pre_size_subfields Any GZIP extra subfields (need to be valid) to be placed before the + * BGZIP block size subfield + * @param post_size_subfields Any subfields to be placed after the BGZIP block size subfield + */ +void write_header(std::ostream& output_stream, + uint16_t compressed_size, + host_span pre_size_subfields, + host_span post_size_subfields); + +/** + * @brief Writes a footer for the given uncompressed data to the given stream. + * + * @param output_stream The output stream + * @param data The data for which uncompressed size and CRC32 will be computed and written + */ +void write_footer(std::ostream& output_stream, host_span data); + +/** + * @brief Writes the given data to the given stream as an uncompressed deflate block with BZGIP + * header and footer. + * + * @param output_stream The output stream + * @param data The uncompressed data + * @param pre_size_subfields Any GZIP extra subfields (need to be valid) to be placed before the + * BGZIP block size subfield + * @param post_size_subfields Any subfields to be placed after the BGZIP block size subfield + */ +void write_uncompressed_block(std::ostream& output_stream, + host_span data, + host_span pre_size_subfields = {}, + host_span post_size_subfields = {}); + +/** + * @brief Writes the given data to the given stream as a compressed deflate block with BZGIP + * header and footer. + * + * @param output_stream The output stream + * @param data The uncompressed data + * @param pre_size_subfields Any GZIP extra subfields (need to be valid) to be placed before the + * BGZIP block size subfield + * @param post_size_subfields Any subfields to be placed after the BGZIP block size subfield + */ +void write_compressed_block(std::ostream& output_stream, + host_span data, + host_span pre_size_subfields = {}, + host_span post_size_subfields = {}); + +} // namespace cudf::io::text::detail::bgzip diff --git a/cpp/src/io/text/bgzip_data_chunk_source.cu b/cpp/src/io/text/bgzip_data_chunk_source.cu index 7715c2ca7e1..9c4ff218783 100644 --- a/cpp/src/io/text/bgzip_data_chunk_source.cu +++ b/cpp/src/io/text/bgzip_data_chunk_source.cu @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -36,7 +37,6 @@ #include namespace cudf::io::text { - namespace { /** @@ -64,68 +64,6 @@ struct bgzip_nvcomp_transform_functor { class bgzip_data_chunk_reader : public data_chunk_reader { private: - template - static IntType read_int(char* data) - { - IntType result{}; - // we assume little-endian - std::memcpy(&result, &data[0], sizeof(result)); - return result; - } - - struct bgzip_header { - int block_size; - int extra_length; - [[nodiscard]] int data_size() const { return block_size - extra_length - 20; } - }; - - bgzip_header read_header() - { - std::array buffer{}; - _data_stream->read(buffer.data(), sizeof(buffer)); - std::array const expected_header{{31, 139, 8, 4}}; - CUDF_EXPECTS( - std::equal( - expected_header.begin(), expected_header.end(), reinterpret_cast(buffer.data())), - "malformed BGZIP header"); - // we ignore the remaining bytes of the fixed header, since they don't matter to us - auto const extra_length = read_int(&buffer[10]); - uint16_t extra_offset{}; - // read all the extra subfields - while (extra_offset < extra_length) { - auto const remaining_size = extra_length - extra_offset; - CUDF_EXPECTS(remaining_size >= 4, "invalid extra field length"); - // a subfield consists of 2 identifier bytes and a uint16 length - // 66/67 identifies a BGZIP block size field, we skip all other fields - _data_stream->read(buffer.data(), 4); - extra_offset += 4; - auto const subfield_size = read_int(&buffer[2]); - if (buffer[0] == 66 && buffer[1] == 67) { - // the block size subfield contains a single uint16 value, which is block_size - 1 - CUDF_EXPECTS(subfield_size == sizeof(uint16_t), "malformed BGZIP extra subfield"); - _data_stream->read(buffer.data(), sizeof(uint16_t)); - _data_stream->seekg(remaining_size - 6, std::ios_base::cur); - auto const block_size_minus_one = read_int(&buffer[0]); - return {block_size_minus_one + 1, extra_length}; - } else { - _data_stream->seekg(subfield_size, std::ios_base::cur); - extra_offset += subfield_size; - } - } - CUDF_FAIL("missing BGZIP size extra subfield"); - } - - struct bgzip_footer { - uint32_t decompressed_size; - }; - - bgzip_footer read_footer() - { - std::array buffer{}; - _data_stream->read(buffer.data(), sizeof(buffer)); - return {read_int(&buffer[4])}; - } - template using pinned_host_vector = thrust::host_vector>; @@ -258,13 +196,13 @@ class bgzip_data_chunk_reader : public data_chunk_reader { return available_decompressed_size - read_pos; } - void read_block(bgzip_header header, std::istream& stream) + void read_block(detail::bgzip::header header, std::istream& stream) { h_compressed_blocks.resize(h_compressed_blocks.size() + header.data_size()); stream.read(h_compressed_blocks.data() + compressed_size(), header.data_size()); } - void add_block_offsets(bgzip_header header, bgzip_footer footer) + void add_block_offsets(detail::bgzip::header header, detail::bgzip::footer footer) { max_decompressed_size = std::max(footer.decompressed_size, max_decompressed_size); @@ -294,9 +232,9 @@ class bgzip_data_chunk_reader : public data_chunk_reader { // peek is necessary if we are already at the end, but didn't try to read another byte _data_stream->peek(); if (_data_stream->eof() || _compressed_pos > _compressed_end) { break; } - auto header = read_header(); + auto header = detail::bgzip::read_header(*_data_stream); _curr_blocks.read_block(header, *_data_stream); - auto footer = read_footer(); + auto footer = detail::bgzip::read_footer(*_data_stream); _curr_blocks.add_block_offsets(header, footer); // for the last GZIP block, we restrict ourselves to the bytes up to _local_end // but only for the reader, not for decompression! diff --git a/cpp/src/io/text/bgzip_utils.cpp b/cpp/src/io/text/bgzip_utils.cpp new file mode 100644 index 00000000000..dd08387a6b5 --- /dev/null +++ b/cpp/src/io/text/bgzip_utils.cpp @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include + +#include +#include +#include +#include + +namespace cudf::io::text::detail::bgzip { +namespace { + +template +IntType read_int(char* data) +{ + IntType result{}; + // we assume little-endian + std::memcpy(&result, &data[0], sizeof(result)); + return result; +} + +template +void write_int(std::ostream& output_stream, T val) +{ + std::array bytes; + // we assume little-endian + std::memcpy(&bytes[0], &val, sizeof(T)); + output_stream.write(bytes.data(), bytes.size()); +} + +} // namespace + +std::array constexpr extra_blocklen_field_header{{66, 67, 2, 0}}; + +header read_header(std::istream& input_stream) +{ + std::array buffer{}; + input_stream.read(buffer.data(), sizeof(buffer)); + std::array constexpr expected_header{{31, 139, 8, 4}}; + CUDF_EXPECTS( + std::equal( + expected_header.begin(), expected_header.end(), reinterpret_cast(buffer.data())), + "malformed BGZIP header"); + // we ignore the remaining bytes of the fixed header, since they don't matter to us + auto const extra_length = read_int(&buffer[10]); + uint16_t extra_offset{}; + // read all the extra subfields + while (extra_offset < extra_length) { + auto const remaining_size = extra_length - extra_offset; + CUDF_EXPECTS(remaining_size >= 4, "invalid extra field length"); + // a subfield consists of 2 identifier bytes and a uint16 length + // 66/67 identifies a BGZIP block size field, we skip all other fields + input_stream.read(buffer.data(), 4); + extra_offset += 4; + auto const subfield_size = read_int(&buffer[2]); + if (buffer[0] == extra_blocklen_field_header[0] && + buffer[1] == extra_blocklen_field_header[1]) { + // the block size subfield contains a single uint16 value, which is block_size - 1 + CUDF_EXPECTS( + buffer[2] == extra_blocklen_field_header[2] && buffer[3] == extra_blocklen_field_header[3], + "malformed BGZIP extra subfield"); + input_stream.read(buffer.data(), sizeof(uint16_t)); + input_stream.seekg(remaining_size - 6, std::ios_base::cur); + auto const block_size_minus_one = read_int(&buffer[0]); + return {block_size_minus_one + 1, extra_length}; + } else { + input_stream.seekg(subfield_size, std::ios_base::cur); + extra_offset += subfield_size; + } + } + CUDF_FAIL("missing BGZIP size extra subfield"); +} + +footer read_footer(std::istream& input_stream) +{ + std::array buffer{}; + input_stream.read(buffer.data(), sizeof(buffer)); + return {read_int(&buffer[0]), read_int(&buffer[4])}; +} + +void write_footer(std::ostream& output_stream, host_span data) +{ + // compute crc32 with zlib, this allows checking the generated files with external tools + write_int(output_stream, crc32(0, (unsigned char*)data.data(), data.size())); + write_int(output_stream, data.size()); +} + +void write_header(std::ostream& output_stream, + uint16_t compressed_size, + host_span pre_size_subfield, + host_span post_size_subfield) +{ + std::array constexpr header_data{{ + 31, // magic number + 139, // magic number + 8, // compression type: deflate + 4, // flags: extra header + 0, // mtime + 0, // mtime + 0, // mtime + 0, // mtime: irrelevant + 4, // xfl: irrelevant + 3 // OS: irrelevant + }}; + output_stream.write(reinterpret_cast(header_data.data()), header_data.size()); + auto const extra_size = pre_size_subfield.size() + extra_blocklen_field_header.size() + + sizeof(uint16_t) + post_size_subfield.size(); + auto const block_size = + header_data.size() + sizeof(uint16_t) + extra_size + compressed_size + 2 * sizeof(uint32_t); + write_int(output_stream, extra_size); + output_stream.write(pre_size_subfield.data(), pre_size_subfield.size()); + output_stream.write(extra_blocklen_field_header.data(), extra_blocklen_field_header.size()); + CUDF_EXPECTS(block_size - 1 <= std::numeric_limits::max(), "block size overflow"); + write_int(output_stream, block_size - 1); + output_stream.write(post_size_subfield.data(), post_size_subfield.size()); +} + +void write_uncompressed_block(std::ostream& output_stream, + host_span data, + host_span pre_size_subfields, + host_span post_size_subfields) +{ + CUDF_EXPECTS(data.size() <= std::numeric_limits::max(), "data size overflow"); + write_header(output_stream, data.size() + 5, pre_size_subfields, post_size_subfields); + write_int(output_stream, 1); + write_int(output_stream, data.size()); + write_int(output_stream, ~static_cast(data.size())); + output_stream.write(data.data(), data.size()); + write_footer(output_stream, data); +} + +void write_compressed_block(std::ostream& output_stream, + host_span data, + host_span pre_size_subfields, + host_span post_size_subfields) +{ + CUDF_EXPECTS(data.size() <= std::numeric_limits::max(), "data size overflow"); + z_stream deflate_stream{}; + // let's make sure we have enough space to store the data + std::vector compressed_out(data.size() * 2 + 256); + deflate_stream.next_in = reinterpret_cast(const_cast(data.data())); + deflate_stream.avail_in = data.size(); + deflate_stream.next_out = reinterpret_cast(compressed_out.data()); + deflate_stream.avail_out = compressed_out.size(); + CUDF_EXPECTS( + deflateInit2(&deflate_stream, // stream + Z_DEFAULT_COMPRESSION, // compression level + Z_DEFLATED, // method + -15, // log2 of window size (negative value means no ZLIB header/footer) + 9, // mem level: best performance/most memory usage for compression + Z_DEFAULT_STRATEGY // strategy + ) == Z_OK, + "deflateInit failed"); + CUDF_EXPECTS(deflate(&deflate_stream, Z_FINISH) == Z_STREAM_END, "deflate failed"); + CUDF_EXPECTS(deflateEnd(&deflate_stream) == Z_OK, "deflateEnd failed"); + write_header(output_stream, deflate_stream.total_out, pre_size_subfields, post_size_subfields); + output_stream.write(compressed_out.data(), deflate_stream.total_out); + write_footer(output_stream, data); +} + +} // namespace cudf::io::text::detail::bgzip diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index e630e842f4e..8675dc891c1 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -230,6 +230,7 @@ ConfigureTest(NESTED_JSON_TEST io/nested_json_test.cpp io/json_tree.cpp) ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp) ConfigureTest(MULTIBYTE_SPLIT_TEST io/text/multibyte_split_test.cpp) ConfigureTest(DATA_CHUNK_SOURCE_TEST io/text/data_chunk_source_test.cpp) +target_link_libraries(DATA_CHUNK_SOURCE_TEST PRIVATE ZLIB::ZLIB) ConfigureTest(LOGICAL_STACK_TEST io/fst/logical_stack_test.cu) ConfigureTest(FST_TEST io/fst/fst_test.cu) ConfigureTest(TYPE_INFERENCE_TEST io/type_inference_test.cu) diff --git a/cpp/tests/io/text/data_chunk_source_test.cpp b/cpp/tests/io/text/data_chunk_source_test.cpp index 115a66cdd95..7cb75aea8e2 100644 --- a/cpp/tests/io/text/data_chunk_source_test.cpp +++ b/cpp/tests/io/text/data_chunk_source_test.cpp @@ -18,6 +18,7 @@ #include #include +#include #include @@ -125,102 +126,67 @@ TEST_F(DataChunkSourceTest, Host) test_source(content, *source); } -template -void write_int(std::ostream& stream, T val) -{ - std::array bytes; - // we assume little-endian - std::memcpy(&bytes[0], &val, sizeof(T)); - stream.write(bytes.data(), bytes.size()); -} +enum class compression { ENABLED, DISABLED }; -void write_bgzip_block(std::ostream& stream, - const std::string& data, - bool add_extra_garbage_before, - bool add_extra_garbage_after) -{ - std::array const header{{ - 31, // magic number - 139, // magic number - 8, // compression type: deflate - 4, // flags: extra header - 0, // mtime - 0, // mtime - 0, // mtime - 0, // mtime: irrelevant - 4, // xfl: irrelevant - 3 // OS: irrelevant - }}; - std::array const extra_blocklen_field{{66, 67, 2, 0}}; - std::array const extra_garbage_field1{{13, // magic number - 37, // magic number - 7, // field length - 0, // field length - 1, - 2, - 3, - 4, - 5, - 6, - 7}}; - std::array const extra_garbage_field2{{12, // magic number - 34, // magic number - 2, // field length - 0, // field length - 1, 2, - 56, // magic number - 78, // magic number - 1, // field length - 0, // field length - 3, // - 90, // magic number - 12, // magic number - 8, // field length - 0, // field length - 1, 2, 3, 4, 5, 6, 7, 8}}; - stream.write(reinterpret_cast(header.data()), header.size()); - uint16_t extra_size = extra_blocklen_field.size() + 2; - if (add_extra_garbage_before) { extra_size += extra_garbage_field1.size(); } - if (add_extra_garbage_after) { extra_size += extra_garbage_field2.size(); } - write_int(stream, extra_size); - if (add_extra_garbage_before) { - stream.write(extra_garbage_field1.data(), extra_garbage_field1.size()); - } - stream.write(extra_blocklen_field.data(), extra_blocklen_field.size()); - auto const compressed_size = data.size() + 5; - uint16_t const block_size_minus_one = compressed_size + 19 + extra_size; - write_int(stream, block_size_minus_one); - if (add_extra_garbage_after) { - stream.write(extra_garbage_field2.data(), extra_garbage_field2.size()); - } - write_int(stream, 1); - write_int(stream, data.size()); - write_int(stream, ~static_cast(data.size())); - stream.write(data.data(), data.size()); - // this does not produce a valid file, since we write 0 as the CRC - // the parser ignores the checksum, so it doesn't matter to the test - // to check output with gzip, plug in the CRC of `data` here. - write_int(stream, 0); - write_int(stream, data.size()); -} +enum class eof { ADD_EOF_BLOCK, NO_EOF_BLOCK }; -void write_bgzip(std::ostream& stream, - const std::string& data, +void write_bgzip(std::ostream& output_stream, + cudf::host_span data, std::default_random_engine& rng, - bool write_eof = true) + compression compress, + eof add_eof) { + std::vector const extra_garbage_fields1{{13, // magic number + 37, // magic number + 7, // field length + 0, // field length + 1, + 2, + 3, + 4, + 5, + 6, + 7}}; + std::vector const extra_garbage_fields2{{12, // magic number + 34, // magic number + 2, // field length + 0, // field length + 1, 2, + 56, // magic number + 78, // magic number + 1, // field length + 0, // field length + 3, // + 90, // magic number + 12, // magic number + 8, // field length + 0, // field length + 1, 2, 3, 4, 5, 6, 7, 8}}; // make sure the block size with header stays below 65536 std::uniform_int_distribution block_size_dist{1, 65000}; auto begin = data.begin(); auto const end = data.end(); int i = 0; while (begin < end) { + using cudf::host_span; auto len = std::min(end - begin, block_size_dist(rng)); - write_bgzip_block(stream, std::string{begin, begin + len}, i & 1, i & 2); + host_span const garbage_before = + i & 1 ? extra_garbage_fields1 : host_span{}; + host_span const garbage_after = + i & 2 ? extra_garbage_fields2 : host_span{}; + if (compress == compression::ENABLED) { + cudf::io::text::detail::bgzip::write_compressed_block( + output_stream, {begin, len}, garbage_before, garbage_after); + } else { + cudf::io::text::detail::bgzip::write_uncompressed_block( + output_stream, {begin, len}, garbage_before, garbage_after); + } begin += len; i++; } - if (write_eof) { write_bgzip_block(stream, {}, false, false); } + if (add_eof == eof::ADD_EOF_BLOCK) { + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, {}); + } } TEST_F(DataChunkSourceTest, BgzipSource) @@ -231,9 +197,9 @@ TEST_F(DataChunkSourceTest, BgzipSource) input = input + input; } { - std::ofstream stream{filename}; + std::ofstream output_stream{filename}; std::default_random_engine rng{}; - write_bgzip(stream, input, rng); + write_bgzip(output_stream, input, rng, compression::DISABLED, eof::ADD_EOF_BLOCK); } auto const source = cudf::io::text::make_source_from_bgzip_file(filename); @@ -243,7 +209,7 @@ TEST_F(DataChunkSourceTest, BgzipSource) TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsets) { - auto const filename = temp_env->get_temp_filepath("bgzip_source"); + auto const filename = temp_env->get_temp_filepath("bgzip_source_offsets"); std::string input{"bananarama"}; for (int i = 0; i < 24; i++) { input = input + input; @@ -260,16 +226,18 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsets) std::size_t const begin_local_offset{data_garbage.size()}; std::size_t const end_local_offset{endinput.size()}; { - std::ofstream stream{filename}; - stream.write(padding_garbage.data(), padding_garbage.size()); + std::ofstream output_stream{filename}; + output_stream.write(padding_garbage.data(), padding_garbage.size()); std::default_random_engine rng{}; - begin_compressed_offset = stream.tellp(); - write_bgzip_block(stream, data_garbage + begininput, false, false); - write_bgzip(stream, input, rng, false); - end_compressed_offset = stream.tellp(); - write_bgzip_block(stream, endinput + data_garbage + data_garbage, false, false); - write_bgzip_block(stream, {}, false, false); - stream.write(padding_garbage.data(), padding_garbage.size()); + begin_compressed_offset = output_stream.tellp(); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, + data_garbage + begininput); + write_bgzip(output_stream, input, rng, compression::DISABLED, eof::NO_EOF_BLOCK); + end_compressed_offset = output_stream.tellp(); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, + endinput + data_garbage + data_garbage); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, {}); + output_stream.write(padding_garbage.data(), padding_garbage.size()); } input = begininput + input + endinput; @@ -283,7 +251,7 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsets) TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsetsSingleGZipBlock) { - auto const filename = temp_env->get_temp_filepath("bgzip_source"); + auto const filename = temp_env->get_temp_filepath("bgzip_source_offsets_single_block"); std::string const input{"collection unit brings"}; std::string const head_garbage{"garbage"}; std::string const tail_garbage{"GARBAGE"}; @@ -292,9 +260,10 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsetsSingleGZipBlock) std::size_t const begin_local_offset{head_garbage.size()}; std::size_t const end_local_offset{head_garbage.size() + input.size()}; { - std::ofstream stream{filename}; - write_bgzip_block(stream, head_garbage + input + tail_garbage, false, false); - write_bgzip_block(stream, {}, false, false); + std::ofstream output_stream{filename}; + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, + head_garbage + input + tail_garbage); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, {}); } auto const source = @@ -307,7 +276,7 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsetsSingleGZipBlock) TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsetsSingleChunk) { - auto const filename = temp_env->get_temp_filepath("bgzip_source"); + auto const filename = temp_env->get_temp_filepath("bgzip_source_offsets_single_chunk"); std::string const input{"collection unit brings"}; std::string const head_garbage{"garbage"}; std::string const tail_garbage{"GARBAGE"}; @@ -316,11 +285,13 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsetsSingleChunk) std::size_t const begin_local_offset{head_garbage.size()}; std::size_t const end_local_offset{input.size() - 10}; { - std::ofstream stream{filename}; - write_bgzip_block(stream, head_garbage + input.substr(0, 10), false, false); - end_compressed_offset = stream.tellp(); - write_bgzip_block(stream, input.substr(10) + tail_garbage, false, false); - write_bgzip_block(stream, {}, false, false); + std::ofstream output_stream{filename}; + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, + head_garbage + input.substr(0, 10)); + end_compressed_offset = output_stream.tellp(); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, + input.substr(10) + tail_garbage); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, {}); } auto const source = @@ -331,4 +302,44 @@ TEST_F(DataChunkSourceTest, BgzipSourceVirtualOffsetsSingleChunk) test_source(input, *source); } +TEST_F(DataChunkSourceTest, BgzipCompressedSourceVirtualOffsets) +{ + auto const filename = temp_env->get_temp_filepath("bgzip_source_compressed_offsets"); + std::string input{"bananarama"}; + for (int i = 0; i < 24; i++) { + input = input + input; + } + std::string padding_garbage{"garbage"}; + for (int i = 0; i < 10; i++) { + padding_garbage = padding_garbage + padding_garbage; + } + std::string const data_garbage{"GARBAGE"}; + std::string const begininput{"begin of bananarama"}; + std::string const endinput{"end of bananarama"}; + std::size_t begin_compressed_offset{}; + std::size_t end_compressed_offset{}; + std::size_t const begin_local_offset{data_garbage.size()}; + std::size_t const end_local_offset{endinput.size()}; + { + std::ofstream output_stream{filename}; + output_stream.write(padding_garbage.data(), padding_garbage.size()); + std::default_random_engine rng{}; + begin_compressed_offset = output_stream.tellp(); + cudf::io::text::detail::bgzip::write_compressed_block(output_stream, data_garbage + begininput); + write_bgzip(output_stream, input, rng, compression::ENABLED, eof::NO_EOF_BLOCK); + end_compressed_offset = output_stream.tellp(); + cudf::io::text::detail::bgzip::write_compressed_block(output_stream, + endinput + data_garbage + data_garbage); + cudf::io::text::detail::bgzip::write_uncompressed_block(output_stream, {}); + output_stream.write(padding_garbage.data(), padding_garbage.size()); + } + input = begininput + input + endinput; + + auto source = + cudf::io::text::make_source_from_bgzip_file(filename, + begin_compressed_offset << 16 | begin_local_offset, + end_compressed_offset << 16 | end_local_offset); + test_source(input, *source); +} + CUDF_TEST_PROGRAM_MAIN()