diff --git a/cpp/src/io/json/json_tree.cu b/cpp/src/io/json/json_tree.cu index 1a8ddeefdf5..3aea5455eaf 100644 --- a/cpp/src/io/json/json_tree.cu +++ b/cpp/src/io/json/json_tree.cu @@ -28,15 +28,15 @@ #include #include -#include - -#include - #include #include #include #include +#include + +#include + #include #include #include diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index 9b0541259e1..c57962700f6 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -16,471 +16,260 @@ #include "orc_gpu.hpp" +#include #include -#include -#include +#include #include -#include - -#include -#include -#include - -namespace cudf { -namespace io { -namespace orc { -namespace gpu { -constexpr int init_hash_bits = 12; - -struct dictinit_state_s { - uint32_t nnz; - uint32_t total_dupes; - DictionaryChunk chunk; - volatile uint32_t scratch_red[32]; - uint32_t* dict; - union { - uint16_t u16[1 << (init_hash_bits)]; - uint32_t u32[1 << (init_hash_bits - 1)]; - } map; -}; -/** - * @brief Return a 12-bit hash from a string - */ -static inline __device__ uint32_t hash_string(string_view const val) -{ - if (val.empty()) { - return 0; - } else { - char const* ptr = val.data(); - uint32_t len = val.size_bytes(); - return (ptr[0] + (ptr[len - 1] << 5) + (len << 10)) & ((1 << init_hash_bits) - 1); - } -} +namespace cudf::io::orc::gpu { /** - * @brief Fill dictionary with the indices of non-null rows - * - * @param[in,out] s dictionary builder state - * @param[in] t thread id - * @param[in] temp_storage shared memory storage to scan non-null positions + * @brief Counts the number of characters in each rowgroup of each string column. */ -template -static __device__ void LoadNonNullIndices(volatile dictinit_state_s* s, - int t, - Storage& temp_storage) +__global__ void rowgroup_char_counts_kernel(device_2dspan char_counts, + device_span orc_columns, + device_2dspan rowgroup_bounds, + device_span str_col_indexes) { - if (t == 0) { s->nnz = 0; } - if (s->chunk.num_rows <= 0) { - // A sync is needed for s->nnz if there are no times through the loop - __syncthreads(); - } - for (uint32_t i = 0; i < s->chunk.num_rows; i += block_size) { - uint32_t const* valid_map = s->chunk.leaf_column->null_mask(); - auto column_offset = s->chunk.leaf_column->offset(); - uint32_t is_valid, nz_pos; - if (t < block_size / 32) { - if (!valid_map) { - s->scratch_red[t] = 0xffff'ffffu; - } else { - uint32_t const row = s->chunk.start_row + i + t * 32; - auto const chunk_end = s->chunk.start_row + s->chunk.num_rows; - - auto const valid_map_idx = (row + column_offset) / 32; - uint32_t valid = (row < chunk_end) ? valid_map[valid_map_idx] : 0; - - auto const rows_in_next_word = (row + column_offset) & 0x1f; - if (rows_in_next_word != 0) { - auto const rows_in_current_word = 32 - rows_in_next_word; - // Read next word if any rows are within the chunk - uint32_t const valid_next = - (row + rows_in_current_word < chunk_end) ? valid_map[valid_map_idx + 1] : 0; - valid = __funnelshift_r(valid, valid_next, rows_in_next_word); - } - s->scratch_red[t] = valid; - } - } - __syncthreads(); - is_valid = (i + t < s->chunk.num_rows) ? 
(s->scratch_red[t >> 5] >> (t & 0x1f)) & 1 : 0; - uint32_t tmp_nnz; - cub::BlockScan(temp_storage) - .ExclusiveSum(is_valid, nz_pos, tmp_nnz); - nz_pos += s->nnz; - __syncthreads(); - if (!t) { s->nnz += tmp_nnz; } - if (is_valid) { s->dict[nz_pos] = i + t; } - __syncthreads(); - } + // Index of the column in the `str_col_indexes` array + auto const str_col_idx = blockIdx.y; + // Index of the column in the `orc_columns` array + auto const col_idx = str_col_indexes[str_col_idx]; + auto const row_group_idx = blockIdx.x * blockDim.x + threadIdx.x; + if (row_group_idx >= rowgroup_bounds.size().first) { return; } + + auto const start_row = rowgroup_bounds[row_group_idx][col_idx].begin; + auto const num_rows = rowgroup_bounds[row_group_idx][col_idx].size(); + + auto const& offsets = orc_columns[col_idx].child(strings_column_view::offsets_column_index); + char_counts[str_col_idx][row_group_idx] = + offsets.element(start_row + num_rows) - offsets.element(start_row); } -/** - * @brief Gather all non-NULL string rows and compute total character data size - */ -// blockDim {block_size,1,1} -template -__global__ void __launch_bounds__(block_size, 2) - gpuInitDictionaryIndices(device_2dspan chunks, - device_span orc_columns, - device_span> dict_data, - device_span> dict_index, - device_span> tmp_indices, - device_2dspan rowgroup_bounds, - device_span str_col_indexes) +void rowgroup_char_counts(device_2dspan counts, + device_span orc_columns, + device_2dspan rowgroup_bounds, + device_span str_col_indexes, + rmm::cuda_stream_view stream) { - __shared__ __align__(16) dictinit_state_s state_g; + if (rowgroup_bounds.count() == 0) { return; } - using block_reduce = cub::BlockReduce; - using block_scan = cub::BlockScan; + auto const num_rowgroups = rowgroup_bounds.size().first; + auto const num_str_cols = str_col_indexes.size(); - __shared__ union { - typename block_reduce::TempStorage reduce_storage; - typename block_scan::TempStorage scan_storage; - } temp_storage; + int block_size = 0; // suggested thread count to use + int min_grid_size = 0; // minimum block count required + CUDF_CUDA_TRY( + cudaOccupancyMaxPotentialBlockSize(&min_grid_size, &block_size, rowgroup_char_counts_kernel)); + auto const grid_size = + dim3(cudf::util::div_rounding_up_unsafe(num_rowgroups, block_size), + static_cast(num_str_cols)); - dictinit_state_s* const s = &state_g; - // Index of the column in the `str_col_indexes` array - uint32_t const str_col_idx = blockIdx.x; - // Index of the column in the `orc_columns` array - auto const col_idx = str_col_indexes[str_col_idx]; - uint32_t group_id = blockIdx.y; - auto const num_str_cols = str_col_indexes.size(); - uint32_t nnz, start_row, dict_char_count; - int t = threadIdx.x; + rowgroup_char_counts_kernel<<>>( + counts, orc_columns, rowgroup_bounds, str_col_indexes); +} - if (t == 0) { - s->chunk = chunks[group_id][str_col_idx]; - s->chunk.leaf_column = &orc_columns[col_idx]; - s->chunk.dict_data = dict_data[str_col_idx].data() + rowgroup_bounds[group_id][col_idx].begin; - s->chunk.dict_index = dict_index[str_col_idx].data(); - s->chunk.start_row = rowgroup_bounds[group_id][col_idx].begin; - s->chunk.num_rows = rowgroup_bounds[group_id][col_idx].size(); - s->dict = tmp_indices[str_col_idx].data() + s->chunk.start_row; - } - for (uint32_t i = 0; i < sizeof(s->map) / sizeof(uint32_t); i += block_size) { - if (i + t < sizeof(s->map) / sizeof(uint32_t)) s->map.u32[i + t] = 0; - } - __syncthreads(); - // First, take care of NULLs, and count how many strings we have (TODO: bypass this step 
when - // there are no nulls) - LoadNonNullIndices(s, t, temp_storage.scan_storage); - // Sum the lengths of all the strings - if (t == 0) { - s->chunk.string_char_count = 0; - s->total_dupes = 0; - } - nnz = s->nnz; - auto t_dict_data = s->chunk.dict_data; - start_row = s->chunk.start_row; - for (uint32_t i = 0; i < nnz; i += block_size) { - uint32_t ck_row = 0; - uint32_t hash = 0; - uint32_t len = 0; - if (i + t < nnz) { - ck_row = s->dict[i + t]; - string_view string_val = s->chunk.leaf_column->element(ck_row + start_row); - len = static_cast(string_val.size_bytes()); - hash = hash_string(string_val); - } - len = block_reduce(temp_storage.reduce_storage).Sum(len); - if (t == 0) s->chunk.string_char_count += len; - if (i + t < nnz) { - atomicAdd(&s->map.u32[hash >> 1], 1 << ((hash & 1) ? 16 : 0)); - t_dict_data[i + t] = start_row + ck_row; +template +__global__ void __launch_bounds__(block_size) + initialize_dictionary_hash_maps_kernel(device_span dictionaries) +{ + auto const dict_map = dictionaries[blockIdx.x].map_slots; + auto const t = threadIdx.x; + for (size_type i = 0; i < dict_map.size(); i += block_size) { + if (t + i < dict_map.size()) { + new (&dict_map[t + i].first) map_type::atomic_key_type{KEY_SENTINEL}; + new (&dict_map[t + i].second) map_type::atomic_mapped_type{VALUE_SENTINEL}; } - __syncthreads(); } - // Reorder the 16-bit local indices according to the hash value of the strings - static_assert((init_hash_bits == 12), "Hardcoded for init_hash_bits=12"); +} + +struct equality_functor { + column_device_view const& col; + __device__ bool operator()(size_type lhs_idx, size_type rhs_idx) const { - // Cumulative sum of hash map counts - uint32_t count01 = s->map.u32[t * 4 + 0]; - uint32_t count23 = s->map.u32[t * 4 + 1]; - uint32_t count45 = s->map.u32[t * 4 + 2]; - uint32_t count67 = s->map.u32[t * 4 + 3]; - uint32_t sum01 = count01 + (count01 << 16); - uint32_t sum23 = count23 + (count23 << 16); - uint32_t sum45 = count45 + (count45 << 16); - uint32_t sum67 = count67 + (count67 << 16); - sum23 += (sum01 >> 16) * 0x1'0001; - sum45 += (sum23 >> 16) * 0x1'0001; - sum67 += (sum45 >> 16) * 0x1'0001; - uint32_t sum_w = sum67 >> 16; - block_scan(temp_storage.scan_storage).InclusiveSum(sum_w, sum_w); - __syncthreads(); - sum_w = (sum_w - (sum67 >> 16)) * 0x1'0001; - s->map.u32[t * 4 + 0] = sum_w + sum01 - count01; - s->map.u32[t * 4 + 1] = sum_w + sum23 - count23; - s->map.u32[t * 4 + 2] = sum_w + sum45 - count45; - s->map.u32[t * 4 + 3] = sum_w + sum67 - count67; - __syncthreads(); + // We don't call this for nulls so this is fine + auto const equal = cudf::experimental::row::equality::nan_equal_physical_equality_comparator{}; + return equal(col.element(lhs_idx), col.element(rhs_idx)); } - // Put the indices back in hash order - for (uint32_t i = 0; i < nnz; i += block_size) { - uint32_t ck_row = 0; - uint32_t hash = 0; - uint32_t pos = 0; - uint32_t pos_old = 0; - uint32_t sh = 0; - if (i + t < nnz) { - ck_row = t_dict_data[i + t] - start_row; - string_view string_val = s->chunk.leaf_column->element(ck_row + start_row); - hash = hash_string(string_val); - sh = (hash & 1) ? 
16 : 0; - pos_old = s->map.u16[hash]; - } - // The isolation of the atomicAdd, along with pos_old/pos_new is to guarantee deterministic - // behavior for the first row in the hash map that will be used for early duplicate detection - __syncthreads(); - if (i + t < nnz) { - pos = (atomicAdd(&s->map.u32[hash >> 1], 1 << sh) >> sh) & 0xffff; - s->dict[pos] = ck_row; - } - __syncthreads(); - bool collision = false; - uint32_t colliding_row = 0; - uint32_t pos_new = 0; - if (i + t < nnz) { - pos_new = s->map.u16[hash]; - collision = (pos != pos_old && pos_new > pos_old + 1); - if (collision) { colliding_row = s->dict[pos_old]; } - } - __syncthreads(); - if (collision) { atomicMin(s->dict + pos_old, ck_row); } +}; - __syncthreads(); - // Resolve collision - if (collision && ck_row == s->dict[pos_old]) { s->dict[pos] = colliding_row; } +struct hash_functor { + column_device_view const& col; + __device__ auto operator()(size_type idx) const + { + return cudf::detail::MurmurHash3_32{}(col.element(idx)); } - __syncthreads(); - // Now that the strings are ordered by hash, compare every string with the first entry in the hash - // map, the position of the first string can be inferred from the hash map counts - dict_char_count = 0; - for (uint32_t i = 0; i < nnz; i += block_size) { - uint32_t ck_row = 0, ck_row_ref = 0, is_dupe = 0; - if (i + t < nnz) { - ck_row = s->dict[i + t]; - string_view string_value = s->chunk.leaf_column->element(ck_row + start_row); - auto const string_length = static_cast(string_value.size_bytes()); - auto const hash = hash_string(string_value); - ck_row_ref = s->dict[(hash > 0) ? s->map.u16[hash - 1] : 0]; - if (ck_row_ref != ck_row) { - string_view reference_string = - s->chunk.leaf_column->element(ck_row_ref + start_row); - is_dupe = (string_value == reference_string); - dict_char_count += (is_dupe) ? 
0 : string_length; +}; + +template +__global__ void __launch_bounds__(block_size) + populate_dictionary_hash_maps_kernel(device_2dspan dictionaries, + device_span columns) +{ + auto const col_idx = blockIdx.x; + auto const stripe_idx = blockIdx.y; + auto const t = threadIdx.x; + auto& dict = dictionaries[col_idx][stripe_idx]; + auto const& col = columns[dict.column_idx]; + + // Make a view of the hash map + auto hash_map_mutable = map_type::device_mutable_view(dict.map_slots.data(), + dict.map_slots.size(), + cuco::empty_key{KEY_SENTINEL}, + cuco::empty_value{VALUE_SENTINEL}); + auto const hash_fn = hash_functor{col}; + auto const equality_fn = equality_functor{col}; + + auto const start_row = dict.start_row; + auto const end_row = dict.start_row + dict.num_rows; + + size_type entry_count{0}; + size_type char_count{0}; + // all threads should loop the same number of times + for (auto cur_row = start_row + t; cur_row - t < end_row; cur_row += block_size) { + auto const is_valid = cur_row < end_row and col.is_valid(cur_row); + + if (is_valid) { + // insert element at cur_row to hash map and count successful insertions + auto const is_unique = + hash_map_mutable.insert(std::pair(cur_row, cur_row), hash_fn, equality_fn); + + if (is_unique) { + ++entry_count; + char_count += col.element(cur_row).size_bytes(); } } - uint32_t dupes_in_block; - uint32_t dupes_before; - block_scan(temp_storage.scan_storage).InclusiveSum(is_dupe, dupes_before, dupes_in_block); - dupes_before += s->total_dupes; + // ensure that threads access adjacent rows in each iteration __syncthreads(); - if (!t) { s->total_dupes += dupes_in_block; } - if (i + t < nnz) { - if (!is_dupe) { - t_dict_data[i + t - dupes_before] = ck_row + start_row; - } else { - s->chunk.dict_index[ck_row + start_row] = (ck_row_ref + start_row) | (1u << 31); - } - } } - // temp_storage is being used twice, so make sure there is `__syncthreads()` between them - // while making any future changes. 
- dict_char_count = block_reduce(temp_storage.reduce_storage).Sum(dict_char_count); - if (!t) { - chunks[group_id][str_col_idx].num_strings = nnz; - chunks[group_id][str_col_idx].string_char_count = s->chunk.string_char_count; - chunks[group_id][str_col_idx].num_dict_strings = nnz - s->total_dupes; - chunks[group_id][str_col_idx].dict_char_count = dict_char_count; - chunks[group_id][str_col_idx].leaf_column = s->chunk.leaf_column; - - chunks[group_id][str_col_idx].dict_data = s->chunk.dict_data; - chunks[group_id][str_col_idx].dict_index = s->chunk.dict_index; - chunks[group_id][str_col_idx].start_row = s->chunk.start_row; - chunks[group_id][str_col_idx].num_rows = s->chunk.num_rows; + + using block_reduce = cub::BlockReduce; + __shared__ typename block_reduce::TempStorage reduce_storage; + + auto const block_entry_count = block_reduce(reduce_storage).Sum(entry_count); + __syncthreads(); + auto const block_char_count = block_reduce(reduce_storage).Sum(char_count); + + if (t == 0) { + dict.entry_count = block_entry_count; + dict.char_count = block_char_count; } } -/** - * @brief In-place concatenate dictionary data for all chunks in each stripe - * - * @param[in] stripes StripeDictionary device array [stripe][column] - * @param[in] chunks DictionaryChunk device array [rowgroup][column] - * @param[in] num_columns Number of columns - */ -// blockDim {1024,1,1} -__global__ void __launch_bounds__(1024) - gpuCompactChunkDictionaries(device_2dspan stripes, - device_2dspan chunks) +template +__global__ void __launch_bounds__(block_size) + collect_map_entries_kernel(device_2dspan dictionaries) { - __shared__ __align__(16) StripeDictionary stripe_g; - __shared__ __align__(16) DictionaryChunk chunk_g; - __shared__ uint32_t const* volatile ck_curptr_g; - __shared__ uint32_t volatile ck_curlen_g; - - uint32_t col_id = blockIdx.x; - uint32_t stripe_id = blockIdx.y; - uint32_t chunk_len; - int t = threadIdx.x; - uint32_t const* src; - uint32_t* dst; - - if (t == 0) stripe_g = stripes[stripe_id][col_id]; - __syncthreads(); - if (!stripe_g.dict_data) { return; } - if (t == 0) chunk_g = chunks[stripe_g.start_chunk][col_id]; + auto const col_idx = blockIdx.x; + auto const stripe_idx = blockIdx.y; + auto const& dict = dictionaries[col_idx][stripe_idx]; + + if (not dict.is_enabled) { return; } + + auto const t = threadIdx.x; + auto map = map_type::device_view(dict.map_slots.data(), + dict.map_slots.size(), + cuco::empty_key{KEY_SENTINEL}, + cuco::empty_value{VALUE_SENTINEL}); + + __shared__ cuda::atomic counter; + + using cuda::std::memory_order_relaxed; + if (t == 0) { new (&counter) cuda::atomic{0}; } __syncthreads(); - dst = stripe_g.dict_data + chunk_g.num_dict_strings; - for (uint32_t g = 1; g < stripe_g.num_chunks; g++) { - if (!t) { - src = chunks[stripe_g.start_chunk + g][col_id].dict_data; - chunk_len = chunks[stripe_g.start_chunk + g][col_id].num_dict_strings; - ck_curptr_g = src; - ck_curlen_g = chunk_len; - } - __syncthreads(); - src = ck_curptr_g; - chunk_len = ck_curlen_g; - if (src != dst) { - for (uint32_t i = 0; i < chunk_len; i += 1024) { - uint32_t idx = (i + t < chunk_len) ? 
src[i + t] : 0; - __syncthreads(); - if (i + t < chunk_len) dst[i + t] = idx; + for (size_type i = 0; i < dict.map_slots.size(); i += block_size) { + if (t + i < dict.map_slots.size()) { + auto* slot = reinterpret_cast(map.begin_slot() + t + i); + auto key = slot->first; + if (key != KEY_SENTINEL) { + auto loc = counter.fetch_add(1, memory_order_relaxed); + dict.data[loc] = key; + slot->second = loc; } } - dst += chunk_len; - __syncthreads(); } } -struct build_state_s { - uint32_t total_dupes; - StripeDictionary stripe; - volatile uint32_t scratch_red[32]; -}; - -/** - * @brief Eliminate duplicates in-place and generate column dictionary index - * - * @param[in] stripes StripeDictionary device array [stripe][column] - * @param[in] num_columns Number of string columns - */ -// NOTE: Prone to poor utilization on small datasets due to 1 block per dictionary -// blockDim {1024,1,1} template __global__ void __launch_bounds__(block_size) - gpuBuildStripeDictionaries(device_2dspan stripes) + get_dictionary_indices_kernel(device_2dspan dictionaries, + device_span columns) { - __shared__ __align__(16) build_state_s state_g; - using block_reduce = cub::BlockReduce; - using block_scan = cub::BlockScan; - __shared__ union { - typename block_reduce::TempStorage reduce_storage; - typename block_scan::TempStorage scan_storage; - } temp_storage; - - build_state_s* const s = &state_g; - uint32_t col_id = blockIdx.x; - uint32_t stripe_id = blockIdx.y; - uint32_t num_strings; - uint32_t *dict_data, *dict_index; - uint32_t dict_char_count; - int t = threadIdx.x; - - if (t == 0) s->stripe = stripes[stripe_id][col_id]; - if (t == 31 * 32) { s->total_dupes = 0; } - __syncthreads(); - num_strings = s->stripe.num_strings; - dict_data = s->stripe.dict_data; - if (!dict_data) return; - dict_index = s->stripe.dict_index; - string_view current_string = string_view::min(); - dict_char_count = 0; - for (uint32_t i = 0; i < num_strings; i += block_size) { - uint32_t cur = (i + t < num_strings) ? dict_data[i + t] : 0; - uint32_t cur_len = 0; - bool is_dupe = false; - if (i + t < num_strings) { - current_string = s->stripe.leaf_column->element(cur); - cur_len = current_string.size_bytes(); - } - if (i + t != 0 && i + t < num_strings) { - uint32_t prev = dict_data[i + t - 1]; - is_dupe = (current_string == (s->stripe.leaf_column->element(prev))); - } - dict_char_count += (is_dupe) ? 
0 : cur_len; - uint32_t dupes_in_block; - uint32_t dupes_before; - block_scan(temp_storage.scan_storage).InclusiveSum(is_dupe, dupes_before, dupes_in_block); - dupes_before += s->total_dupes; - __syncthreads(); - if (!t) { s->total_dupes += dupes_in_block; } - if (i + t < num_strings) { - dict_index[cur] = i + t - dupes_before; - if (!is_dupe && dupes_before != 0) { dict_data[i + t - dupes_before] = cur; } + auto const col_idx = blockIdx.x; + auto const stripe_idx = blockIdx.y; + auto const& dict = dictionaries[col_idx][stripe_idx]; + auto const& col = columns[dict.column_idx]; + + if (not dict.is_enabled) { return; } + + auto const t = threadIdx.x; + auto const start_row = dict.start_row; + auto const end_row = dict.start_row + dict.num_rows; + + auto const map = map_type::device_view(dict.map_slots.data(), + dict.map_slots.size(), + cuco::empty_key{KEY_SENTINEL}, + cuco::empty_value{VALUE_SENTINEL}); + + auto cur_row = start_row + t; + while (cur_row < end_row) { + auto const is_valid = cur_row < col.size() and col.is_valid(cur_row); + + if (is_valid) { + auto const hash_fn = hash_functor{col}; + auto const equality_fn = equality_functor{col}; + auto const found_slot = map.find(cur_row, hash_fn, equality_fn); + cudf_assert(found_slot != map.end() && + "Unable to find value in map in dictionary index construction"); + if (found_slot != map.end()) { + // No need for atomic as this is not going to be modified by any other thread + auto const val_ptr = reinterpret_cast(&found_slot->second); + dict.index[cur_row] = *val_ptr; + } } - __syncthreads(); - } - dict_char_count = block_reduce(temp_storage.reduce_storage).Sum(dict_char_count); - if (t == 0) { - stripes[stripe_id][col_id].num_strings = num_strings - s->total_dupes; - stripes[stripe_id][col_id].dict_char_count = dict_char_count; + cur_row += block_size; } } -void InitDictionaryIndices(device_span orc_columns, - device_2dspan chunks, - device_span> dict_data, - device_span> dict_index, - device_span> tmp_indices, - device_2dspan rowgroup_bounds, - device_span str_col_indexes, - rmm::cuda_stream_view stream) +void initialize_dictionary_hash_maps(device_2dspan dictionaries, + rmm::cuda_stream_view stream) { - static constexpr int block_size = 512; - dim3 dim_block(block_size, 1); - dim3 dim_grid(str_col_indexes.size(), rowgroup_bounds.size().first); - gpuInitDictionaryIndices<<>>( - chunks, orc_columns, dict_data, dict_index, tmp_indices, rowgroup_bounds, str_col_indexes); + if (dictionaries.count() == 0) { return; } + constexpr int block_size = 1024; + initialize_dictionary_hash_maps_kernel + <<>>(dictionaries.flat_view()); } -/** - * @copydoc cudf::io::orc::gpu::BuildStripeDictionaries - */ -void BuildStripeDictionaries(device_2dspan d_stripes_dicts, - host_2dspan h_stripe_dicts, - device_2dspan chunks, - rmm::cuda_stream_view stream) +void populate_dictionary_hash_maps(device_2dspan dictionaries, + device_span columns, + rmm::cuda_stream_view stream) { - auto const num_stripes = h_stripe_dicts.size().first; - auto const num_columns = h_stripe_dicts.size().second; - - dim3 dim_block(1024, 1); // 1024 threads per chunk - dim3 dim_grid_build(num_columns, num_stripes); - gpuCompactChunkDictionaries<<>>(d_stripes_dicts, - chunks); - for (uint32_t stripe_idx = 0; stripe_idx < num_stripes; ++stripe_idx) { - for (auto const& stripe_dict : h_stripe_dicts[stripe_idx]) { - if (stripe_dict.dict_data != nullptr) { - auto const dict_data_ptr = thrust::device_pointer_cast(stripe_dict.dict_data); - auto const string_column = stripe_dict.leaf_column; - 
// NOTE: Requires the --expt-extended-lambda nvcc flag - thrust::sort(rmm::exec_policy(stream), - dict_data_ptr, - dict_data_ptr + stripe_dict.num_strings, - [string_column] __device__(uint32_t const& lhs, uint32_t const& rhs) { - return string_column->element(lhs) < - string_column->element(rhs); - }); - } - } - } - gpuBuildStripeDictionaries<1024> - <<>>(d_stripes_dicts); + if (dictionaries.count() == 0) { return; } + constexpr int block_size = 256; + dim3 const dim_grid(dictionaries.size().first, dictionaries.size().second); + populate_dictionary_hash_maps_kernel + <<>>(dictionaries, columns); +} + +void collect_map_entries(device_2dspan dictionaries, + rmm::cuda_stream_view stream) +{ + if (dictionaries.count() == 0) { return; } + constexpr int block_size = 1024; + dim3 const dim_grid(dictionaries.size().first, dictionaries.size().second); + collect_map_entries_kernel<<>>(dictionaries); +} + +void get_dictionary_indices(device_2dspan dictionaries, + device_span columns, + rmm::cuda_stream_view stream) +{ + if (dictionaries.count() == 0) { return; } + constexpr int block_size = 1024; + dim3 const dim_grid(dictionaries.size().first, dictionaries.size().second); + get_dictionary_indices_kernel + <<>>(dictionaries, columns); } -} // namespace gpu -} // namespace orc -} // namespace io -} // namespace cudf +} // namespace cudf::io::orc::gpu diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index 72919b47da6..fc50b7118be 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -24,9 +24,19 @@ #include -namespace cudf { -namespace io { -namespace orc { +namespace cudf::io::orc { + +namespace { +[[nodiscard]] constexpr uint32_t varint_size(uint64_t val) +{ + auto len = 1u; + while (val > 0x7f) { + val >>= 7; + ++len; + } + return len; +} +} // namespace uint32_t ProtobufReader::read_field_size(uint8_t const* end) { @@ -515,6 +525,4 @@ void metadata::init_parent_descriptors() } } -} // namespace orc -} // namespace io -} // namespace cudf +} // namespace cudf::io::orc diff --git a/cpp/src/io/orc/orc.hpp b/cpp/src/io/orc/orc.hpp index b2763a8a4ce..6f65e384d2d 100644 --- a/cpp/src/io/orc/orc.hpp +++ b/cpp/src/io/orc/orc.hpp @@ -465,16 +465,6 @@ class ProtobufWriter { return l; } - uint32_t varint_size(uint64_t val) - { - auto len = 1u; - while (val > 0x7f) { - val >>= 7; - ++len; - } - return len; - } - uint32_t put_int(int64_t v) { int64_t s = (v < 0); diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index 664d4ea05de..681cc0fb9d2 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -31,6 +31,8 @@ #include +#include + namespace cudf { namespace io { namespace orc { @@ -39,6 +41,19 @@ namespace gpu { using cudf::detail::device_2dspan; using cudf::detail::host_2dspan; +auto constexpr KEY_SENTINEL = size_type{-1}; +auto constexpr VALUE_SENTINEL = size_type{-1}; + +using map_type = cuco::static_map; + +/** + * @brief The alias of `map_type::pair_atomic_type` class. + * + * Declare this struct by trivial subclassing instead of type aliasing so we can have forward + * declaration of this struct somewhere else. 
+ */ +struct slot_type : public map_type::slot_type {}; + struct CompressedStreamInfo { CompressedStreamInfo() = default; explicit constexpr CompressedStreamInfo(uint8_t const* compressed_data_, size_t compressed_size_) @@ -172,36 +187,63 @@ struct StripeStream { }; /** - * @brief Struct to describe a dictionary chunk + * @brief Struct to describe a stripe dictionary */ -struct DictionaryChunk { - uint32_t* dict_data; // dictionary data (index of non-null rows) - uint32_t* dict_index; // row indices of corresponding string (row from dictionary index) - uint32_t start_row; // start row of this chunk - uint32_t num_rows; // num rows in this chunk - uint32_t num_strings; // number of strings in this chunk - uint32_t - string_char_count; // total size of string data (NOTE: assumes less than 4G bytes per chunk) - uint32_t num_dict_strings; // number of strings in dictionary - uint32_t dict_char_count; // size of dictionary string data for this chunk - - orc_column_device_view const* leaf_column; //!< Pointer to string column +struct stripe_dictionary { + // input + device_span map_slots; // hash map storage + uint32_t column_idx = 0; // column index + size_type start_row = 0; // first row in the stripe + size_type start_rowgroup = 0; // first rowgroup in the stripe + size_type num_rows = 0; // number of rows in the stripe + + // output + device_span data; // index of elements in the column to include in the dictionary + device_span index; // index into the dictionary for each row in the column + size_type entry_count = 0; // number of entries in the dictionary + size_type char_count = 0; // number of characters in the dictionary + bool is_enabled = false; // true if dictionary encoding is enabled for this stripe }; /** - * @brief Struct to describe a dictionary + * @brief Initializes the hash maps storage for dictionary encoding to sentinel values. + * + * @param dictionaries Dictionary descriptors + * @param stream CUDA stream used for device memory operations and kernel launches + */ +void initialize_dictionary_hash_maps(device_2dspan dictionaries, + rmm::cuda_stream_view stream); + +/** + * @brief Populates the hash maps with unique values from the stripe. + * + * @param dictionaries Dictionary descriptors + * @param columns Pre-order flattened device array of ORC column views + * @param stream CUDA stream used for device memory operations and kernel launches */ -struct StripeDictionary { - uint32_t* dict_data; // row indices of corresponding string (row from dictionary index) - uint32_t* dict_index; // dictionary index from row index - uint32_t column_id; // real column id - uint32_t start_chunk; // first chunk in stripe - uint32_t num_chunks; // number of chunks in the stripe - uint32_t num_strings; // number of unique strings in the dictionary - uint32_t dict_char_count; // total size of dictionary string data - - orc_column_device_view const* leaf_column; //!< Pointer to string column -}; +void populate_dictionary_hash_maps(device_2dspan dictionaries, + device_span columns, + rmm::cuda_stream_view stream); + +/** + * @brief Stores the indices of the hash map entries in the dictionary data buffer. + * + * @param dictionaries Dictionary descriptors + * @param stream CUDA stream used for device memory operations and kernel launches + */ +void collect_map_entries(device_2dspan dictionaries, + rmm::cuda_stream_view stream); + +/** + * @brief Stores the corresponding dictionary indices for each row in the column. 
+ * + * @param dictionaries Dictionary descriptors + * @param columns Pre-order flattened device array of ORC column views + * @param stream CUDA stream used for device memory operations and kernel launches + */ +void get_dictionary_indices(device_2dspan dictionaries, + device_span columns, + rmm::cuda_stream_view stream); constexpr uint32_t encode_block_size = 512; @@ -317,14 +359,16 @@ void EncodeOrcColumnData(device_2dspan chunks, /** * @brief Launches kernel for encoding column dictionaries * - * @param[in] stripes Stripe dictionaries device array [stripe][string_column] + * @param[in] stripes Stripe dictionaries device array + * @param[in] columns Pre-order flattened device array of ORC column views * @param[in] chunks encoder chunk device array [column][rowgroup] * @param[in] num_string_columns Number of string columns * @param[in] num_stripes Number of stripes * @param[in,out] enc_streams chunk streams device array [column][rowgroup] * @param[in] stream CUDA stream used for device memory operations and kernel launches */ -void EncodeStripeDictionaries(StripeDictionary const* stripes, +void EncodeStripeDictionaries(stripe_dictionary const* stripes, + device_span columns, device_2dspan chunks, uint32_t num_string_columns, uint32_t num_stripes, @@ -373,38 +417,19 @@ std::optional CompressOrcDataStreams( rmm::cuda_stream_view stream); /** - * @brief Launches kernel for initializing dictionary chunks - * - * @param[in] orc_columns Pre-order flattened device array of ORC column views - * @param[in,out] chunks DictionaryChunk device array [rowgroup][column] - * @param[in] dict_data dictionary data (index of non-null rows) - * @param[in] dict_index row indices of corresponding string (row from dictionary index) - * @param[in] tmp_indices Temporary buffer for dictionary indices - * @param[in] rowgroup_bounds Ranges of rows in each rowgroup [rowgroup][column] - * @param[in] str_col_indexes List of columns that are strings type - * @param[in] stream CUDA stream used for device memory operations and kernel launches - */ -void InitDictionaryIndices(device_span orc_columns, - device_2dspan chunks, - device_span> dict_data, - device_span> dict_index, - device_span> tmp_indices, - device_2dspan rowgroup_bounds, - device_span str_col_indexes, - rmm::cuda_stream_view stream); - -/** - * @brief Launches kernel for building stripe dictionaries + * @brief Counts the number of characters in each rowgroup of each string column. 
* - * @param[in] d_stripes StripeDictionary device 2D array [stripe][column] - * @param[in] h_stripes StripeDictionary host 2D array [stripe][column] - * @param[in] chunks DictionaryChunk device array [rowgroup][column] - * @param[in] stream CUDA stream used for device memory operations and kernel launches + * @param counts Output array of character counts [column][rowgroup] + * @param orc_columns Pre-order flattened device array of ORC column views + * @param rowgroup_bounds Ranges of rows in each rowgroup [rowgroup][column] + * @param str_col_indexes Indexes of string columns in orc_columns + * @param stream CUDA stream used for device memory operations and kernel launches */ -void BuildStripeDictionaries(device_2dspan d_stripes, - host_2dspan h_stripes, - device_2dspan chunks, - rmm::cuda_stream_view stream); +void rowgroup_char_counts(device_2dspan counts, + device_span orc_columns, + device_2dspan rowgroup_bounds, + device_span str_col_indexes, + rmm::cuda_stream_view stream); /** * @brief Launches kernels to initialize statistics collection diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index d917319c120..73c41e2bbcd 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -88,7 +88,7 @@ struct orcenc_state_s { byterle_enc_state_s byterle; intrle_enc_state_s intrle; strdata_enc_state_s strenc; - StripeDictionary dict_stripe; + stripe_dictionary const* dict_stripe; } u; union { uint8_t u8[scratch_buffer_size]; // gblock_vminscratch buffer @@ -996,14 +996,16 @@ __global__ void __launch_bounds__(block_size) /** * @brief Encode column dictionaries * - * @param[in] stripes Stripe dictionaries device array [stripe][string_column] + * @param[in] stripes Stripe dictionaries device array + * @param[in] columns Pre-order flattened device array of ORC column views * @param[in] chunks EncChunk device array [rowgroup][column] * @param[in] num_columns Number of columns */ // blockDim {512,1,1} template __global__ void __launch_bounds__(block_size) - gpuEncodeStringDictionaries(StripeDictionary const* stripes, + gpuEncodeStringDictionaries(stripe_dictionary const* stripes, + device_span columns, device_2dspan chunks, device_2dspan streams) { @@ -1015,20 +1017,20 @@ __global__ void __launch_bounds__(block_size) uint32_t cid = (blockIdx.y) ? 
CI_DICTIONARY : CI_DATA2; int t = threadIdx.x; - if (t == 0) s->u.dict_stripe = stripes[stripe_id]; + if (t == 0) s->u.dict_stripe = &stripes[stripe_id]; __syncthreads(); - auto const strm_ptr = &streams[s->u.dict_stripe.column_id][s->u.dict_stripe.start_chunk]; + auto const strm_ptr = &streams[s->u.dict_stripe->column_idx][s->u.dict_stripe->start_rowgroup]; if (t == 0) { - s->chunk = chunks[s->u.dict_stripe.column_id][s->u.dict_stripe.start_chunk]; + s->chunk = chunks[s->u.dict_stripe->column_idx][s->u.dict_stripe->start_rowgroup]; s->stream = *strm_ptr; s->strm_pos[cid] = 0; s->numlengths = 0; - s->nrows = s->u.dict_stripe.num_strings; + s->nrows = s->u.dict_stripe->entry_count; s->cur_row = 0; } - auto const string_column = s->u.dict_stripe.leaf_column; - auto const dict_data = s->u.dict_stripe.dict_data; + auto const string_column = columns[s->u.dict_stripe->column_idx]; + auto const dict_data = s->u.dict_stripe->data; __syncthreads(); if (s->chunk.encoding_kind != DICTIONARY_V2) { return; // This column isn't using dictionary encoding -> bail out @@ -1042,7 +1044,7 @@ __global__ void __launch_bounds__(block_size) char const* ptr = nullptr; uint32_t count = 0; if (t < numvals) { - auto string_val = string_column->element(string_idx); + auto string_val = string_column.element(string_idx); ptr = string_val.data(); count = string_val.size_bytes(); } @@ -1056,7 +1058,7 @@ __global__ void __launch_bounds__(block_size) // Encoding string lengths uint32_t count = (t < numvals) - ? static_cast(string_column->element(string_idx).size_bytes()) + ? static_cast(string_column.element(string_idx).size_bytes()) : 0; uint32_t nz_idx = (s->cur_row + t) & 0x3ff; if (t < numvals) s->lengths.u32[nz_idx] = count; @@ -1268,7 +1270,8 @@ void EncodeOrcColumnData(device_2dspan chunks, <<>>(chunks, streams); } -void EncodeStripeDictionaries(StripeDictionary const* stripes, +void EncodeStripeDictionaries(stripe_dictionary const* stripes, + device_span columns, device_2dspan chunks, uint32_t num_string_columns, uint32_t num_stripes, @@ -1278,7 +1281,7 @@ void EncodeStripeDictionaries(StripeDictionary const* stripes, dim3 dim_block(512, 1); // 512 threads per dictionary dim3 dim_grid(num_string_columns * num_stripes, 2); gpuEncodeStringDictionaries<512> - <<>>(stripes, chunks, enc_streams); + <<>>(stripes, columns, chunks, enc_streams); } void CompactOrcDataStreams(device_2dspan strm_desc, diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 48bb1004ac7..881fc3b5caf 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -72,6 +72,17 @@ namespace orc { using namespace cudf::io::orc; using namespace cudf::io; +template +[[nodiscard]] constexpr int varint_size(T val) +{ + auto len = 1u; + while (val > 0x7f) { + val >>= 7; + ++len; + } + return len; +} + struct row_group_index_info { int32_t pos = -1; // Position int32_t blk_pos = -1; // Block Position @@ -225,46 +236,44 @@ class orc_column_view { auto type() const noexcept { return cudf_column.type(); } auto is_string() const noexcept { return cudf_column.type().id() == type_id::STRING; } - void set_dict_stride(size_t stride) noexcept { _dict_stride = stride; } - [[nodiscard]] auto dict_stride() const noexcept { return _dict_stride; } - /** - * @brief Function that associates an existing dictionary chunk allocation - */ - void attach_dict_chunk(gpu::DictionaryChunk const* host_dict, - gpu::DictionaryChunk const* dev_dict) + void attach_rowgroup_char_counts(host_span counts) { - dict = host_dict; - d_dict = 
dev_dict; + rowgroup_char_counts = counts; } - [[nodiscard]] auto host_dict_chunk(size_t rowgroup) const + + [[nodiscard]] auto rowgroup_char_count(size_type rg_idx) const { - CUDF_EXPECTS(is_string(), "Dictionary chunks are only present in string columns."); - return &dict[rowgroup * _dict_stride + _str_idx]; + return rowgroup_char_counts[rg_idx]; + } + + [[nodiscard]] auto char_count() const + { + return std::accumulate(rowgroup_char_counts.begin(), rowgroup_char_counts.end(), size_type{0}); } - [[nodiscard]] auto device_dict_chunk() const { return d_dict; } [[nodiscard]] auto const& decimal_offsets() const { return d_decimal_offsets; } void attach_decimal_offsets(uint32_t* sizes_ptr) { d_decimal_offsets = sizes_ptr; } - /** - * @brief Function that associates an existing stripe dictionary allocation - */ - void attach_stripe_dict(gpu::StripeDictionary* host_stripe_dict, - gpu::StripeDictionary* dev_stripe_dict) + void attach_stripe_dicts(host_span host_stripe_dicts, + device_span dev_stripe_dicts) { - stripe_dict = host_stripe_dict; - d_stripe_dict = dev_stripe_dict; + stripe_dicts = host_stripe_dicts; + d_stripe_dicts = dev_stripe_dicts; } - [[nodiscard]] auto host_stripe_dict(size_t stripe) const + + [[nodiscard]] auto const& host_stripe_dict(size_t stripe) const { CUDF_EXPECTS(is_string(), "Stripe dictionary is only present in string columns."); - return &stripe_dict[stripe * _dict_stride + _str_idx]; + return stripe_dicts[stripe]; } - [[nodiscard]] auto device_stripe_dict() const noexcept { return d_stripe_dict; } + + [[nodiscard]] auto const& device_stripe_dicts() const noexcept { return d_stripe_dicts; } // Index in the table [[nodiscard]] uint32_t index() const noexcept { return _index; } + // Index in the table, including only string columns + [[nodiscard]] uint32_t str_index() const noexcept { return _str_idx; } // Id in the ORC file [[nodiscard]] auto id() const noexcept { return _index + 1; } @@ -308,12 +317,10 @@ class orc_column_view { int32_t _scale = 0; int32_t _precision = 0; - // String dictionary-related members - size_t _dict_stride = 0; - gpu::DictionaryChunk const* dict = nullptr; - gpu::StripeDictionary const* stripe_dict = nullptr; - gpu::DictionaryChunk const* d_dict = nullptr; - gpu::StripeDictionary const* d_stripe_dict = nullptr; + host_span rowgroup_char_counts; + + host_span stripe_dicts; + device_span d_stripe_dicts; // Offsets for encoded decimal elements. Used to enable direct writing of encoded decimal elements // into the output stream. 
@@ -440,8 +447,7 @@ file_segmentation calculate_segmentation(host_span column std::accumulate(columns.begin(), columns.end(), 0ul, [&](size_t total_size, auto const& col) { auto const rows = rowgroup_bounds[rg_idx][col.index()].size(); if (col.is_string()) { - const auto dt = col.host_dict_chunk(rg_idx); - return total_size + rows + dt->string_char_count; + return total_size + rows + col.rowgroup_char_count(rg_idx); } else { return total_size + col.type_width() * rows; } @@ -471,133 +477,6 @@ file_segmentation calculate_segmentation(host_span column return {std::move(rowgroup_bounds), std::move(infos)}; } -/** - * @brief Builds up column dictionaries indices - * - * @param orc_table Non-owning view of a cuDF table w/ ORC-related info - * @param rowgroup_bounds Ranges of rows in each rowgroup [rowgroup][column] - * @param dict_data Dictionary data memory - * @param dict_index Dictionary index memory - * @param dict List of dictionary chunks - * @param stream CUDA stream used for device memory operations and kernel launches - */ -void init_dictionaries(orc_table_view& orc_table, - device_2dspan rowgroup_bounds, - device_span> dict_data, - device_span> dict_index, - hostdevice_2dvector* dict, - rmm::cuda_stream_view stream) -{ - // Setup per-rowgroup dictionary indexes for each dictionary-aware column - for (auto col_idx : orc_table.string_column_indices) { - auto& str_column = orc_table.column(col_idx); - str_column.set_dict_stride(orc_table.num_string_columns()); - str_column.attach_dict_chunk(dict->base_host_ptr(), dict->base_device_ptr()); - } - - // Allocate temporary memory for dictionary indices - std::vector> dict_indices; - dict_indices.reserve(orc_table.num_string_columns()); - std::transform(orc_table.string_column_indices.cbegin(), - orc_table.string_column_indices.cend(), - std::back_inserter(dict_indices), - [&](auto& col_idx) { - auto& str_column = orc_table.column(col_idx); - return cudf::detail::make_zeroed_device_uvector_async( - str_column.size(), stream, rmm::mr::get_current_device_resource()); - }); - - // Create views of the temporary buffers in device memory - std::vector> dict_indices_views; - dict_indices_views.reserve(dict_indices.size()); - std::transform( - dict_indices.begin(), dict_indices.end(), std::back_inserter(dict_indices_views), [](auto& di) { - return device_span{di}; - }); - auto d_dict_indices_views = cudf::detail::make_device_uvector_async( - dict_indices_views, stream, rmm::mr::get_current_device_resource()); - - gpu::InitDictionaryIndices(orc_table.d_columns, - *dict, - dict_data, - dict_index, - d_dict_indices_views, - rowgroup_bounds, - orc_table.d_string_column_indices, - stream); - dict->device_to_host_sync(stream); -} - -/** - * @brief Builds up per-stripe dictionaries for string columns. 
- * - * @param orc_table Non-owning view of a cuDF table w/ ORC-related info - * @param stripe_bounds List of stripe boundaries - * @param dict List of dictionary chunks [rowgroup][column] - * @param dict_index List of dictionary indices - * @param dictionary_enabled Whether dictionary encoding is enabled for a given column - * @param stripe_dict List of stripe dictionaries - * @param enable_dictionary Whether dictionary is enabled - * @param stream CUDA stream used for device memory operations and kernel launches - */ -void build_dictionaries(orc_table_view& orc_table, - host_span stripe_bounds, - hostdevice_2dvector const& dict, - host_span> dict_index, - host_span dictionary_enabled, - hostdevice_2dvector& stripe_dict, - bool enable_dictionary, - rmm::cuda_stream_view stream) -{ - for (size_t dict_idx = 0; dict_idx < orc_table.num_string_columns(); ++dict_idx) { - auto& str_column = orc_table.string_column(dict_idx); - str_column.attach_stripe_dict(stripe_dict.base_host_ptr(), stripe_dict.base_device_ptr()); - - for (auto const& stripe : stripe_bounds) { - auto& sd = stripe_dict[stripe.id][dict_idx]; - sd.dict_data = str_column.host_dict_chunk(stripe.first)->dict_data; - sd.dict_index = dict_index[dict_idx].data(); // Indexed by abs row - sd.column_id = orc_table.string_column_indices[dict_idx]; - sd.start_chunk = stripe.first; - sd.num_chunks = stripe.size; - sd.dict_char_count = 0; - sd.num_strings = - std::accumulate(stripe.cbegin(), stripe.cend(), 0, [&](auto dt_str_cnt, auto rg_idx) { - const auto& dt = dict[rg_idx][dict_idx]; - return dt_str_cnt + dt.num_dict_strings; - }); - sd.leaf_column = dict[0][dict_idx].leaf_column; - } - - if (enable_dictionary) { - struct string_column_cost { - size_t direct = 0; - size_t dictionary = 0; - }; - auto const col_cost = - std::accumulate(stripe_bounds.front().cbegin(), - stripe_bounds.back().cend(), - string_column_cost{}, - [&](auto cost, auto rg_idx) -> string_column_cost { - const auto& dt = dict[rg_idx][dict_idx]; - return {cost.direct + dt.string_char_count, - cost.dictionary + dt.dict_char_count + dt.num_dict_strings}; - }); - // Disable dictionary if it does not reduce the output size - if (!dictionary_enabled[orc_table.string_column(dict_idx).index()] || - col_cost.dictionary >= col_cost.direct) { - for (auto const& stripe : stripe_bounds) { - stripe_dict[stripe.id][dict_idx].dict_data = nullptr; - } - } - } - } - - stripe_dict.host_to_device_async(stream); - gpu::BuildStripeDictionaries(stripe_dict, stripe_dict, dict, stream); - stripe_dict.device_to_host_sync(stream); -} - /** * @brief Returns the maximum size of RLE encoded values of an integer type. **/ @@ -752,24 +631,15 @@ orc_streams create_streams(host_span columns, size_t dict_lengths_div512 = 0; for (auto const& stripe : segmentation.stripes) { auto const sd = column.host_stripe_dict(stripe.id); - enable_dict = (enable_dict && sd->dict_data != nullptr); + enable_dict = (enable_dict && sd.is_enabled); if (enable_dict) { - dict_strings += sd->num_strings; - dict_lengths_div512 += (sd->num_strings + 0x1ff) >> 9; - dict_data_size += sd->dict_char_count; + dict_strings += sd.entry_count; + dict_lengths_div512 += (sd.entry_count + 0x1ff) >> 9; + dict_data_size += sd.char_count; } } - auto const direct_data_size = - segmentation.num_stripes() == 0 - ? 
0 - : std::accumulate(segmentation.stripes.front().cbegin(), - segmentation.stripes.back().cend(), - size_t{0}, - [&](auto data_size, auto rg_idx) { - return data_size + - column.host_dict_chunk(rg_idx)->string_char_count; - }); + size_t const direct_data_size = column.char_count(); if (enable_dict) { uint32_t dict_bits = 0; for (dict_bits = 1; dict_bits < 32; dict_bits <<= 1) { @@ -972,7 +842,6 @@ struct segmented_valid_cnt_input { }; encoded_data encode_columns(orc_table_view const& orc_table, - string_dictionaries&& dictionaries, encoder_decimal_info&& dec_chunk_sizes, file_segmentation const& segmentation, orc_streams const& streams, @@ -1000,7 +869,7 @@ encoded_data encode_columns(orc_table_view const& orc_table, ck.type_kind = column.orc_kind(); if (ck.type_kind == TypeKind::STRING) { ck.dict_index = (ck.encoding_kind == DICTIONARY_V2) - ? column.host_stripe_dict(stripe.id)->dict_index + ? column.host_stripe_dict(stripe.id).index.data() : nullptr; ck.dtype_len = 1; } else { @@ -1084,18 +953,17 @@ encoded_data encode_columns(orc_table_view const& orc_table, if ((strm_type == gpu::CI_DICTIONARY) || (strm_type == gpu::CI_DATA2 && ck.encoding_kind == DICTIONARY_V2)) { if (rg_idx == *stripe.cbegin()) { - const auto stripe_dict = column.host_stripe_dict(stripe.id); + auto const stripe_dict = column.host_stripe_dict(stripe.id); strm.lengths[strm_type] = (strm_type == gpu::CI_DICTIONARY) - ? stripe_dict->dict_char_count - : (((stripe_dict->num_strings + 0x1ff) >> 9) * (512 * 4 + 2)); + ? stripe_dict.char_count + : (((stripe_dict.entry_count + 0x1ff) >> 9) * (512 * 4 + 2)); } else { strm.lengths[strm_type] = 0; } } else if (strm_type == gpu::CI_DATA && ck.type_kind == TypeKind::STRING && ck.encoding_kind == DIRECT_V2) { - strm.lengths[strm_type] = - std::max(column.host_dict_chunk(rg_idx)->string_char_count, 1u); + strm.lengths[strm_type] = std::max(column.rowgroup_char_count(rg_idx), 1); } else if (strm_type == gpu::CI_DATA && streams[strm_id].length == 0 && (ck.type_kind == DOUBLE || ck.type_kind == FLOAT)) { // Pass-through @@ -1146,8 +1014,9 @@ encoded_data encode_columns(orc_table_view const& orc_table, if (orc_table.num_rows() > 0) { if (orc_table.num_string_columns() != 0) { - auto d_stripe_dict = orc_table.string_column(0).device_stripe_dict(); - gpu::EncodeStripeDictionaries(d_stripe_dict, + auto d_stripe_dict = orc_table.string_column(0).device_stripe_dicts(); + gpu::EncodeStripeDictionaries(d_stripe_dict.data(), + orc_table.d_columns, chunks, orc_table.num_string_columns(), segmentation.num_stripes(), @@ -1157,8 +1026,6 @@ encoded_data encode_columns(orc_table_view const& orc_table, gpu::EncodeOrcColumnData(chunks, chunk_streams, stream); } - dictionaries.data.clear(); - dictionaries.index.clear(); chunk_streams.device_to_host_sync(stream); return {std::move(encoded_data), std::move(chunk_streams)}; @@ -2028,7 +1895,7 @@ encoder_decimal_info decimal_chunk_sizes(orc_table_view& orc_table, }(); if (col.is_null(idx) or not bit_value_or(pushdown_mask, idx, true)) - return 0u; + return 0; __int128_t const element = col.type().id() == type_id::DECIMAL32 ? col.element(idx) @@ -2038,12 +1905,7 @@ encoder_decimal_info decimal_chunk_sizes(orc_table_view& orc_table, __int128_t const sign = (element < 0) ? 
1 : 0; __uint128_t zigzaged_value = ((element ^ -sign) * 2) + sign; - uint32_t encoded_length = 1; - while (zigzaged_value > 127) { - zigzaged_value >>= 7u; - ++encoded_length; - } - return encoded_length; + return varint_size(zigzaged_value); }); // Compute element offsets within each row group @@ -2101,55 +1963,6 @@ std::map decimal_column_sizes( return column_sizes; } -string_dictionaries allocate_dictionaries(orc_table_view const& orc_table, - host_2dspan rowgroup_bounds, - rmm::cuda_stream_view stream) -{ - thrust::host_vector is_dict_enabled(orc_table.num_columns()); - for (auto col_idx : orc_table.string_column_indices) - is_dict_enabled[col_idx] = std::all_of( - thrust::make_counting_iterator(0ul), - thrust::make_counting_iterator(rowgroup_bounds.size().first), - [&](auto rg_idx) { - return rowgroup_bounds[rg_idx][col_idx].size() < std::numeric_limits::max(); - }); - - std::vector> data; - std::transform(orc_table.string_column_indices.begin(), - orc_table.string_column_indices.end(), - std::back_inserter(data), - [&](auto& idx) { - return cudf::detail::make_zeroed_device_uvector_async( - orc_table.columns[idx].size(), stream, rmm::mr::get_current_device_resource()); - }); - std::vector> index; - std::transform(orc_table.string_column_indices.begin(), - orc_table.string_column_indices.end(), - std::back_inserter(index), - [&](auto& idx) { - return cudf::detail::make_zeroed_device_uvector_async( - orc_table.columns[idx].size(), stream, rmm::mr::get_current_device_resource()); - }); - stream.synchronize(); - - std::vector> data_ptrs; - std::transform(data.begin(), data.end(), std::back_inserter(data_ptrs), [](auto& uvec) { - return device_span{uvec}; - }); - std::vector> index_ptrs; - std::transform(index.begin(), index.end(), std::back_inserter(index_ptrs), [](auto& uvec) { - return device_span{uvec}; - }); - - return {std::move(data), - std::move(index), - cudf::detail::make_device_uvector_sync( - data_ptrs, stream, rmm::mr::get_current_device_resource()), - cudf::detail::make_device_uvector_sync( - index_ptrs, stream, rmm::mr::get_current_device_resource()), - std::move(is_dict_enabled)}; -} - size_t max_compression_output_size(CompressionKind compression_kind, uint32_t compression_blocksize) { if (compression_kind == NONE) return 0; @@ -2177,6 +1990,160 @@ std::unique_ptr make_table_meta(table_view const& input) return table_meta; } +// Computes the number of characters in each rowgroup for each string column and attaches the +// results to the corresponding orc_column_view. The owning host vector is returned. +auto set_rowgroup_char_counts(orc_table_view& orc_table, + device_2dspan rowgroup_bounds, + rmm::cuda_stream_view stream) +{ + auto const num_rowgroups = rowgroup_bounds.size().first; + auto const num_str_cols = orc_table.num_string_columns(); + + auto counts = rmm::device_uvector(num_str_cols * num_rowgroups, stream); + auto counts_2d_view = device_2dspan(counts.data(), num_str_cols, num_rowgroups); + gpu::rowgroup_char_counts(counts_2d_view, + orc_table.d_columns, + rowgroup_bounds, + orc_table.d_string_column_indices, + stream); + + auto const h_counts = cudf::detail::make_std_vector_sync(counts, stream); + + for (auto col_idx : orc_table.string_column_indices) { + auto& str_column = orc_table.column(col_idx); + str_column.attach_rowgroup_char_counts( + {h_counts.data() + str_column.str_index() * num_rowgroups, num_rowgroups}); + } + + return h_counts; +} + +// Holds the stripe dictionary descriptors and dictionary buffers. 
+struct stripe_dictionaries { + hostdevice_2dvector views; // descriptors [string_column][stripe] + std::vector> data_owner; // dictionary data owner, per stripe + std::vector> index_owner; // dictionary index owner, per stripe + + // Should be called after encoding is complete to deallocate the dictionary buffers. + void on_encode_complete(rmm::cuda_stream_view stream) + { + data_owner.clear(); + index_owner.clear(); + + for (auto& sd : views.host_view().flat_view()) { + sd.data = {}; + sd.index = {}; + } + views.host_to_device_async(stream); + } +}; + +// Build stripe dictionaries for string columns +stripe_dictionaries build_dictionaries(orc_table_view& orc_table, + file_segmentation const& segmentation, + rmm::cuda_stream_view stream) +{ + std::vector>> hash_maps_storage( + orc_table.string_column_indices.size()); + for (auto col_idx : orc_table.string_column_indices) { + auto& str_column = orc_table.column(col_idx); + for (auto const& stripe : segmentation.stripes) { + auto const stripe_num_rows = + stripe.size == 0 ? 0 + : segmentation.rowgroups[stripe.first + stripe.size - 1][col_idx].end - + segmentation.rowgroups[stripe.first][col_idx].begin; + hash_maps_storage[str_column.str_index()].emplace_back(stripe_num_rows * 1.43, stream); + } + } + + hostdevice_2dvector stripe_dicts( + orc_table.num_string_columns(), segmentation.num_stripes(), stream); + if (stripe_dicts.count() == 0) return {std::move(stripe_dicts), {}, {}}; + + // Initialize stripe dictionaries + for (auto col_idx : orc_table.string_column_indices) { + auto& str_column = orc_table.column(col_idx); + auto const str_col_idx = str_column.str_index(); + str_column.attach_stripe_dicts(stripe_dicts[str_col_idx], + stripe_dicts.device_view()[str_col_idx]); + for (auto const& stripe : segmentation.stripes) { + auto const stripe_idx = stripe.id; + auto& sd = stripe_dicts[str_col_idx][stripe_idx]; + + sd.map_slots = hash_maps_storage[str_col_idx][stripe_idx]; + sd.column_idx = col_idx; + sd.start_row = segmentation.rowgroups[stripe.first][col_idx].begin; + sd.start_rowgroup = stripe.first; + sd.num_rows = + segmentation.rowgroups[stripe.first + stripe.size - 1][col_idx].end - sd.start_row; + + sd.entry_count = 0; + sd.char_count = 0; + } + } + stripe_dicts.host_to_device_async(stream); + + gpu::initialize_dictionary_hash_maps(stripe_dicts, stream); + gpu::populate_dictionary_hash_maps(stripe_dicts, orc_table.d_columns, stream); + // Copy the entry counts and char counts from the device to the host + stripe_dicts.device_to_host_sync(stream); + + // Data owners; can be cleared after encode + std::vector> dict_data_owner; + std::vector> dict_index_owner; + // Make decision about which stripes to encode with dictionary encoding + for (auto col_idx : orc_table.string_column_indices) { + auto& str_column = orc_table.column(col_idx); + bool col_use_dictionary{false}; + for (auto const& stripe : segmentation.stripes) { + auto const stripe_idx = stripe.id; + auto const str_col_idx = str_column.str_index(); + auto& sd = stripe_dicts[str_col_idx][stripe_idx]; + auto const direct_char_count = std::accumulate( + thrust::make_counting_iterator(stripe.first), + thrust::make_counting_iterator(stripe.first + stripe.size), + 0, + [&](auto total, auto const& rg) { return total + str_column.rowgroup_char_count(rg); }); + // Enable dictionary encoding if the dictionary size is smaller than the direct encode size + // The estimate excludes the LENGTH stream size, which is present in both cases + sd.is_enabled = [&]() { + auto const dict_index_size = 
varint_size(sd.entry_count); + return sd.char_count + dict_index_size * sd.entry_count < direct_char_count; + }(); + if (sd.is_enabled) { + dict_data_owner.emplace_back(sd.entry_count, stream); + sd.data = dict_data_owner.back(); + col_use_dictionary = true; + } else { + // Clear hash map storage as dictionary encoding is not used for this stripe + hash_maps_storage[str_col_idx][stripe_idx] = rmm::device_uvector(0, stream); + sd.map_slots = {}; + } + } + // If any stripe uses dictionary encoding, allocate index storage for the whole column + if (col_use_dictionary) { + dict_index_owner.emplace_back(str_column.size(), stream); + for (auto& sd : stripe_dicts[str_column.str_index()]) { + sd.index = dict_index_owner.back(); + } + } + } + stripe_dicts.host_to_device_async(stream); + + gpu::collect_map_entries(stripe_dicts, stream); + gpu::get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream); + + // Clear map slots; hash map storage is deallocated at the end of this function + auto device_dicts_flat = stripe_dicts.device_view().flat_view(); + thrust::for_each(rmm::exec_policy(stream), + device_dicts_flat.begin(), + device_dicts_flat.end(), + [] __device__(auto& sd) { sd.map_slots = {}; }); + stripe_dicts.device_to_host_async(stream); + + return {std::move(stripe_dicts), std::move(dict_data_owner), std::move(dict_index_owner)}; +} + /** * @brief Perform the processing steps needed to convert the input table into the output ORC data * for writing, such as compression and ORC encoding. @@ -2217,37 +2184,14 @@ auto convert_table_to_orc_data(table_view const& input, auto rowgroup_bounds = calculate_rowgroup_bounds(orc_table, row_index_stride, stream); - // Build per-column dictionary indices - auto dictionaries = allocate_dictionaries(orc_table, rowgroup_bounds, stream); - hostdevice_2dvector dict( - rowgroup_bounds.size().first, orc_table.num_string_columns(), stream); - if (not dict.is_empty()) { - init_dictionaries(orc_table, - rowgroup_bounds, - dictionaries.d_data_view, - dictionaries.d_index_view, - &dict, - stream); - } + [[maybe_unused]] auto const rg_char_counts_data = + set_rowgroup_char_counts(orc_table, rowgroup_bounds, stream); - // Decide stripe boundaries based on rowgroups and dict chunks + // Decide stripe boundaries based on rowgroups and char counts auto segmentation = calculate_segmentation(orc_table.columns, std::move(rowgroup_bounds), max_stripe_size); - // Build stripe-level dictionaries - hostdevice_2dvector stripe_dict( - segmentation.num_stripes(), orc_table.num_string_columns(), stream); - if (not stripe_dict.is_empty()) { - build_dictionaries(orc_table, - segmentation.stripes, - dict, - dictionaries.index, - dictionaries.dictionary_enabled, - stripe_dict, - enable_dictionary, - stream); - } - + auto stripe_dicts = build_dictionaries(orc_table, segmentation, stream); auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream); auto const uncompressed_block_align = uncomp_block_alignment(compression_kind); @@ -2259,13 +2203,10 @@ auto convert_table_to_orc_data(table_view const& input, enable_dictionary, compression_kind, write_mode); - auto enc_data = encode_columns(orc_table, - std::move(dictionaries), - std::move(dec_chunk_sizes), - segmentation, - streams, - uncompressed_block_align, - stream); + auto enc_data = encode_columns( + orc_table, std::move(dec_chunk_sizes), segmentation, streams, uncompressed_block_align, stream); + + stripe_dicts.on_encode_complete(stream); auto const num_rows = input.num_rows(); @@ -2287,7 +2228,7 @@ auto 
convert_table_to_orc_data(table_view const& input, std::optional{}, std::move(streams), std::move(stripes), - std::move(stripe_dict), + std::move(stripe_dicts.views), cudf::detail::pinned_host_vector()}; } @@ -2367,7 +2308,7 @@ auto convert_table_to_orc_data(table_view const& input, std::move(compression_stats), std::move(streams), std::move(stripes), - std::move(stripe_dict), + std::move(stripe_dicts.views), std::move(bounce_buffer)}; } @@ -2444,7 +2385,7 @@ void writer::impl::write(table_view const& input) compression_stats, streams, stripes, - stripe_dict, /* unused, but its data will be accessed via pointer later */ + stripe_dicts, /* unused, but its data will be accessed via pointer later */ bounce_buffer] = [&] { try { return convert_table_to_orc_data(input, @@ -2562,7 +2503,7 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, sf.columns[i].kind = orc_table.column(i - 1).orc_encoding(); sf.columns[i].dictionarySize = (sf.columns[i].kind == DICTIONARY_V2) - ? orc_table.column(i - 1).host_stripe_dict(stripe_id)->num_strings + ? orc_table.column(i - 1).host_stripe_dict(stripe_id).entry_count : 0; if (orc_table.column(i - 1).orc_kind() == TIMESTAMP) { sf.writerTimezone = "UTC"; } } diff --git a/cpp/src/text/subword/bpe_tokenizer.cuh b/cpp/src/text/subword/bpe_tokenizer.cuh index 24b10fc4a36..3548870e6b1 100644 --- a/cpp/src/text/subword/bpe_tokenizer.cuh +++ b/cpp/src/text/subword/bpe_tokenizer.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,12 +23,12 @@ #include #include -#include - #include #include #include +#include + #include namespace nvtext {
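
For readers following the ORC writer change above: the new path replaces the hand-rolled 12-bit hash dictionary with per-stripe cuco::static_map lookups (row indices as keys, MurmurHash3_32 over the string values, string equality as the key comparator), then decides per stripe whether DICTIONARY_V2 beats direct encoding. The snippet below is a minimal host-side sketch of that per-stripe computation, not part of the change itself; the names stripe_dict_stats and build_stripe_dictionary are hypothetical, and the real implementation runs on the GPU via populate_dictionary_hash_maps and the is_enabled lambda in build_dictionaries.

// Illustrative host-side analogue of the per-stripe dictionary statistics and
// the dictionary-vs-direct decision made in build_dictionaries() above.
// Hypothetical helper; the real code deduplicates on the device with cuco::static_map.
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Size of an unsigned LEB128 varint, mirroring varint_size() added in writer_impl.cu.
constexpr uint32_t varint_size(uint64_t val)
{
  auto len = 1u;
  while (val > 0x7f) {
    val >>= 7;
    ++len;
  }
  return len;
}

struct stripe_dict_stats {
  size_t entry_count = 0;  // number of unique strings in the stripe
  size_t char_count  = 0;  // characters stored in the dictionary
  bool is_enabled    = false;
};

stripe_dict_stats build_stripe_dictionary(std::vector<std::string> const& stripe_rows)
{
  stripe_dict_stats stats;
  size_t direct_char_count = 0;                     // PLAIN (direct) encode size, chars only
  std::unordered_map<std::string, uint32_t> dict;   // value -> dictionary index
  for (auto const& s : stripe_rows) {
    direct_char_count += s.size();
    // emplace() reports whether the value was newly inserted, i.e. unique so far
    if (dict.emplace(s, static_cast<uint32_t>(dict.size())).second) {
      ++stats.entry_count;
      stats.char_count += s.size();
    }
  }
  // Same cost comparison as sd.is_enabled in the diff: dictionary characters plus
  // an estimated varint index per entry must be smaller than the direct character
  // count (LENGTH stream size is excluded since it appears in both encodings).
  auto const index_bytes = varint_size(stats.entry_count) * stats.entry_count;
  stats.is_enabled       = stats.char_count + index_bytes < direct_char_count;
  return stats;
}

When is_enabled is true for a stripe, the device path additionally materializes the dictionary: collect_map_entries writes each surviving key into stripe_dictionary::data and rewrites the slot payload to its dictionary position, and get_dictionary_indices then resolves a per-row index into stripe_dictionary::index for the encoder.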