From cb06c20c36b2338915ed38a4c37d4db9a5bd3d79 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Tue, 31 Oct 2023 11:04:02 -0700 Subject: [PATCH] Sort dictionary data alphabetically in the ORC writer (#14295) Strings in the dictionary data streams are now sorted alphabetically. Reduces file size in some cases because compression can be more efficient. Reduces throughput up to 22% when writing strings columns (3% speedup when dictionary encoding is not used, though!). Benchmark data does not demonstrate the compression difference, but we have some user data that compresses almost 30% better. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - David Wendt (https://github.com/davidwendt) - Divye Gala (https://github.com/divyegala) - Alessandro Bellina (https://github.com/abellina) URL: https://github.com/rapidsai/cudf/pull/14295 --- cpp/include/cudf/io/orc.hpp | 56 ++++++++++++++++ cpp/src/io/orc/orc_gpu.hpp | 14 ++-- cpp/src/io/orc/stripe_enc.cu | 4 ++ cpp/src/io/orc/writer_impl.cu | 113 +++++++++++++++++++++++++++------ cpp/src/io/orc/writer_impl.hpp | 1 + cpp/tests/io/orc_test.cpp | 30 +++++++++ 6 files changed, 191 insertions(+), 27 deletions(-) diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index 5801d2c1008..c2762b05aa6 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -450,6 +450,8 @@ class orc_writer_options { std::map _user_data; // Optional compression statistics std::shared_ptr _compression_stats; + // Specify whether string dictionaries should be alphabetically sorted + bool _enable_dictionary_sort = true; friend orc_writer_options_builder; @@ -572,6 +574,13 @@ class orc_writer_options { return _compression_stats; } + /** + * @brief Returns whether string dictionaries should be sorted. + * + * @return `true` if string dictionaries should be sorted + */ + [[nodiscard]] bool get_enable_dictionary_sort() const { return _enable_dictionary_sort; } + // Setters /** @@ -670,6 +679,13 @@ class orc_writer_options { { _compression_stats = std::move(comp_stats); } + + /** + * @brief Sets whether string dictionaries should be sorted. + * + * @param val Boolean value to enable/disable + */ + void set_enable_dictionary_sort(bool val) { _enable_dictionary_sort = val; } }; /** @@ -810,6 +826,18 @@ class orc_writer_options_builder { return *this; } + /** + * @brief Sets whether string dictionaries should be sorted. + * + * @param val Boolean value to enable/disable + * @return this for chaining + */ + orc_writer_options_builder& enable_dictionary_sort(bool val) + { + options._enable_dictionary_sort = val; + return *this; + } + /** * @brief move orc_writer_options member once it's built. */ @@ -866,6 +894,8 @@ class chunked_orc_writer_options { std::map _user_data; // Optional compression statistics std::shared_ptr _compression_stats; + // Specify whether string dictionaries should be alphabetically sorted + bool _enable_dictionary_sort = true; friend chunked_orc_writer_options_builder; @@ -966,6 +996,13 @@ class chunked_orc_writer_options { return _compression_stats; } + /** + * @brief Returns whether string dictionaries should be sorted. + * + * @return `true` if string dictionaries should be sorted + */ + [[nodiscard]] bool get_enable_dictionary_sort() const { return _enable_dictionary_sort; } + // Setters /** @@ -1057,6 +1094,13 @@ class chunked_orc_writer_options { { _compression_stats = std::move(comp_stats); } + + /** + * @brief Sets whether string dictionaries should be sorted. + * + * @param val Boolean value to enable/disable + */ + void set_enable_dictionary_sort(bool val) { _enable_dictionary_sort = val; } }; /** @@ -1183,6 +1227,18 @@ class chunked_orc_writer_options_builder { return *this; } + /** + * @brief Sets whether string dictionaries should be sorted. + * + * @param val Boolean value to enable/disable + * @return this for chaining + */ + chunked_orc_writer_options_builder& enable_dictionary_sort(bool val) + { + options._enable_dictionary_sort = val; + return *this; + } + /** * @brief move chunked_orc_writer_options member once it's built. */ diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index 5669a20907d..243704b65d4 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -150,7 +150,8 @@ struct EncChunk { uint8_t dtype_len; // data type length int32_t scale; // scale for decimals or timestamps - uint32_t* dict_index; // dictionary index from row index + uint32_t* dict_index; // dictionary index from row index + uint32_t* dict_data_order; // map from data to sorted data indices uint32_t* decimal_offsets; orc_column_device_view const* column; }; @@ -191,11 +192,12 @@ struct stripe_dictionary { size_type num_rows = 0; // number of rows in the stripe // output - device_span data; // index of elements in the column to include in the dictionary - device_span index; // index into the dictionary for each row in the column - size_type entry_count = 0; // number of entries in the dictionary - size_type char_count = 0; // number of characters in the dictionary - bool is_enabled = false; // true if dictionary encoding is enabled for this stripe + device_span data; // index of elements in the column to include in the dictionary + device_span index; // index into the dictionary for each row in the column + device_span data_order; // map from data to sorted data indices + size_type entry_count = 0; // number of entries in the dictionary + size_type char_count = 0; // number of characters in the dictionary + bool is_enabled = false; // true if dictionary encoding is enabled for this stripe }; /** diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 5c75ba22159..b99826e070e 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -837,6 +837,10 @@ __global__ void __launch_bounds__(block_size) if (dict_idx > 0x7fff'ffffu) { dict_idx = s->chunk.dict_index[dict_idx & 0x7fff'ffffu]; } + // translate dictionary index to sorted order, if enabled + if (s->chunk.dict_data_order != nullptr) { + dict_idx = s->chunk.dict_data_order[dict_idx]; + } s->vals.u32[nz_idx] = dict_idx; } else { string_view value = column.element(row); diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 81629e03a82..ac5993e764e 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -50,6 +51,8 @@ #include #include #include +#include +#include #include #include @@ -867,16 +870,15 @@ encoded_data encode_columns(orc_table_view const& orc_table, ck.null_mask_num_rows = aligned_rowgroups[rg_idx][column.index()].size(); ck.encoding_kind = column.orc_encoding(); ck.type_kind = column.orc_kind(); - if (ck.type_kind == TypeKind::STRING) { - ck.dict_index = (ck.encoding_kind == DICTIONARY_V2) - ? column.host_stripe_dict(stripe.id).index.data() - : nullptr; - ck.dtype_len = 1; - } else { - ck.dtype_len = column.type_width(); - } - ck.scale = column.scale(); - if (ck.type_kind == TypeKind::DECIMAL) { ck.decimal_offsets = column.decimal_offsets(); } + auto const is_str_dict = + ck.type_kind == TypeKind::STRING and ck.encoding_kind == DICTIONARY_V2; + ck.dict_index = is_str_dict ? column.host_stripe_dict(stripe.id).index.data() : nullptr; + ck.dict_data_order = + is_str_dict ? column.host_stripe_dict(stripe.id).data_order.data() : nullptr; + ck.dtype_len = (ck.type_kind == TypeKind::STRING) ? 1 : column.type_width(); + ck.scale = column.scale(); + ck.decimal_offsets = + (ck.type_kind == TypeKind::DECIMAL) ? column.decimal_offsets() : nullptr; } } } @@ -2012,24 +2014,41 @@ struct stripe_dictionaries { hostdevice_2dvector views; // descriptors [string_column][stripe] std::vector> data_owner; // dictionary data owner, per stripe std::vector> index_owner; // dictionary index owner, per stripe + std::vector> order_owner; // dictionary order owner, per stripe // Should be called after encoding is complete to deallocate the dictionary buffers. void on_encode_complete(rmm::cuda_stream_view stream) { data_owner.clear(); index_owner.clear(); + order_owner.clear(); for (auto& sd : views.host_view().flat_view()) { - sd.data = {}; - sd.index = {}; + sd.data = {}; + sd.index = {}; + sd.data_order = {}; } views.host_to_device_async(stream); } }; +/** + * @brief Compares two rows in a strings column + */ +struct string_rows_less { + device_span cols; + uint32_t col_idx; + __device__ bool operator()(size_type lhs_idx, size_type rhs_idx) const + { + auto const& col = cols[col_idx]; + return col.element(lhs_idx) < col.element(rhs_idx); + } +}; + // Build stripe dictionaries for string columns stripe_dictionaries build_dictionaries(orc_table_view& orc_table, file_segmentation const& segmentation, + bool sort_dictionaries, rmm::cuda_stream_view stream) { std::vector>> hash_maps_storage( @@ -2080,6 +2099,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, // Data owners; can be cleared after encode std::vector> dict_data_owner; std::vector> dict_index_owner; + std::vector> dict_order_owner; // Make decision about which stripes to encode with dictionary encoding for (auto col_idx : orc_table.string_column_indices) { auto& str_column = orc_table.column(col_idx); @@ -2122,15 +2142,61 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, gpu::collect_map_entries(stripe_dicts, stream); gpu::get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream); - // Clear map slots; hash map storage is deallocated at the end of this function - auto device_dicts_flat = stripe_dicts.device_view().flat_view(); - thrust::for_each(rmm::exec_policy(stream), - device_dicts_flat.begin(), - device_dicts_flat.end(), - [] __device__(auto& sd) { sd.map_slots = {}; }); - stripe_dicts.device_to_host_async(stream); + // deallocate hash map storage, unused after this point + hash_maps_storage.clear(); + + // Clear map slots and attach order buffers + auto dictionaries_flat = stripe_dicts.host_view().flat_view(); + for (auto& sd : dictionaries_flat) { + if (not sd.is_enabled) { continue; } + + sd.map_slots = {}; + if (sort_dictionaries) { + dict_order_owner.emplace_back(sd.entry_count, stream); + sd.data_order = dict_order_owner.back(); + } else { + sd.data_order = {}; + } + } + stripe_dicts.host_to_device_async(stream); + + // Sort stripe dictionaries alphabetically + if (sort_dictionaries) { + auto streams = cudf::detail::fork_streams(stream, std::min(dict_order_owner.size(), 8)); + auto stream_idx = 0; + for (auto& sd : dictionaries_flat) { + if (not sd.is_enabled) { continue; } + + auto const& current_stream = streams[stream_idx]; + + // Sort the dictionary data and create a mapping from the sorted order to the original + thrust::sequence( + rmm::exec_policy_nosync(current_stream), sd.data_order.begin(), sd.data_order.end()); + thrust::sort_by_key(rmm::exec_policy_nosync(current_stream), + sd.data.begin(), + sd.data.end(), + sd.data_order.begin(), + string_rows_less{orc_table.d_columns, sd.column_idx}); + + // Create the inverse permutation - i.e. the mapping from the original order to the sorted + auto order_copy = cudf::detail::make_device_uvector_async( + sd.data_order, current_stream, rmm::mr::get_current_device_resource()); + thrust::scatter(rmm::exec_policy_nosync(current_stream), + thrust::counting_iterator(0), + thrust::counting_iterator(sd.data_order.size()), + order_copy.begin(), + sd.data_order.begin()); + + stream_idx = (stream_idx + 1) % streams.size(); + } + + cudf::detail::join_streams(streams, stream); + } - return {std::move(stripe_dicts), std::move(dict_data_owner), std::move(dict_index_owner)}; + return {std::move(stripe_dicts), + std::move(dict_data_owner), + std::move(dict_index_owner), + std::move(dict_order_owner)}; } /** @@ -2142,6 +2208,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, * @param max_stripe_size Maximum size of stripes in the output file * @param row_index_stride The row index stride * @param enable_dictionary Whether dictionary is enabled + * @param sort_dictionaries Whether to sort the dictionaries * @param compression_kind The compression kind * @param compression_blocksize The block size used for compression * @param stats_freq Column statistics granularity type for parquet/orc writers @@ -2156,6 +2223,7 @@ auto convert_table_to_orc_data(table_view const& input, stripe_size_limits max_stripe_size, size_type row_index_stride, bool enable_dictionary, + bool sort_dictionaries, CompressionKind compression_kind, size_t compression_blocksize, statistics_freq stats_freq, @@ -2180,7 +2248,7 @@ auto convert_table_to_orc_data(table_view const& input, auto segmentation = calculate_segmentation(orc_table.columns, std::move(rowgroup_bounds), max_stripe_size); - auto stripe_dicts = build_dictionaries(orc_table, segmentation, stream); + auto stripe_dicts = build_dictionaries(orc_table, segmentation, sort_dictionaries, stream); auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream); auto const uncompressed_block_align = uncomp_block_alignment(compression_kind); @@ -2314,6 +2382,7 @@ writer::impl::impl(std::unique_ptr sink, _compression_blocksize(compression_block_size(_compression_kind)), _compression_statistics(options.get_compression_statistics()), _stats_freq(options.get_statistics_freq()), + _sort_dictionaries{options.get_enable_dictionary_sort()}, _single_write_mode(mode), _kv_meta(options.get_key_value_metadata()), _out_sink(std::move(sink)) @@ -2335,6 +2404,7 @@ writer::impl::impl(std::unique_ptr sink, _compression_blocksize(compression_block_size(_compression_kind)), _compression_statistics(options.get_compression_statistics()), _stats_freq(options.get_statistics_freq()), + _sort_dictionaries{options.get_enable_dictionary_sort()}, _single_write_mode(mode), _kv_meta(options.get_key_value_metadata()), _out_sink(std::move(sink)) @@ -2382,6 +2452,7 @@ void writer::impl::write(table_view const& input) _max_stripe_size, _row_index_stride, _enable_dictionary, + _sort_dictionaries, _compression_kind, _compression_blocksize, _stats_freq, diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 67c65eb9a37..0d1a83f3d85 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -346,6 +346,7 @@ class writer::impl { size_t const _compression_blocksize; std::shared_ptr _compression_statistics; // Optional output statistics_freq const _stats_freq; + bool const _sort_dictionaries; single_write_mode const _single_write_mode; // Special parameter only used by `write()` to // indicate that we are guaranteeing a single table // write. This enables some internal optimizations. diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index 3457c5675ad..234716749ff 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -1930,4 +1930,34 @@ TEST_F(OrcStatisticsTest, AllNulls) check_all_null_stats(stats.file_stats[3]); } +TEST_F(OrcWriterTest, UnorderedDictionary) +{ + std::vector strings{ + "BBBB", "BBBB", "CCCC", "BBBB", "CCCC", "EEEE", "CCCC", "AAAA", "DDDD", "EEEE"}; + str_col col(strings.begin(), strings.end()); + + table_view expected({col}); + + std::vector out_buffer_sorted; + cudf::io::orc_writer_options out_opts_sorted = + cudf::io::orc_writer_options::builder(cudf::io::sink_info{&out_buffer_sorted}, expected); + cudf::io::write_orc(out_opts_sorted); + + cudf::io::orc_reader_options in_opts_sorted = cudf::io::orc_reader_options::builder( + cudf::io::source_info{out_buffer_sorted.data(), out_buffer_sorted.size()}); + auto const from_sorted = cudf::io::read_orc(in_opts_sorted).tbl; + + std::vector out_buffer_unsorted; + cudf::io::orc_writer_options out_opts_unsorted = + cudf::io::orc_writer_options::builder(cudf::io::sink_info{&out_buffer_unsorted}, expected) + .enable_dictionary_sort(false); + cudf::io::write_orc(out_opts_unsorted); + + cudf::io::orc_reader_options in_opts_unsorted = cudf::io::orc_reader_options::builder( + cudf::io::source_info{out_buffer_unsorted.data(), out_buffer_unsorted.size()}); + auto const from_unsorted = cudf::io::read_orc(in_opts_unsorted).tbl; + + CUDF_TEST_EXPECT_TABLES_EQUAL(*from_sorted, *from_unsorted); +} + CUDF_TEST_PROGRAM_MAIN()