-
Notifications
You must be signed in to change notification settings - Fork 915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sort dictionary data alphabetically in the ORC writer #14295
Changes from 11 commits
83ae4d1
3000000
35cea92
dd85893
6fd5901
7bad218
0c50417
2582fb7
aec0821
be9f3d3
4fe94e9
1894a88
8b9abe8
0e21516
4faca02
3e60991
8056e4f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ | |
#include <cudf/detail/null_mask.hpp> | ||
#include <cudf/detail/utilities/cuda.cuh> | ||
#include <cudf/detail/utilities/pinned_host_vector.hpp> | ||
#include <cudf/detail/utilities/stream_pool.hpp> | ||
#include <cudf/detail/utilities/vector_factories.hpp> | ||
#include <cudf/strings/strings_column_view.hpp> | ||
#include <cudf/utilities/bit.hpp> | ||
|
@@ -50,6 +51,8 @@ | |
#include <thrust/pair.h> | ||
#include <thrust/reduce.h> | ||
#include <thrust/scan.h> | ||
#include <thrust/sequence.h> | ||
#include <thrust/sort.h> | ||
#include <thrust/tabulate.h> | ||
#include <thrust/transform.h> | ||
|
||
|
@@ -867,16 +870,15 @@ encoded_data encode_columns(orc_table_view const& orc_table, | |
ck.null_mask_num_rows = aligned_rowgroups[rg_idx][column.index()].size(); | ||
ck.encoding_kind = column.orc_encoding(); | ||
ck.type_kind = column.orc_kind(); | ||
if (ck.type_kind == TypeKind::STRING) { | ||
ck.dict_index = (ck.encoding_kind == DICTIONARY_V2) | ||
? column.host_stripe_dict(stripe.id).index.data() | ||
: nullptr; | ||
ck.dtype_len = 1; | ||
} else { | ||
ck.dtype_len = column.type_width(); | ||
} | ||
ck.scale = column.scale(); | ||
if (ck.type_kind == TypeKind::DECIMAL) { ck.decimal_offsets = column.decimal_offsets(); } | ||
auto const is_str_dict = | ||
ck.type_kind == TypeKind::STRING and ck.encoding_kind == DICTIONARY_V2; | ||
ck.dict_index = is_str_dict ? column.host_stripe_dict(stripe.id).index.data() : nullptr; | ||
ck.dict_data_order = | ||
is_str_dict ? column.host_stripe_dict(stripe.id).data_order.data() : nullptr; | ||
ck.dtype_len = (ck.type_kind == TypeKind::STRING) ? 1 : column.type_width(); | ||
ck.scale = column.scale(); | ||
ck.decimal_offsets = | ||
(ck.type_kind == TypeKind::DECIMAL) ? column.decimal_offsets() : nullptr; | ||
} | ||
} | ||
} | ||
|
@@ -2023,21 +2025,37 @@ struct stripe_dictionaries { | |
hostdevice_2dvector<gpu::stripe_dictionary> views; // descriptors [string_column][stripe] | ||
std::vector<rmm::device_uvector<uint32_t>> data_owner; // dictionary data owner, per stripe | ||
std::vector<rmm::device_uvector<uint32_t>> index_owner; // dictionary index owner, per stripe | ||
std::vector<rmm::device_uvector<uint32_t>> order_owner; // dictionary order owner, per stripe | ||
|
||
// Should be called after encoding is complete to deallocate the dictionary buffers. | ||
// Empties the data, index and order owner vectors, then resets the matching spans in every | ||
// host-side descriptor so that no descriptor keeps referring to a released buffer. | ||
// `stream` is the stream passed to the final host-to-device update of the descriptors. | ||
void on_encode_complete(rmm::cuda_stream_view stream) | ||
{ | ||
data_owner.clear(); | ||
index_owner.clear(); | ||
order_owner.clear(); | ||
|
||
// Reset the buffer views held by each descriptor in the flattened host view | ||
for (auto& sd : views.host_view().flat_view()) { | ||
sd.data = {}; | ||
sd.index = {}; | ||
sd.data = {}; | ||
sd.index = {}; | ||
sd.data_order = {}; | ||
} | ||
// Push the reset descriptors to the device copy; NOTE(review): presumably asynchronous with | ||
// respect to the host, as the name suggests - confirm before relying on completion here | ||
views.host_to_device_async(stream); | ||
} | ||
}; | ||
|
||
/** | ||
* @brief Less-than comparator over row indices of a single strings column | ||
* | ||
* Both row indices are resolved against `cols[col_idx]`, and the `string_view` elements of the | ||
* two rows are compared with `operator<`. Passed as the comparison functor to | ||
* `thrust::sort_by_key` when the stripe dictionaries are sorted. | ||
*/ | ||
struct string_rows_less { | ||
// Span of column views; only the element at `col_idx` is read by the comparison | ||
device_span<orc_column_device_view> cols; | ||
// Index into `cols` of the column whose rows are compared | ||
uint32_t col_idx; | ||
// Returns true when the string in row `lhs_idx` compares less than the string in row `rhs_idx` | ||
__device__ bool operator()(size_type lhs_idx, size_type rhs_idx) const | ||
{ | ||
auto const& col = cols[col_idx]; | ||
return col.element<string_view>(lhs_idx) < col.element<string_view>(rhs_idx); | ||
} | ||
}; | ||
|
||
// Build stripe dictionaries for string columns | ||
stripe_dictionaries build_dictionaries(orc_table_view& orc_table, | ||
file_segmentation const& segmentation, | ||
|
@@ -2091,6 +2109,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, | |
// Data owners; can be cleared after encode | ||
std::vector<rmm::device_uvector<uint32_t>> dict_data_owner; | ||
std::vector<rmm::device_uvector<uint32_t>> dict_index_owner; | ||
std::vector<rmm::device_uvector<uint32_t>> dict_order_owner; | ||
// Make decision about which stripes to encode with dictionary encoding | ||
for (auto col_idx : orc_table.string_column_indices) { | ||
auto& str_column = orc_table.column(col_idx); | ||
|
@@ -2133,15 +2152,55 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table, | |
gpu::collect_map_entries(stripe_dicts, stream); | ||
gpu::get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream); | ||
|
||
// Clear map slots; hash map storage is deallocated at the end of this function | ||
auto device_dicts_flat = stripe_dicts.device_view().flat_view(); | ||
thrust::for_each(rmm::exec_policy(stream), | ||
device_dicts_flat.begin(), | ||
device_dicts_flat.end(), | ||
[] __device__(auto& sd) { sd.map_slots = {}; }); | ||
stripe_dicts.device_to_host_async(stream); | ||
// deallocate hash map storage, unused after this point | ||
hash_maps_storage.clear(); | ||
|
||
// Clear map slots and attach order buffers | ||
auto dictionaries_flat = stripe_dicts.host_view().flat_view(); | ||
for (auto& sd : dictionaries_flat) { | ||
if (not sd.is_enabled) { continue; } | ||
|
||
sd.map_slots = {}; | ||
dict_order_owner.emplace_back(sd.entry_count, stream); | ||
sd.data_order = dict_order_owner.back(); | ||
} | ||
stripe_dicts.host_to_device_async(stream); | ||
|
||
// Sort stripe dictionaries alphabetically | ||
auto streams = cudf::detail::fork_streams(stream, std::min<size_t>(dict_order_owner.size(), 8)); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is 8 streams an empirical choice? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is. |
||
auto stream_idx = 0; | ||
for (auto& sd : dictionaries_flat) { | ||
if (not sd.is_enabled) { continue; } | ||
|
||
auto const& current_stream = streams[stream_idx]; | ||
|
||
// Sort the dictionary data and create a mapping from the sorted order to the original | ||
thrust::sequence( | ||
rmm::exec_policy_nosync(current_stream), sd.data_order.begin(), sd.data_order.end()); | ||
thrust::sort_by_key(rmm::exec_policy_nosync(current_stream), | ||
sd.data.begin(), | ||
sd.data.end(), | ||
sd.data_order.begin(), | ||
string_rows_less{orc_table.d_columns, sd.column_idx}); | ||
|
||
// Create the inverse permutation - i.e. the mapping from the original order to the sorted | ||
auto order_copy = cudf::detail::make_device_uvector_async<uint32_t>( | ||
sd.data_order, current_stream, rmm::mr::get_current_device_resource()); | ||
thrust::scatter(rmm::exec_policy_nosync(current_stream), | ||
thrust::counting_iterator<uint32_t>(0), | ||
thrust::counting_iterator<uint32_t>(sd.data_order.size()), | ||
order_copy.begin(), | ||
sd.data_order.begin()); | ||
|
||
stream_idx = (stream_idx + 1) % streams.size(); | ||
} | ||
|
||
cudf::detail::join_streams(streams, stream); | ||
|
||
return {std::move(stripe_dicts), std::move(dict_data_owner), std::move(dict_index_owner)}; | ||
return {std::move(stripe_dicts), | ||
std::move(dict_data_owner), | ||
std::move(dict_index_owner), | ||
std::move(dict_order_owner)}; | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of these were left uninitialized when unused; changed to always initialize them.