From 1f8da7913309a45bcab4e2046c388f080510dfc3 Mon Sep 17 00:00:00 2001 From: vuule Date: Tue, 30 Nov 2021 22:14:53 -0800 Subject: [PATCH 1/5] skip some parts of writing if no rows; test --- cpp/src/io/orc/writer_impl.cu | 339 +++++++++++---------- cpp/src/io/utilities/hostdevice_vector.hpp | 1 + python/cudf/cudf/tests/test_orc.py | 10 + 3 files changed, 184 insertions(+), 166 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 9e493c192e4..caa0d64822c 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -579,12 +579,15 @@ orc_streams writer::impl::create_streams(host_span columns, } auto const direct_data_size = - std::accumulate(segmentation.stripes.front().cbegin(), - segmentation.stripes.back().cend(), - size_t{0}, - [&](auto data_size, auto rg_idx) { - return data_size + column.host_dict_chunk(rg_idx)->string_char_count; - }); + segmentation.num_stripes() == 0 + ? 0ul + : std::accumulate(segmentation.stripes.front().cbegin(), + segmentation.stripes.back().cend(), + size_t{0}, + [&](auto data_size, auto rg_idx) { + return data_size + + column.host_dict_chunk(rg_idx)->string_char_count; + }); if (enable_dict) { uint32_t dict_bits = 0; for (dict_bits = 1; dict_bits < 32; dict_bits <<= 1) { @@ -988,17 +991,19 @@ encoded_data encode_columns(orc_table_view const& orc_table, } chunk_streams.host_to_device(stream); - if (orc_table.num_string_columns() != 0) { - auto d_stripe_dict = orc_table.string_column(0).device_stripe_dict(); - gpu::EncodeStripeDictionaries(d_stripe_dict, - chunks, - orc_table.num_string_columns(), - segmentation.num_stripes(), - chunk_streams, - stream); - } + if (orc_table.num_rows() > 0) { + if (orc_table.num_string_columns() != 0) { + auto d_stripe_dict = orc_table.string_column(0).device_stripe_dict(); + gpu::EncodeStripeDictionaries(d_stripe_dict, + chunks, + orc_table.num_string_columns(), + segmentation.num_stripes(), + chunk_streams, + stream); + } - gpu::EncodeOrcColumnData(chunks, chunk_streams, stream); + gpu::EncodeOrcColumnData(chunks, chunk_streams, stream); + } dictionaries.data.clear(); dictionaries.index.clear(); stream.synchronize(); @@ -1803,7 +1808,7 @@ void writer::impl::write(table_view const& table) auto dictionaries = allocate_dictionaries(orc_table, rowgroup_bounds, stream); hostdevice_2dvector dict( rowgroup_bounds.size().first, orc_table.num_string_columns(), stream); - if (orc_table.num_string_columns() != 0) { + if (not dict.is_empty()) { init_dictionaries(orc_table, rowgroup_bounds, dictionaries.d_data_view, @@ -1819,7 +1824,7 @@ void writer::impl::write(table_view const& table) // Build stripe-level dictionaries hostdevice_2dvector stripe_dict( segmentation.num_stripes(), orc_table.num_string_columns(), stream); - if (orc_table.num_string_columns() != 0) { + if (not stripe_dict.is_empty()) { build_dictionaries(orc_table, segmentation.stripes, dict, @@ -1842,165 +1847,167 @@ void writer::impl::write(table_view const& table) segmentation.num_stripes(), num_data_streams, stream); auto stripes = gather_stripes(num_index_streams, segmentation, &enc_data.streams, &strm_descs); - // Gather column statistics - std::vector column_stats; - if (enable_statistics_ && table.num_columns() > 0 && num_rows > 0) { - column_stats = gather_statistic_blobs(orc_table, segmentation); - } - - // Allocate intermediate output stream buffer - size_t compressed_bfr_size = 0; - size_t num_compressed_blocks = 0; - size_t max_compressed_block_size = 0; - if (compression_kind_ != NONE) { - nvcompBatchedSnappyCompressGetMaxOutputChunkSize( - compression_blocksize_, nvcompBatchedSnappyDefaultOpts, &max_compressed_block_size); - } - auto stream_output = [&]() { - size_t max_stream_size = 0; - bool all_device_write = true; - - for (auto& ss : strm_descs.host_view().flat_view()) { - if (!out_sink_->is_device_write_preferred(ss.stream_size)) { all_device_write = false; } - size_t stream_size = ss.stream_size; - if (compression_kind_ != NONE) { - ss.first_block = num_compressed_blocks; - ss.bfr_offset = compressed_bfr_size; - - auto num_blocks = std::max( - (stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1); - stream_size += num_blocks * BLOCK_HEADER_SIZE; - num_compressed_blocks += num_blocks; - compressed_bfr_size += (max_compressed_block_size + BLOCK_HEADER_SIZE) * num_blocks; - } - max_stream_size = std::max(max_stream_size, stream_size); + if (num_rows > 0) { + // Gather column statistics + std::vector column_stats; + if (enable_statistics_ && table.num_columns() > 0) { + column_stats = gather_statistic_blobs(orc_table, segmentation); } - if (all_device_write) { - return pinned_buffer{nullptr, cudaFreeHost}; - } else { - return pinned_buffer{[](size_t size) { - uint8_t* ptr = nullptr; - CUDA_TRY(cudaMallocHost(&ptr, size)); - return ptr; - }(max_stream_size), - cudaFreeHost}; + // Allocate intermediate output stream buffer + size_t compressed_bfr_size = 0; + size_t num_compressed_blocks = 0; + size_t max_compressed_block_size = 0; + if (compression_kind_ != NONE) { + nvcompBatchedSnappyCompressGetMaxOutputChunkSize( + compression_blocksize_, nvcompBatchedSnappyDefaultOpts, &max_compressed_block_size); } - }(); - - // Compress the data streams - rmm::device_buffer compressed_data(compressed_bfr_size, stream); - hostdevice_vector comp_out(num_compressed_blocks, stream); - hostdevice_vector comp_in(num_compressed_blocks, stream); - if (compression_kind_ != NONE) { - strm_descs.host_to_device(stream); - gpu::CompressOrcDataStreams(static_cast(compressed_data.data()), - num_compressed_blocks, - compression_kind_, - compression_blocksize_, - max_compressed_block_size, - strm_descs, - enc_data.streams, - comp_in, - comp_out, - stream); - strm_descs.device_to_host(stream); - comp_out.device_to_host(stream, true); - } + auto stream_output = [&]() { + size_t max_stream_size = 0; + bool all_device_write = true; + + for (auto& ss : strm_descs.host_view().flat_view()) { + if (!out_sink_->is_device_write_preferred(ss.stream_size)) { all_device_write = false; } + size_t stream_size = ss.stream_size; + if (compression_kind_ != NONE) { + ss.first_block = num_compressed_blocks; + ss.bfr_offset = compressed_bfr_size; + + auto num_blocks = std::max( + (stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1); + stream_size += num_blocks * BLOCK_HEADER_SIZE; + num_compressed_blocks += num_blocks; + compressed_bfr_size += (max_compressed_block_size + BLOCK_HEADER_SIZE) * num_blocks; + } + max_stream_size = std::max(max_stream_size, stream_size); + } - ProtobufWriter pbw_(&buffer_); + if (all_device_write) { + return pinned_buffer{nullptr, cudaFreeHost}; + } else { + return pinned_buffer{[](size_t size) { + uint8_t* ptr = nullptr; + CUDA_TRY(cudaMallocHost(&ptr, size)); + return ptr; + }(max_stream_size), + cudaFreeHost}; + } + }(); - // Write stripes - std::vector> write_tasks; - for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) { - auto const& rowgroups_range = segmentation.stripes[stripe_id]; - auto& stripe = stripes[stripe_id]; - - stripe.offset = out_sink_->bytes_written(); - - // Column (skippable) index streams appear at the start of the stripe - for (size_type stream_id = 0; stream_id < num_index_streams; ++stream_id) { - write_index_stream(stripe_id, - stream_id, - orc_table.columns, - rowgroups_range, - enc_data.streams, - strm_descs, - comp_out, - &stripe, - &streams, - &pbw_); + // Compress the data streams + rmm::device_buffer compressed_data(compressed_bfr_size, stream); + hostdevice_vector comp_out(num_compressed_blocks, stream); + hostdevice_vector comp_in(num_compressed_blocks, stream); + if (compression_kind_ != NONE) { + strm_descs.host_to_device(stream); + gpu::CompressOrcDataStreams(static_cast(compressed_data.data()), + num_compressed_blocks, + compression_kind_, + compression_blocksize_, + max_compressed_block_size, + strm_descs, + enc_data.streams, + comp_in, + comp_out, + stream); + strm_descs.device_to_host(stream); + comp_out.device_to_host(stream, true); } - // Column data consisting one or more separate streams - for (auto const& strm_desc : strm_descs[stripe_id]) { - write_tasks.push_back( - write_data_stream(strm_desc, - enc_data.streams[strm_desc.column_id][rowgroups_range.first], - static_cast(compressed_data.data()), - stream_output.get(), - &stripe, - &streams)); - } + ProtobufWriter pbw_(&buffer_); + + // Write stripes + std::vector> write_tasks; + for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) { + auto const& rowgroups_range = segmentation.stripes[stripe_id]; + auto& stripe = stripes[stripe_id]; + + stripe.offset = out_sink_->bytes_written(); + + // Column (skippable) index streams appear at the start of the stripe + for (size_type stream_id = 0; stream_id < num_index_streams; ++stream_id) { + write_index_stream(stripe_id, + stream_id, + orc_table.columns, + rowgroups_range, + enc_data.streams, + strm_descs, + comp_out, + &stripe, + &streams, + &pbw_); + } + + // Column data consisting one or more separate streams + for (auto const& strm_desc : strm_descs[stripe_id]) { + write_tasks.push_back( + write_data_stream(strm_desc, + enc_data.streams[strm_desc.column_id][rowgroups_range.first], + static_cast(compressed_data.data()), + stream_output.get(), + &stripe, + &streams)); + } - // Write stripefooter consisting of stream information - StripeFooter sf; - sf.streams = streams; - sf.columns.resize(orc_table.num_columns() + 1); - sf.columns[0].kind = DIRECT; - for (size_t i = 1; i < sf.columns.size(); ++i) { - sf.columns[i].kind = orc_table.column(i - 1).orc_encoding(); - sf.columns[i].dictionarySize = - (sf.columns[i].kind == DICTIONARY_V2) - ? orc_table.column(i - 1).host_stripe_dict(stripe_id)->num_strings - : 0; - if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; } + // Write stripefooter consisting of stream information + StripeFooter sf; + sf.streams = streams; + sf.columns.resize(orc_table.num_columns() + 1); + sf.columns[0].kind = DIRECT; + for (size_t i = 1; i < sf.columns.size(); ++i) { + sf.columns[i].kind = orc_table.column(i - 1).orc_encoding(); + sf.columns[i].dictionarySize = + (sf.columns[i].kind == DICTIONARY_V2) + ? orc_table.column(i - 1).host_stripe_dict(stripe_id)->num_strings + : 0; + if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; } + } + buffer_.resize((compression_kind_ != NONE) ? 3 : 0); + pbw_.write(sf); + stripe.footerLength = buffer_.size(); + if (compression_kind_ != NONE) { + uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1; + buffer_[0] = static_cast(uncomp_sf_len >> 0); + buffer_[1] = static_cast(uncomp_sf_len >> 8); + buffer_[2] = static_cast(uncomp_sf_len >> 16); + } + out_sink_->host_write(buffer_.data(), buffer_.size()); } - buffer_.resize((compression_kind_ != NONE) ? 3 : 0); - pbw_.write(sf); - stripe.footerLength = buffer_.size(); - if (compression_kind_ != NONE) { - uint32_t uncomp_sf_len = (stripe.footerLength - 3) * 2 + 1; - buffer_[0] = static_cast(uncomp_sf_len >> 0); - buffer_[1] = static_cast(uncomp_sf_len >> 8); - buffer_[2] = static_cast(uncomp_sf_len >> 16); + for (auto const& task : write_tasks) { + task.wait(); } - out_sink_->host_write(buffer_.data(), buffer_.size()); - } - for (auto const& task : write_tasks) { - task.wait(); - } - if (column_stats.size() != 0) { - // File-level statistics - // NOTE: Excluded from chunked write mode to avoid the need for merging stats across calls - if (single_write_mode) { - // First entry contains total number of rows - buffer_.resize(0); - pbw_.putb(1 * 8 + PB_TYPE_VARINT); - pbw_.put_uint(num_rows); - ff.statistics.reserve(1 + orc_table.num_columns()); - ff.statistics.emplace_back(std::move(buffer_)); - // Add file stats, stored after stripe stats in `column_stats` - ff.statistics.insert( - ff.statistics.end(), - std::make_move_iterator(column_stats.begin()) + stripes.size() * orc_table.num_columns(), - std::make_move_iterator(column_stats.end())); - } - // Stripe-level statistics - size_t first_stripe = md.stripeStats.size(); - md.stripeStats.resize(first_stripe + stripes.size()); - for (size_t stripe_id = 0; stripe_id < stripes.size(); stripe_id++) { - md.stripeStats[first_stripe + stripe_id].colStats.resize(1 + orc_table.num_columns()); - buffer_.resize(0); - pbw_.putb(1 * 8 + PB_TYPE_VARINT); - pbw_.put_uint(stripes[stripe_id].numberOfRows); - md.stripeStats[first_stripe + stripe_id].colStats[0] = std::move(buffer_); - for (size_t col_idx = 0; col_idx < orc_table.num_columns(); col_idx++) { - size_t idx = stripes.size() * col_idx + stripe_id; - if (idx < column_stats.size()) { - md.stripeStats[first_stripe + stripe_id].colStats[1 + col_idx] = - std::move(column_stats[idx]); + if (column_stats.size() != 0) { + // File-level statistics + // NOTE: Excluded from chunked write mode to avoid the need for merging stats across calls + if (single_write_mode) { + // First entry contains total number of rows + buffer_.resize(0); + pbw_.putb(1 * 8 + PB_TYPE_VARINT); + pbw_.put_uint(num_rows); + ff.statistics.reserve(1 + orc_table.num_columns()); + ff.statistics.emplace_back(std::move(buffer_)); + // Add file stats, stored after stripe stats in `column_stats` + ff.statistics.insert( + ff.statistics.end(), + std::make_move_iterator(column_stats.begin()) + stripes.size() * orc_table.num_columns(), + std::make_move_iterator(column_stats.end())); + } + // Stripe-level statistics + size_t first_stripe = md.stripeStats.size(); + md.stripeStats.resize(first_stripe + stripes.size()); + for (size_t stripe_id = 0; stripe_id < stripes.size(); stripe_id++) { + md.stripeStats[first_stripe + stripe_id].colStats.resize(1 + orc_table.num_columns()); + buffer_.resize(0); + pbw_.putb(1 * 8 + PB_TYPE_VARINT); + pbw_.put_uint(stripes[stripe_id].numberOfRows); + md.stripeStats[first_stripe + stripe_id].colStats[0] = std::move(buffer_); + for (size_t col_idx = 0; col_idx < orc_table.num_columns(); col_idx++) { + size_t idx = stripes.size() * col_idx + stripe_id; + if (idx < column_stats.size()) { + md.stripeStats[first_stripe + stripe_id].colStats[1 + col_idx] = + std::move(column_stats[idx]); + } } } } diff --git a/cpp/src/io/utilities/hostdevice_vector.hpp b/cpp/src/io/utilities/hostdevice_vector.hpp index 283715478a0..a7f9aec7bb4 100644 --- a/cpp/src/io/utilities/hostdevice_vector.hpp +++ b/cpp/src/io/utilities/hostdevice_vector.hpp @@ -179,6 +179,7 @@ class hostdevice_2dvector { auto size() const noexcept { return _size; } auto count() const noexcept { return _size.first * _size.second; } + auto is_empty() const noexcept { return count() == 0; } T* base_host_ptr(size_t offset = 0) { return _data.host_ptr(offset); } T* base_device_ptr(size_t offset = 0) { return _data.device_ptr(offset); } diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 99b5652110b..aa48dbc0edf 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1526,3 +1526,13 @@ def test_orc_writer_rle_stream_size(datadir, tmpdir): # Segfaults when RLE stream sizes don't account for varint length pa_out = pa.orc.ORCFile(reencoded).read() assert_eq(df.to_pandas(), pa_out) + + +def test_empty_columns(): + buffer = BytesIO() + expected = cudf.DataFrame({"string": []}, dtype=pd.StringDtype()) + expected.to_orc(buffer, compression="snappy") + + got_df = cudf.read_orc(buffer) + + assert_eq(expected, got_df) From 77ad4657c32a6855b7722d7e632317d5ce798d90 Mon Sep 17 00:00:00 2001 From: vuule Date: Wed, 1 Dec 2021 11:32:43 -0800 Subject: [PATCH 2/5] expand test --- python/cudf/cudf/tests/test_orc.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index aa48dbc0edf..fe0bb6905cf 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1530,9 +1530,14 @@ def test_orc_writer_rle_stream_size(datadir, tmpdir): def test_empty_columns(): buffer = BytesIO() - expected = cudf.DataFrame({"string": []}, dtype=pd.StringDtype()) + # string and decimal columns have additional steps that need to be skipped + expected = cudf.DataFrame( + { + "string": cudf.Series([], dtype="str"), + "decimal": cudf.Series([], dtype=cudf.Decimal64Dtype(10, 1)), + } + ) expected.to_orc(buffer, compression="snappy") got_df = cudf.read_orc(buffer) - assert_eq(expected, got_df) From 950fc32b3da9d3c52273fb47ffb93b96fc1885ef Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Wed, 1 Dec 2021 12:39:10 -0800 Subject: [PATCH 3/5] Apply suggestions from code review Co-authored-by: Conor Hoekstra <36027403+codereport@users.noreply.github.com> --- cpp/src/io/orc/writer_impl.cu | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index caa0d64822c..a945f036f81 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1849,9 +1849,9 @@ void writer::impl::write(table_view const& table) if (num_rows > 0) { // Gather column statistics - std::vector column_stats; - if (enable_statistics_ && table.num_columns() > 0) { - column_stats = gather_statistic_blobs(orc_table, segmentation); + auto column_stats = enable_statistics_ && table.num_columns() > 0 + ? gather_statistic_blobs(orc_table, segmentation) + : std::vector{}; } // Allocate intermediate output stream buffer @@ -1977,7 +1977,7 @@ void writer::impl::write(table_view const& table) task.wait(); } - if (column_stats.size() != 0) { + if (not column_stats.empty()) { // File-level statistics // NOTE: Excluded from chunked write mode to avoid the need for merging stats across calls if (single_write_mode) { From 6ac50f93cf8973a91d8b5be45e5a781bca415440 Mon Sep 17 00:00:00 2001 From: vuule Date: Wed, 1 Dec 2021 15:38:40 -0800 Subject: [PATCH 4/5] fix --- cpp/src/io/orc/writer_impl.cu | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index a945f036f81..2eb251be1ab 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1849,10 +1849,9 @@ void writer::impl::write(table_view const& table) if (num_rows > 0) { // Gather column statistics - auto column_stats = enable_statistics_ && table.num_columns() > 0 - ? gather_statistic_blobs(orc_table, segmentation) - : std::vector{}; - } + auto const column_stats = enable_statistics_ && table.num_columns() > 0 + ? gather_statistic_blobs(orc_table, segmentation) + : std::vector{}; // Allocate intermediate output stream buffer size_t compressed_bfr_size = 0; From 33dc6f355835846076f6922e78a6579aa520e399 Mon Sep 17 00:00:00 2001 From: vuule Date: Wed, 1 Dec 2021 15:52:44 -0800 Subject: [PATCH 5/5] remove integer-suffix --- cpp/src/io/orc/writer_impl.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 2eb251be1ab..bf92af8ec75 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -580,7 +580,7 @@ orc_streams writer::impl::create_streams(host_span columns, auto const direct_data_size = segmentation.num_stripes() == 0 - ? 0ul + ? 0 : std::accumulate(segmentation.stripes.front().cbegin(), segmentation.stripes.back().cend(), size_t{0},