Skip to content

Commit

Permalink
Sort dictionary data alphabetically in the ORC writer (#14295)
Browse files Browse the repository at this point in the history
Strings in the dictionary data streams are now sorted alphabetically.
This reduces file size in some cases because the sorted dictionary data can be compressed more efficiently.

Reduces throughput by up to 22% when writing strings columns; when dictionary encoding is not used, there is a 3% speedup instead.
Benchmark data does not demonstrate the compression difference, but we have some user data that compresses almost 30% better.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Divye Gala (https://github.com/divyegala)
  - Alessandro Bellina (https://github.com/abellina)

URL: #14295
  • Loading branch information
vuule authored Oct 31, 2023
1 parent 7358ecd commit cb06c20
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 27 deletions.
56 changes: 56 additions & 0 deletions cpp/include/cudf/io/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ class orc_writer_options {
std::map<std::string, std::string> _user_data;
// Optional compression statistics
std::shared_ptr<writer_compression_statistics> _compression_stats;
// Specify whether string dictionaries should be alphabetically sorted
bool _enable_dictionary_sort = true;

friend orc_writer_options_builder;

Expand Down Expand Up @@ -572,6 +574,13 @@ class orc_writer_options {
return _compression_stats;
}

/**
 * @brief Reports whether the writer should sort string dictionaries.
 *
 * @return `true` if string dictionaries should be sorted, `false` otherwise
 */
[[nodiscard]] bool get_enable_dictionary_sort() const
{
  return _enable_dictionary_sort;
}

// Setters

/**
Expand Down Expand Up @@ -670,6 +679,13 @@ class orc_writer_options {
{
_compression_stats = std::move(comp_stats);
}

/**
 * @brief Enables or disables sorting of string dictionaries.
 *
 * @param val `true` to sort string dictionaries, `false` to leave them unsorted
 */
void set_enable_dictionary_sort(bool val)
{
  _enable_dictionary_sort = val;
}
};

/**
Expand Down Expand Up @@ -810,6 +826,18 @@ class orc_writer_options_builder {
return *this;
}

/**
 * @brief Enables or disables sorting of string dictionaries in the options being built.
 *
 * @param val `true` to sort string dictionaries, `false` to leave them unsorted
 * @return this for chaining
 */
orc_writer_options_builder& enable_dictionary_sort(bool val)
{
  // Route through the public setter so the builder and the options class stay in sync
  options.set_enable_dictionary_sort(val);
  return *this;
}

/**
* @brief move orc_writer_options member once it's built.
*/
Expand Down Expand Up @@ -866,6 +894,8 @@ class chunked_orc_writer_options {
std::map<std::string, std::string> _user_data;
// Optional compression statistics
std::shared_ptr<writer_compression_statistics> _compression_stats;
// Specify whether string dictionaries should be alphabetically sorted
bool _enable_dictionary_sort = true;

friend chunked_orc_writer_options_builder;

Expand Down Expand Up @@ -966,6 +996,13 @@ class chunked_orc_writer_options {
return _compression_stats;
}

/**
 * @brief Reports whether the chunked writer should sort string dictionaries.
 *
 * @return `true` if string dictionaries should be sorted, `false` otherwise
 */
[[nodiscard]] bool get_enable_dictionary_sort() const
{
  return _enable_dictionary_sort;
}

// Setters

/**
Expand Down Expand Up @@ -1057,6 +1094,13 @@ class chunked_orc_writer_options {
{
_compression_stats = std::move(comp_stats);
}

/**
 * @brief Enables or disables sorting of string dictionaries.
 *
 * @param val `true` to sort string dictionaries, `false` to leave them unsorted
 */
void set_enable_dictionary_sort(bool val)
{
  _enable_dictionary_sort = val;
}
};

/**
Expand Down Expand Up @@ -1183,6 +1227,18 @@ class chunked_orc_writer_options_builder {
return *this;
}

/**
 * @brief Enables or disables sorting of string dictionaries in the options being built.
 *
 * @param val `true` to sort string dictionaries, `false` to leave them unsorted
 * @return this for chaining
 */
chunked_orc_writer_options_builder& enable_dictionary_sort(bool val)
{
  // Route through the public setter so the builder and the options class stay in sync
  options.set_enable_dictionary_sort(val);
  return *this;
}

/**
* @brief move chunked_orc_writer_options member once it's built.
*/
Expand Down
14 changes: 8 additions & 6 deletions cpp/src/io/orc/orc_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ struct EncChunk {
uint8_t dtype_len; // data type length
int32_t scale; // scale for decimals or timestamps

uint32_t* dict_index; // dictionary index from row index
uint32_t* dict_index; // dictionary index from row index
uint32_t* dict_data_order; // map from data to sorted data indices
uint32_t* decimal_offsets;
orc_column_device_view const* column;
};
Expand Down Expand Up @@ -191,11 +192,12 @@ struct stripe_dictionary {
size_type num_rows = 0; // number of rows in the stripe

// output
device_span<uint32_t> data; // index of elements in the column to include in the dictionary
device_span<uint32_t> index; // index into the dictionary for each row in the column
size_type entry_count = 0; // number of entries in the dictionary
size_type char_count = 0; // number of characters in the dictionary
bool is_enabled = false; // true if dictionary encoding is enabled for this stripe
device_span<uint32_t> data; // index of elements in the column to include in the dictionary
device_span<uint32_t> index; // index into the dictionary for each row in the column
device_span<uint32_t> data_order; // map from data to sorted data indices
size_type entry_count = 0; // number of entries in the dictionary
size_type char_count = 0; // number of characters in the dictionary
bool is_enabled = false; // true if dictionary encoding is enabled for this stripe
};

/**
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,10 @@ __global__ void __launch_bounds__(block_size)
if (dict_idx > 0x7fff'ffffu) {
dict_idx = s->chunk.dict_index[dict_idx & 0x7fff'ffffu];
}
// translate dictionary index to sorted order, if enabled
if (s->chunk.dict_data_order != nullptr) {
dict_idx = s->chunk.dict_data_order[dict_idx];
}
s->vals.u32[nz_idx] = dict_idx;
} else {
string_view value = column.element<string_view>(row);
Expand Down
113 changes: 92 additions & 21 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/pinned_host_vector.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/utilities/bit.hpp>
Expand All @@ -50,6 +51,8 @@
#include <thrust/pair.h>
#include <thrust/reduce.h>
#include <thrust/scan.h>
#include <thrust/sequence.h>
#include <thrust/sort.h>
#include <thrust/tabulate.h>
#include <thrust/transform.h>

Expand Down Expand Up @@ -867,16 +870,15 @@ encoded_data encode_columns(orc_table_view const& orc_table,
ck.null_mask_num_rows = aligned_rowgroups[rg_idx][column.index()].size();
ck.encoding_kind = column.orc_encoding();
ck.type_kind = column.orc_kind();
if (ck.type_kind == TypeKind::STRING) {
ck.dict_index = (ck.encoding_kind == DICTIONARY_V2)
? column.host_stripe_dict(stripe.id).index.data()
: nullptr;
ck.dtype_len = 1;
} else {
ck.dtype_len = column.type_width();
}
ck.scale = column.scale();
if (ck.type_kind == TypeKind::DECIMAL) { ck.decimal_offsets = column.decimal_offsets(); }
auto const is_str_dict =
ck.type_kind == TypeKind::STRING and ck.encoding_kind == DICTIONARY_V2;
ck.dict_index = is_str_dict ? column.host_stripe_dict(stripe.id).index.data() : nullptr;
ck.dict_data_order =
is_str_dict ? column.host_stripe_dict(stripe.id).data_order.data() : nullptr;
ck.dtype_len = (ck.type_kind == TypeKind::STRING) ? 1 : column.type_width();
ck.scale = column.scale();
ck.decimal_offsets =
(ck.type_kind == TypeKind::DECIMAL) ? column.decimal_offsets() : nullptr;
}
}
}
Expand Down Expand Up @@ -2012,24 +2014,41 @@ struct stripe_dictionaries {
hostdevice_2dvector<gpu::stripe_dictionary> views; // descriptors [string_column][stripe]
std::vector<rmm::device_uvector<uint32_t>> data_owner; // dictionary data owner, per stripe
std::vector<rmm::device_uvector<uint32_t>> index_owner; // dictionary index owner, per stripe
std::vector<rmm::device_uvector<uint32_t>> order_owner; // dictionary order owner, per stripe

// Should be called after encoding is complete to deallocate the dictionary buffers.
// Clears the data, index and order owners, then resets the matching spans in every
// stripe dictionary descriptor so that none of them refers to a released buffer.
void on_encode_complete(rmm::cuda_stream_view stream)
{
  data_owner.clear();
  index_owner.clear();
  order_owner.clear();

  // Each span is reset exactly once per descriptor
  for (auto& sd : views.host_view().flat_view()) {
    sd.data       = {};
    sd.index      = {};
    sd.data_order = {};
  }
  // Push the updated host-side descriptors to the device copy on `stream`
  views.host_to_device_async(stream);
}
};

/**
 * @brief Less-than comparator for two rows of a single strings column.
 *
 * The column is selected from `cols` by `col_idx`; the two operands are row indices
 * into that column. Ordering is whatever `string_view`'s `operator<` defines.
 */
struct string_rows_less {
  device_span<orc_column_device_view> cols;  // columns to select from
  uint32_t col_idx;                          // index of the strings column within `cols`

  __device__ bool operator()(size_type lhs_idx, size_type rhs_idx) const
  {
    auto const& str_col = cols[col_idx];
    auto const lhs_str  = str_col.element<string_view>(lhs_idx);
    auto const rhs_str  = str_col.element<string_view>(rhs_idx);
    return lhs_str < rhs_str;
  }
};

// Build stripe dictionaries for string columns
stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
file_segmentation const& segmentation,
bool sort_dictionaries,
rmm::cuda_stream_view stream)
{
std::vector<std::vector<rmm::device_uvector<gpu::slot_type>>> hash_maps_storage(
Expand Down Expand Up @@ -2080,6 +2099,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
// Data owners; can be cleared after encode
std::vector<rmm::device_uvector<uint32_t>> dict_data_owner;
std::vector<rmm::device_uvector<uint32_t>> dict_index_owner;
std::vector<rmm::device_uvector<uint32_t>> dict_order_owner;
// Make decision about which stripes to encode with dictionary encoding
for (auto col_idx : orc_table.string_column_indices) {
auto& str_column = orc_table.column(col_idx);
Expand Down Expand Up @@ -2122,15 +2142,61 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
gpu::collect_map_entries(stripe_dicts, stream);
gpu::get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream);

// Clear map slots; hash map storage is deallocated at the end of this function
auto device_dicts_flat = stripe_dicts.device_view().flat_view();
thrust::for_each(rmm::exec_policy(stream),
device_dicts_flat.begin(),
device_dicts_flat.end(),
[] __device__(auto& sd) { sd.map_slots = {}; });
stripe_dicts.device_to_host_async(stream);
// deallocate hash map storage, unused after this point
hash_maps_storage.clear();

// Clear map slots and attach order buffers
auto dictionaries_flat = stripe_dicts.host_view().flat_view();
for (auto& sd : dictionaries_flat) {
if (not sd.is_enabled) { continue; }

sd.map_slots = {};
if (sort_dictionaries) {
dict_order_owner.emplace_back(sd.entry_count, stream);
sd.data_order = dict_order_owner.back();
} else {
sd.data_order = {};
}
}
stripe_dicts.host_to_device_async(stream);

// Sort stripe dictionaries alphabetically
if (sort_dictionaries) {
auto streams = cudf::detail::fork_streams(stream, std::min<size_t>(dict_order_owner.size(), 8));
auto stream_idx = 0;
for (auto& sd : dictionaries_flat) {
if (not sd.is_enabled) { continue; }

auto const& current_stream = streams[stream_idx];

// Sort the dictionary data and create a mapping from the sorted order to the original
thrust::sequence(
rmm::exec_policy_nosync(current_stream), sd.data_order.begin(), sd.data_order.end());
thrust::sort_by_key(rmm::exec_policy_nosync(current_stream),
sd.data.begin(),
sd.data.end(),
sd.data_order.begin(),
string_rows_less{orc_table.d_columns, sd.column_idx});

// Create the inverse permutation - i.e. the mapping from the original order to the sorted
auto order_copy = cudf::detail::make_device_uvector_async<uint32_t>(
sd.data_order, current_stream, rmm::mr::get_current_device_resource());
thrust::scatter(rmm::exec_policy_nosync(current_stream),
thrust::counting_iterator<uint32_t>(0),
thrust::counting_iterator<uint32_t>(sd.data_order.size()),
order_copy.begin(),
sd.data_order.begin());

stream_idx = (stream_idx + 1) % streams.size();
}

cudf::detail::join_streams(streams, stream);
}

return {std::move(stripe_dicts), std::move(dict_data_owner), std::move(dict_index_owner)};
return {std::move(stripe_dicts),
std::move(dict_data_owner),
std::move(dict_index_owner),
std::move(dict_order_owner)};
}

/**
Expand All @@ -2142,6 +2208,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
* @param max_stripe_size Maximum size of stripes in the output file
* @param row_index_stride The row index stride
* @param enable_dictionary Whether dictionary is enabled
* @param sort_dictionaries Whether to sort the dictionaries
* @param compression_kind The compression kind
* @param compression_blocksize The block size used for compression
* @param stats_freq Column statistics granularity type for parquet/orc writers
Expand All @@ -2156,6 +2223,7 @@ auto convert_table_to_orc_data(table_view const& input,
stripe_size_limits max_stripe_size,
size_type row_index_stride,
bool enable_dictionary,
bool sort_dictionaries,
CompressionKind compression_kind,
size_t compression_blocksize,
statistics_freq stats_freq,
Expand All @@ -2180,7 +2248,7 @@ auto convert_table_to_orc_data(table_view const& input,
auto segmentation =
calculate_segmentation(orc_table.columns, std::move(rowgroup_bounds), max_stripe_size);

auto stripe_dicts = build_dictionaries(orc_table, segmentation, stream);
auto stripe_dicts = build_dictionaries(orc_table, segmentation, sort_dictionaries, stream);
auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream);

auto const uncompressed_block_align = uncomp_block_alignment(compression_kind);
Expand Down Expand Up @@ -2314,6 +2382,7 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
_compression_blocksize(compression_block_size(_compression_kind)),
_compression_statistics(options.get_compression_statistics()),
_stats_freq(options.get_statistics_freq()),
_sort_dictionaries{options.get_enable_dictionary_sort()},
_single_write_mode(mode),
_kv_meta(options.get_key_value_metadata()),
_out_sink(std::move(sink))
Expand All @@ -2335,6 +2404,7 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
_compression_blocksize(compression_block_size(_compression_kind)),
_compression_statistics(options.get_compression_statistics()),
_stats_freq(options.get_statistics_freq()),
_sort_dictionaries{options.get_enable_dictionary_sort()},
_single_write_mode(mode),
_kv_meta(options.get_key_value_metadata()),
_out_sink(std::move(sink))
Expand Down Expand Up @@ -2382,6 +2452,7 @@ void writer::impl::write(table_view const& input)
_max_stripe_size,
_row_index_stride,
_enable_dictionary,
_sort_dictionaries,
_compression_kind,
_compression_blocksize,
_stats_freq,
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/orc/writer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ class writer::impl {
size_t const _compression_blocksize;
std::shared_ptr<writer_compression_statistics> _compression_statistics; // Optional output
statistics_freq const _stats_freq;
bool const _sort_dictionaries;
single_write_mode const _single_write_mode; // Special parameter only used by `write()` to
// indicate that we are guaranteeing a single table
// write. This enables some internal optimizations.
Expand Down
Loading

0 comments on commit cb06c20

Please sign in to comment.