diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a6f7a41825d..cd6d3217aad 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -137,6 +137,9 @@ include(cmake/thirdparty/CUDF_GetArrow.cmake) include(cmake/thirdparty/CUDF_GetDLPack.cmake) # find libcu++ include(cmake/thirdparty/CUDF_GetLibcudacxx.cmake) +# find cuCollections +# Should come after including thrust and libcudacxx +include(cmake/thirdparty/CUDF_GetcuCollections.cmake) # find or install GoogleTest include(cmake/thirdparty/CUDF_GetGTest.cmake) # preprocess jitify-able kernels @@ -283,7 +286,7 @@ add_library(cudf src/io/orc/writer_impl.cu src/io/parquet/compact_protocol_writer.cpp src/io/parquet/page_data.cu - src/io/parquet/page_dict.cu + src/io/parquet/chunk_dict.cu src/io/parquet/page_enc.cu src/io/parquet/page_hdr.cu src/io/parquet/parquet.cpp @@ -524,7 +527,8 @@ target_link_libraries(cudf PUBLIC ZLIB::ZLIB ${ARROW_LIBRARIES} cudf::Thrust - rmm::rmm) + rmm::rmm + PRIVATE cuco::cuco) if(CUDA_STATIC_RUNTIME) # Tell CMake what CUDA language runtime to use diff --git a/cpp/cmake/thirdparty/CUDF_GetcuCollections.cmake b/cpp/cmake/thirdparty/CUDF_GetcuCollections.cmake new file mode 100644 index 00000000000..73717249585 --- /dev/null +++ b/cpp/cmake/thirdparty/CUDF_GetcuCollections.cmake @@ -0,0 +1,38 @@ +#============================================================================= +# Copyright (c) 2021, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#============================================================================= + +function(find_and_configure_cucollections) + + if(TARGET cuco::cuco) + return() + endif() + + # Find or install cuCollections + CPMFindPackage(NAME cuco + GITHUB_REPOSITORY NVIDIA/cuCollections + GIT_TAG 0d602ae21ea4f38d23ed816aa948453d97b2ee4e + OPTIONS "BUILD_TESTS OFF" + "BUILD_BENCHMARKS OFF" + "BUILD_EXAMPLES OFF" + ) + + set(CUCO_INCLUDE_DIR "${cuco_SOURCE_DIR}/include" PARENT_SCOPE) + + # Make sure consumers of cudf can also see cuco::cuco target + fix_cmake_global_defaults(cuco::cuco) +endfunction() + +find_and_configure_cucollections() diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu new file mode 100644 index 00000000000..64b3dd69c0d --- /dev/null +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -0,0 +1,371 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include + +#include + +namespace cudf { +namespace io { +namespace parquet { +namespace gpu { + +template +__global__ void __launch_bounds__(block_size, 1) + initialize_chunk_hash_maps_kernel(device_span chunks) +{ + auto chunk = chunks[blockIdx.x]; + auto t = threadIdx.x; + // fut: Now that per-chunk dict is same size as ck.num_values, try to not use one block per chunk + for (size_t i = 0; i < chunk.dict_map_size; i += block_size) { + if (t + i < chunk.dict_map_size) { + new (&chunk.dict_map_slots[t + i].first) map_type::atomic_key_type{KEY_SENTINEL}; + new (&chunk.dict_map_slots[t + i].second) map_type::atomic_mapped_type{VALUE_SENTINEL}; + } + } +} + +template +struct equality_functor { + column_device_view const& col; + __device__ bool operator()(size_type lhs_idx, size_type rhs_idx) + { + // We don't call this for nulls so this is fine + return equality_compare(col.element(lhs_idx), col.element(rhs_idx)); + } +}; + +template +struct hash_functor { + column_device_view const& col; + __device__ auto operator()(size_type idx) { return MurmurHash3_32{}(col.element(idx)); } +}; + +struct map_insert_fn { + map_type::device_mutable_view& map; + + template + __device__ bool operator()(column_device_view const& col, size_type i) + { + if constexpr (column_device_view::has_element_accessor()) { + auto hash_fn = hash_functor{col}; + auto equality_fn = equality_functor{col}; + return map.insert(std::make_pair(i, i), hash_fn, equality_fn); + } else { + cudf_assert(false && "Unsupported type to insert in map"); + } + return false; + } +}; + +struct map_find_fn { + map_type::device_view& map; + + template + __device__ auto operator()(column_device_view const& col, size_type i) + { + if constexpr (column_device_view::has_element_accessor()) { + auto hash_fn = hash_functor{col}; + auto equality_fn = equality_functor{col}; + return map.find(i, hash_fn, equality_fn); + } else { + cudf_assert(false && "Unsupported type to insert in map"); + } + return map.end(); + } +}; + +template +__global__ void __launch_bounds__(block_size, 1) + populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan chunks, + size_type num_rows) +{ + auto col_idx = blockIdx.y; + auto block_x = blockIdx.x; + auto t = threadIdx.x; + + auto start_row = + block_x * + max_page_fragment_size; // This is fragment size. all chunks are multiple of these many rows. + size_type end_row = min(start_row + max_page_fragment_size, num_rows); + + __shared__ EncColumnChunk* s_chunk; + __shared__ parquet_column_device_view s_col; + __shared__ size_type s_start_value_idx; + __shared__ size_type s_num_values; + if (t == 0) { + // Find the chunk this block is a part of + size_type num_rowgroups = chunks.size().first; + size_type rg_idx = 0; + while (rg_idx < num_rowgroups) { + if (auto ck = chunks[rg_idx][col_idx]; + start_row >= ck.start_row and start_row < ck.start_row + ck.num_rows) { + break; + } + ++rg_idx; + } + s_chunk = &chunks[rg_idx][col_idx]; + s_col = *(s_chunk->col_desc); + } + __syncthreads(); + if (not s_chunk->use_dictionary) { return; } + + if (t == 0) { + // Find the bounds of values in leaf column to be inserted into the map for current chunk + auto col = *(s_col.parent_column); + auto start_value_idx = start_row; + auto end_value_idx = end_row; + while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) { + if (col.type().id() == type_id::STRUCT) { + start_value_idx += col.offset(); + end_value_idx += col.offset(); + col = col.child(0); + } else { + auto offset_col = col.child(lists_column_view::offsets_column_index); + start_value_idx = offset_col.element(start_value_idx + col.offset()); + end_value_idx = offset_col.element(end_value_idx + col.offset()); + col = col.child(lists_column_view::child_column_index); + } + } + s_start_value_idx = start_value_idx; + s_num_values = end_value_idx - start_value_idx; + } + __syncthreads(); + + column_device_view const& data_col = *s_col.leaf_column; + using block_reduce = cub::BlockReduce; + __shared__ typename block_reduce::TempStorage reduce_storage; + + // Make a view of the hash map + auto hash_map_mutable = map_type::device_mutable_view( + s_chunk->dict_map_slots, s_chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); + auto hash_map = map_type::device_view( + s_chunk->dict_map_slots, s_chunk->dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); + + __shared__ int total_num_dict_entries; + for (size_type i = 0; i < s_num_values; i += block_size) { + // add the value to hash map + size_type val_idx = i + t + s_start_value_idx; + bool is_valid = + (i + t < s_num_values && val_idx < data_col.size()) and data_col.is_valid(val_idx); + + // insert element at val_idx to hash map and count successful insertions + size_type is_unique = 0; + size_type uniq_elem_size = 0; + if (is_valid) { + auto found_slot = type_dispatcher(data_col.type(), map_find_fn{hash_map}, data_col, val_idx); + if (found_slot == hash_map.end()) { + is_unique = + type_dispatcher(data_col.type(), map_insert_fn{hash_map_mutable}, data_col, val_idx); + uniq_elem_size = [&]() -> size_type { + if (not is_unique) { return 0; } + switch (s_col.physical_type) { + case Type::INT32: return 4; + case Type::INT64: return 8; + case Type::INT96: return 12; + case Type::FLOAT: return 4; + case Type::DOUBLE: return 8; + case Type::BYTE_ARRAY: + if (data_col.type().id() == type_id::STRING) { + // Strings are stored as 4 byte length + string bytes + return 4 + data_col.element(val_idx).size_bytes(); + } + case Type::FIXED_LEN_BYTE_ARRAY: + default: cudf_assert(false && "Unsupported type for dictionary encoding"); return 0; + } + }(); + } + } + + __syncthreads(); + auto num_unique = block_reduce(reduce_storage).Sum(is_unique); + __syncthreads(); + auto uniq_data_size = block_reduce(reduce_storage).Sum(uniq_elem_size); + if (t == 0) { + total_num_dict_entries = atomicAdd(&s_chunk->num_dict_entries, num_unique); + total_num_dict_entries += num_unique; + atomicAdd(&s_chunk->uniq_data_size, uniq_data_size); + } + __syncthreads(); + + // Check if the num unique values in chunk has already exceeded max dict size and early exit + if (total_num_dict_entries > MAX_DICT_SIZE) { return; } + } +} + +template +__global__ void __launch_bounds__(block_size, 1) + collect_map_entries_kernel(device_span chunks) +{ + auto& chunk = chunks[blockIdx.x]; + if (not chunk.use_dictionary) { return; } + + auto t = threadIdx.x; + auto map = + map_type::device_view(chunk.dict_map_slots, chunk.dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); + + __shared__ size_type counter; + if (t == 0) counter = 0; + __syncthreads(); + for (size_t i = 0; i < chunk.dict_map_size; i += block_size) { + if (t + i < chunk.dict_map_size) { + auto slot = map.begin_slot() + t + i; + auto key = static_cast(slot->first); + if (key != KEY_SENTINEL) { + auto loc = atomicAdd(&counter, 1); + cudf_assert(loc < MAX_DICT_SIZE && "Number of filled slots exceeds max dict size"); + chunk.dict_data[loc] = key; + // If sorting dict page ever becomes a hard requirement, enable the following statement and + // add a dict sorting step before storing into the slot's second field. + // chunk.dict_data_idx[loc] = t + i; + slot->second.store(loc); + // TODO: ^ This doesn't need to be atomic. Try casting to value_type ptr and just writing. + } + } + } +} + +template +__global__ void __launch_bounds__(block_size, 1) + get_dictionary_indices_kernel(cudf::detail::device_2dspan chunks, + size_type num_rows) +{ + auto col_idx = blockIdx.y; + auto block_x = blockIdx.x; + auto t = threadIdx.x; + + size_type start_row = block_x * max_page_fragment_size; + size_type end_row = min(start_row + max_page_fragment_size, num_rows); + + __shared__ EncColumnChunk s_chunk; + __shared__ parquet_column_device_view s_col; + __shared__ size_type s_start_value_idx; + __shared__ size_type s_ck_start_val_idx; + __shared__ size_type s_num_values; + + if (t == 0) { + // Find the chunk this block is a part of + size_type num_rowgroups = chunks.size().first; + size_type rg_idx = 0; + while (rg_idx < num_rowgroups) { + if (auto ck = chunks[rg_idx][col_idx]; + start_row >= ck.start_row and start_row < ck.start_row + ck.num_rows) { + break; + } + ++rg_idx; + } + s_chunk = chunks[rg_idx][col_idx]; + s_col = *(s_chunk.col_desc); + + // Find the bounds of values in leaf column to be inserted into the map for current chunk + + auto col = *(s_col.parent_column); + auto start_value_idx = start_row; + auto end_value_idx = end_row; + auto chunk_start_val_idx = s_chunk.start_row; + while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) { + if (col.type().id() == type_id::STRUCT) { + start_value_idx += col.offset(); + chunk_start_val_idx += col.offset(); + end_value_idx += col.offset(); + col = col.child(0); + } else { + auto offset_col = col.child(lists_column_view::offsets_column_index); + start_value_idx = offset_col.element(start_value_idx + col.offset()); + chunk_start_val_idx = offset_col.element(chunk_start_val_idx + col.offset()); + end_value_idx = offset_col.element(end_value_idx + col.offset()); + col = col.child(lists_column_view::child_column_index); + } + } + s_start_value_idx = start_value_idx; + s_ck_start_val_idx = chunk_start_val_idx; + s_num_values = end_value_idx - start_value_idx; + } + __syncthreads(); + + if (not s_chunk.use_dictionary) { return; } + + column_device_view const& data_col = *s_col.leaf_column; + + auto map = map_type::device_view( + s_chunk.dict_map_slots, s_chunk.dict_map_size, KEY_SENTINEL, VALUE_SENTINEL); + + for (size_t i = 0; i < s_num_values; i += block_size) { + if (t + i < s_num_values) { + auto val_idx = s_start_value_idx + t + i; + bool is_valid = + (i + t < s_num_values && val_idx < data_col.size()) ? data_col.is_valid(val_idx) : false; + + if (is_valid) { + auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx); + cudf_assert(found_slot != map.end() && + "Unable to find value in map in dictionary index construction"); + if (found_slot != map.end()) { + // No need for atomic as this is not going to be modified by any other thread + auto* val_ptr = reinterpret_cast(&found_slot->second); + s_chunk.dict_index[val_idx - s_ck_start_val_idx] = *val_ptr; + } + } + } + } +} + +void initialize_chunk_hash_maps(device_span chunks, rmm::cuda_stream_view stream) +{ + constexpr int block_size = 1024; + initialize_chunk_hash_maps_kernel + <<>>(chunks); +} + +void populate_chunk_hash_maps(cudf::detail::device_2dspan chunks, + size_type num_rows, + rmm::cuda_stream_view stream) +{ + constexpr int block_size = 256; + auto const grid_x = cudf::detail::grid_1d(num_rows, max_page_fragment_size); + auto const num_columns = chunks.size().second; + dim3 const dim_grid(grid_x.num_blocks, num_columns); + + populate_chunk_hash_maps_kernel + <<>>(chunks, num_rows); +} + +void collect_map_entries(device_span chunks, rmm::cuda_stream_view stream) +{ + constexpr int block_size = 1024; + collect_map_entries_kernel<<>>(chunks); +} + +void get_dictionary_indices(cudf::detail::device_2dspan chunks, + size_type num_rows, + rmm::cuda_stream_view stream) +{ + constexpr int block_size = 256; + auto const grid_x = cudf::detail::grid_1d(num_rows, max_page_fragment_size); + auto const num_columns = chunks.size().second; + dim3 const dim_grid(grid_x.num_blocks, num_columns); + + get_dictionary_indices_kernel + <<>>(chunks, num_rows); +} +} // namespace gpu +} // namespace parquet +} // namespace io +} // namespace cudf diff --git a/cpp/src/io/parquet/page_dict.cu b/cpp/src/io/parquet/page_dict.cu deleted file mode 100644 index 0c55828b120..00000000000 --- a/cpp/src/io/parquet/page_dict.cu +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include "parquet_gpu.hpp" - -#include - -#include - -#include - -namespace cudf { -namespace io { -namespace parquet { -namespace gpu { -struct dict_state_s { - uint32_t row_cnt; - PageFragment* cur_fragment; - uint32_t* hashmap; - uint32_t total_dict_entries; //!< Total number of entries in dictionary - uint32_t dictionary_size; //!< Total dictionary size in bytes - uint32_t num_dict_entries; //!< Dictionary entries in current fragment to add - uint32_t frag_dict_size; - EncColumnChunk ck; - parquet_column_device_view col; - PageFragment frag; - volatile uint32_t scratch_red[32]; - uint16_t frag_dict[max_page_fragment_size]; -}; - -/** - * @brief Computes a 16-bit dictionary hash - */ -inline __device__ uint32_t uint32_hash16(uint32_t v) { return (v + (v >> 16)) & 0xffff; } - -inline __device__ uint32_t uint64_hash16(uint64_t v) -{ - return uint32_hash16((uint32_t)(v + (v >> 32))); -} - -inline __device__ uint32_t hash_string(const string_view& val) -{ - const char* p = val.data(); - uint32_t len = val.size_bytes(); - uint32_t hash = len; - if (len > 0) { - uint32_t align_p = 3 & reinterpret_cast(p); - const uint32_t* p32 = reinterpret_cast(p - align_p); - uint32_t ofs = align_p * 8; - uint32_t v; - while (len > 4) { - v = *p32++; - if (ofs) { v = __funnelshift_r(v, *p32, ofs); } - hash = __funnelshift_l(hash, hash, 5) + v; - len -= 4; - } - v = *p32; - if (ofs) { v = __funnelshift_r(v, (align_p + len > 4) ? p32[1] : 0, ofs); } - v &= ((2 << (len * 8 - 1)) - 1); - hash = __funnelshift_l(hash, hash, 5) + v; - } - return uint32_hash16(hash); -} - -/** - * @brief Fetch a page fragment and its dictionary entries in row-ascending order - * - * @param[in,out] s dictionary state - * @param[in,out] dict_data fragment dictionary data for the current column (zeroed out after - *fetching) - * @param[in] frag_start_row row position of current fragment - * @param[in] t thread id - */ -__device__ void FetchDictionaryFragment(dict_state_s* s, - uint32_t* dict_data, - uint32_t frag_start_row, - uint32_t t) -{ - if (t == 0) s->frag = *s->cur_fragment; - __syncthreads(); - // Store the row values in shared mem and set the corresponding dict_data to zero (end-of-list) - // It's easiest to do this here since we're only dealing with values all within a 5K-row window - for (uint32_t i = t; i < s->frag.num_dict_vals; i += 1024) { - uint32_t r = dict_data[frag_start_row + i] - frag_start_row; - s->frag_dict[i] = r; - } - __syncthreads(); - for (uint32_t i = t; i < s->frag.num_dict_vals; i += 1024) { - uint32_t r = s->frag_dict[i]; - dict_data[frag_start_row + r] = 0; - } - __syncthreads(); -} - -/// Generate dictionary indices in ascending row order -template -__device__ void GenerateDictionaryIndices(dict_state_s* s, uint32_t t) -{ - using block_scan = cub::BlockScan; - __shared__ typename block_scan::TempStorage temp_storage; - uint32_t* dict_index = s->col.dict_index; - uint32_t* dict_data = s->col.dict_data + s->ck.start_row; - uint32_t num_dict_entries = 0; - - for (uint32_t i = 0; i < s->row_cnt; i += 1024) { - uint32_t row = s->ck.start_row + i + t; - uint32_t is_valid = - (i + t < s->row_cnt && row < s->col.num_rows) ? s->col.leaf_column->is_valid(row) : 0; - uint32_t dict_idx = (is_valid) ? dict_index[row] : 0; - uint32_t is_unique = - (is_valid && - dict_idx == - row); // Any value that doesn't have bit31 set should have dict_idx=row at this point - uint32_t block_num_dict_entries; - uint32_t pos; - block_scan(temp_storage).ExclusiveSum(is_unique, pos, block_num_dict_entries); - pos += num_dict_entries; - num_dict_entries += block_num_dict_entries; - if (is_valid && is_unique) { - dict_data[pos] = row; - dict_index[row] = pos; - } - __syncthreads(); - if (is_valid && !is_unique) { - // NOTE: Should have at most 3 iterations (once for early duplicate elimination, once for - // final dictionary duplicate elimination and once for re-ordering) (If something went wrong - // building the dictionary, it will likely hang or crash right here) - do { - dict_idx = dict_index[dict_idx & 0x7fffffff]; - } while (dict_idx > 0x7fffffff); - dict_index[row] = dict_idx; - } - } -} - -// blockDim(1024, 1, 1) -template -__global__ void __launch_bounds__(block_size, 1) - gpuBuildChunkDictionaries(device_span chunks, uint32_t* dev_scratch) -{ - __shared__ __align__(8) dict_state_s state_g; - using block_reduce = cub::BlockReduce; - __shared__ typename block_reduce::TempStorage temp_storage; - - dict_state_s* const s = &state_g; - uint32_t t = threadIdx.x; - uint32_t dtype, dtype_len, dtype_len_in; - - if (t == 0) s->ck = chunks[blockIdx.x]; - __syncthreads(); - - if (!s->ck.has_dictionary) { return; } - - if (t == 0) s->col = *s->ck.col_desc; - __syncthreads(); - - if (!t) { - s->hashmap = dev_scratch + s->ck.dictionary_id * (size_t)(1 << kDictHashBits); - s->row_cnt = 0; - s->cur_fragment = s->ck.fragments; - s->total_dict_entries = 0; - s->dictionary_size = 0; - s->ck.num_dict_fragments = 0; - } - dtype = s->col.physical_type; - dtype_len = (dtype == INT96) ? 12 : (dtype == INT64 || dtype == DOUBLE) ? 8 : 4; - if (dtype == INT32) { - dtype_len_in = GetDtypeLogicalLen(s->col.leaf_column); - } else if (dtype == INT96) { - dtype_len_in = 8; - } else { - dtype_len_in = dtype_len; - } - __syncthreads(); - while (s->row_cnt < s->ck.num_rows) { - uint32_t frag_start_row = s->ck.start_row + s->row_cnt, num_dict_entries, frag_dict_size; - FetchDictionaryFragment(s, s->col.dict_data, frag_start_row, t); - __syncthreads(); - num_dict_entries = s->frag.num_dict_vals; - if (!t) { - s->num_dict_entries = 0; - s->frag_dict_size = 0; - } - for (uint32_t i = 0; i < num_dict_entries; i += 1024) { - bool is_valid = (i + t < num_dict_entries); - uint32_t len = 0; - uint32_t is_dupe = 0; - uint32_t row, hash, next, *next_addr; - uint32_t new_dict_entries; - - if (is_valid) { - row = frag_start_row + s->frag_dict[i + t]; - len = dtype_len; - if (dtype == BYTE_ARRAY) { - auto str1 = s->col.leaf_column->element(row); - len += str1.size_bytes(); - hash = hash_string(str1); - // Walk the list of rows with the same hash - next_addr = &s->hashmap[hash]; - while ((next = atomicCAS(next_addr, 0, row + 1)) != 0) { - auto const current = next - 1; - auto str2 = s->col.leaf_column->element(current); - if (str1 == str2) { - is_dupe = 1; - break; - } - next_addr = &s->col.dict_data[next - 1]; - } - } else { - uint64_t val; - - if (dtype_len_in == 8) { - val = s->col.leaf_column->element(row); - hash = uint64_hash16(val); - } else { - val = (dtype_len_in == 4) ? s->col.leaf_column->element(row) - : (dtype_len_in == 2) ? s->col.leaf_column->element(row) - : s->col.leaf_column->element(row); - hash = uint32_hash16(val); - } - // Walk the list of rows with the same hash - next_addr = &s->hashmap[hash]; - while ((next = atomicCAS(next_addr, 0, row + 1)) != 0) { - auto const current = next - 1; - uint64_t val2 = (dtype_len_in == 8) ? s->col.leaf_column->element(current) - : (dtype_len_in == 4) ? s->col.leaf_column->element(current) - : (dtype_len_in == 2) ? s->col.leaf_column->element(current) - : s->col.leaf_column->element(current); - if (val2 == val) { - is_dupe = 1; - break; - } - next_addr = &s->col.dict_data[next - 1]; - } - } - } - // Count the non-duplicate entries - frag_dict_size = block_reduce(temp_storage).Sum((is_valid && !is_dupe) ? len : 0); - new_dict_entries = __syncthreads_count(is_valid && !is_dupe); - if (t == 0) { - s->frag_dict_size += frag_dict_size; - s->num_dict_entries += new_dict_entries; - } - if (is_valid) { - if (!is_dupe) { - s->col.dict_index[row] = row; - } else { - s->col.dict_index[row] = (next - 1) | (1u << 31); - } - } - __syncthreads(); - // At this point, the dictionary order is non-deterministic, and we want insertion order - // Make sure that the non-duplicate entry corresponds to the lower row number - // (The entry in dict_data (next-1) used for duplicate elimination does not need - // to be the lowest row number) - bool reorder_check = (is_valid && is_dupe && next - 1 > row); - if (reorder_check) { - next = s->col.dict_index[next - 1]; - while (next & (1u << 31)) { - next = s->col.dict_index[next & 0x7fffffff]; - } - } - if (__syncthreads_or(reorder_check)) { - if (reorder_check) { atomicMin(&s->col.dict_index[next], row); } - __syncthreads(); - if (reorder_check && s->col.dict_index[next] == row) { - s->col.dict_index[next] = row | (1u << 31); - s->col.dict_index[row] = row; - } - __syncthreads(); - } - } - __syncthreads(); - num_dict_entries = s->num_dict_entries; - frag_dict_size = s->frag_dict_size; - if (s->total_dict_entries + num_dict_entries > 65536 || - (s->dictionary_size != 0 && s->dictionary_size + frag_dict_size > 512 * 1024)) { - break; - } - __syncthreads(); - if (!t) { - if (num_dict_entries != s->frag.num_dict_vals) { - s->cur_fragment->num_dict_vals = num_dict_entries; - } - if (frag_dict_size != s->frag.dict_data_size) { s->frag.dict_data_size = frag_dict_size; } - s->total_dict_entries += num_dict_entries; - s->dictionary_size += frag_dict_size; - s->row_cnt += s->frag.num_rows; - s->cur_fragment++; - s->ck.num_dict_fragments++; - } - __syncthreads(); - } - __syncthreads(); - GenerateDictionaryIndices(s, t); - if (!t) { - chunks[blockIdx.x].num_dict_fragments = s->ck.num_dict_fragments; - chunks[blockIdx.x].dictionary_size = s->dictionary_size; - chunks[blockIdx.x].total_dict_entries = s->total_dict_entries; - } -} - -/** - * @brief Launches kernel for building chunk dictionaries - * - * @param[in,out] chunks Column chunks - * @param[in] dev_scratch Device scratch data (kDictScratchSize per dictionary) - * @param[in] stream CUDA stream to use, default 0 - */ -void BuildChunkDictionaries(device_span chunks, - uint32_t* dev_scratch, - rmm::cuda_stream_view stream) -{ - auto num_chunks = chunks.size(); - gpuBuildChunkDictionaries<1024><<>>(chunks, dev_scratch); -} - -} // namespace gpu -} // namespace parquet -} // namespace io -} // namespace cudf diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 3c62dcf7eea..e8ca6c02a8e 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -48,14 +48,7 @@ constexpr uint32_t rle_buffer_size = (1 << 9); struct frag_init_state_s { parquet_column_device_view col; PageFragment frag; - uint32_t total_dupes; size_type start_value_idx; - volatile uint32_t scratch_red[32]; - uint32_t dict[max_page_fragment_size]; - union { - uint16_t u16[1 << (init_hash_bits)]; - uint32_t u32[1 << (init_hash_bits - 1)]; - } map; }; struct page_enc_state_s { @@ -68,6 +61,7 @@ struct page_enc_state_s { uint32_t rle_lit_count; uint32_t rle_rpt_count; uint32_t page_start_val; + uint32_t chunk_start_val; volatile uint32_t rpt_map[4]; volatile uint32_t scratch_red[32]; EncPage page; @@ -124,31 +118,22 @@ __global__ void __launch_bounds__(block_size) __shared__ __align__(16) frag_init_state_s state_g; using block_reduce = cub::BlockReduce; - using block_scan = cub::BlockScan; - __shared__ union { - typename block_reduce::TempStorage reduce_storage; - typename block_scan::TempStorage scan_storage; - } temp_storage; + __shared__ typename block_reduce::TempStorage reduce_storage; frag_init_state_s* const s = &state_g; uint32_t t = threadIdx.x; - uint32_t start_row, dtype_len, dtype_len_in, dtype; + uint32_t start_row, dtype_len, dtype; if (t == 0) s->col = col_desc[blockIdx.x]; - for (uint32_t i = 0; i < sizeof(s->map) / sizeof(uint32_t); i += block_size) { - if (i + t < sizeof(s->map) / sizeof(uint32_t)) s->map.u32[i + t] = 0; - } __syncthreads(); start_row = blockIdx.y * fragment_size; if (!t) { // frag.num_rows = fragment_size except for the last page fragment which can be smaller. // num_rows is fixed but fragment size could be larger if the data is strings or nested. s->frag.num_rows = min(fragment_size, max_num_rows - min(start_row, max_num_rows)); - s->frag.non_nulls = 0; s->frag.num_dict_vals = 0; s->frag.fragment_data_size = 0; s->frag.dict_data_size = 0; - s->total_dupes = 0; // To use num_vals instead of num_rows, we need to calculate num_vals on the fly. // For list>, values between i and i+50 can be calculated by @@ -195,16 +180,6 @@ __global__ void __launch_bounds__(block_size) : (dtype == INT64 || dtype == DOUBLE) ? 8 : (dtype == BOOLEAN) ? 1 : 4; - if (dtype == INT32) { - dtype_len_in = GetDtypeLogicalLen(s->col.leaf_column); - } else if (dtype == INT96) { - // cudf doesn't support INT96 internally and uses INT64, so treat INT96 as an INT64 for - // computing dictionary hash values and reading the data, but we do treat it as 12 bytes for - // dtype_len, which determines how much memory we need to allocate for the fragment. - dtype_len_in = 8; - } else { - dtype_len_in = dtype_len; - } __syncthreads(); size_type nvals = s->frag.num_leaf_values; @@ -215,167 +190,22 @@ __global__ void __launch_bounds__(block_size) uint32_t is_valid = (i + t < nvals && val_idx < s->col.leaf_column->size()) ? s->col.leaf_column->is_valid(val_idx) : 0; - uint32_t len, nz_pos, hash; + uint32_t len; if (is_valid) { len = dtype_len; if (dtype != BOOLEAN) { if (dtype == BYTE_ARRAY) { auto str = s->col.leaf_column->element(val_idx); len += str.size_bytes(); - hash = hash_string(str); - } else if (dtype_len_in == 8) { - hash = uint64_init_hash(s->col.leaf_column->element(val_idx)); - } else { - hash = - uint32_init_hash((dtype_len_in == 4) ? s->col.leaf_column->element(val_idx) - : (dtype_len_in == 2) ? s->col.leaf_column->element(val_idx) - : s->col.leaf_column->element(val_idx)); } } } else { len = 0; } - uint32_t non_nulls; - block_scan(temp_storage.scan_storage).ExclusiveSum(is_valid, nz_pos, non_nulls); - nz_pos += s->frag.non_nulls; - __syncthreads(); - len = block_reduce(temp_storage.reduce_storage).Sum(len); - if (!t) { - s->frag.non_nulls += non_nulls; - s->frag.fragment_data_size += len; - } - __syncthreads(); - if (is_valid && dtype != BOOLEAN) { - uint32_t* dict_index = s->col.dict_index; - if (dict_index) { - atomicAdd(&s->map.u32[hash >> 1], (hash & 1) ? 1 << 16 : 1); - dict_index[start_value_idx + nz_pos] = - ((i + t) << init_hash_bits) | - hash; // Store the hash along with the index, so we don't have to recompute it - } - } - __syncthreads(); - } - __syncthreads(); - // Reorder the 16-bit local indices according to the hash values - if (s->col.dict_index) { - static_assert((init_hash_bits == 12), "Hardcoded for init_hash_bits=12"); - // Cumulative sum of hash map counts - uint32_t count01 = s->map.u32[t * 4 + 0]; - uint32_t count23 = s->map.u32[t * 4 + 1]; - uint32_t count45 = s->map.u32[t * 4 + 2]; - uint32_t count67 = s->map.u32[t * 4 + 3]; - uint32_t sum01 = count01 + (count01 << 16); - uint32_t sum23 = count23 + (count23 << 16); - uint32_t sum45 = count45 + (count45 << 16); - uint32_t sum67 = count67 + (count67 << 16); - sum23 += (sum01 >> 16) * 0x10001; - sum45 += (sum23 >> 16) * 0x10001; - sum67 += (sum45 >> 16) * 0x10001; - uint32_t sum_w = sum67 >> 16; - block_scan(temp_storage.scan_storage).InclusiveSum(sum_w, sum_w); - sum_w = (sum_w - (sum67 >> 16)) * 0x10001; - s->map.u32[t * 4 + 0] = sum_w + sum01 - count01; - s->map.u32[t * 4 + 1] = sum_w + sum23 - count23; - s->map.u32[t * 4 + 2] = sum_w + sum45 - count45; - s->map.u32[t * 4 + 3] = sum_w + sum67 - count67; - } - __syncthreads(); - // Put the indices back in hash order - if (s->col.dict_index) { - uint32_t* dict_index = s->col.dict_index + start_row; - uint32_t nnz = s->frag.non_nulls; - for (uint32_t i = 0; i < nnz; i += block_size) { - uint32_t pos = 0, hash = 0, pos_old, pos_new, sh, colliding_row, val = 0; - bool collision; - if (i + t < nnz) { - val = dict_index[i + t]; - hash = val & ((1 << init_hash_bits) - 1); - sh = (hash & 1) ? 16 : 0; - pos_old = s->map.u16[hash]; - } - // The isolation of the atomicAdd, along with pos_old/pos_new is to guarantee deterministic - // behavior for the first row in the hash map that will be used for early duplicate detection - __syncthreads(); - if (i + t < nnz) { - pos = (atomicAdd(&s->map.u32[hash >> 1], 1 << sh) >> sh) & 0xffff; - s->dict[pos] = val; - } - __syncthreads(); - collision = false; - if (i + t < nnz) { - pos_new = s->map.u16[hash]; - collision = (pos != pos_old && pos_new > pos_old + 1); - if (collision) { colliding_row = s->dict[pos_old]; } - } - __syncthreads(); - if (collision) { atomicMin(&s->dict[pos_old], val); } - __syncthreads(); - // Resolve collision - if (collision && val == s->dict[pos_old]) { s->dict[pos] = colliding_row; } - } + len = block_reduce(reduce_storage).Sum(len); + if (!t) { s->frag.fragment_data_size += len; } __syncthreads(); - // Now that the values are ordered by hash, compare every entry with the first entry in the hash - // map, the position of the first entry can be inferred from the hash map counts - uint32_t dupe_data_size = 0; - for (uint32_t i = 0; i < nnz; i += block_size) { - uint32_t ck_row = 0, ck_row_ref = 0, is_dupe = 0; - if (i + t < nnz) { - uint32_t dict_val = s->dict[i + t]; - uint32_t hash = dict_val & ((1 << init_hash_bits) - 1); - ck_row = start_row + (dict_val >> init_hash_bits); - ck_row_ref = start_row + (s->dict[(hash > 0) ? s->map.u16[hash - 1] : 0] >> init_hash_bits); - if (ck_row_ref != ck_row) { - if (dtype == BYTE_ARRAY) { - auto str1 = s->col.leaf_column->element(ck_row); - auto str2 = s->col.leaf_column->element(ck_row_ref); - is_dupe = (str1 == str2); - dupe_data_size += (is_dupe) ? 4 + str1.size_bytes() : 0; - } else { - if (dtype_len_in == 8) { - auto v1 = s->col.leaf_column->element(ck_row); - auto v2 = s->col.leaf_column->element(ck_row_ref); - is_dupe = (v1 == v2); - dupe_data_size += (is_dupe) ? 8 : 0; - } else { - uint32_t v1, v2; - if (dtype_len_in == 4) { - v1 = s->col.leaf_column->element(ck_row); - v2 = s->col.leaf_column->element(ck_row_ref); - } else if (dtype_len_in == 2) { - v1 = s->col.leaf_column->element(ck_row); - v2 = s->col.leaf_column->element(ck_row_ref); - } else { - v1 = s->col.leaf_column->element(ck_row); - v2 = s->col.leaf_column->element(ck_row_ref); - } - is_dupe = (v1 == v2); - dupe_data_size += (is_dupe) ? 4 : 0; - } - } - } - } - uint32_t dupes_in_block; - uint32_t dupes_before; - block_scan(temp_storage.scan_storage).InclusiveSum(is_dupe, dupes_before, dupes_in_block); - dupes_before += s->total_dupes; - __syncthreads(); - if (t == 0) { s->total_dupes += dupes_in_block; } - if (i + t < nnz) { - if (!is_dupe) { - s->col.dict_data[start_row + i + t - dupes_before] = ck_row; - } else { - s->col.dict_index[ck_row] = ck_row_ref | (1u << 31); - } - } - } - __syncthreads(); - dupe_data_size = block_reduce(temp_storage.reduce_storage).Sum(dupe_data_size); - if (!t) { - s->frag.dict_data_size = s->frag.fragment_data_size - dupe_data_size; - s->frag.num_dict_vals = s->frag.non_nulls - s->total_dupes; - } } __syncthreads(); if (t == 0) frag[blockIdx.x][blockIdx.y] = s->frag; @@ -449,22 +279,21 @@ __global__ void __launch_bounds__(128) pagestats_g.start_chunk = ck_g.first_fragment; pagestats_g.num_chunks = 0; } - if (ck_g.has_dictionary) { + if (ck_g.use_dictionary) { if (!t) { page_g.page_data = ck_g.uncompressed_bfr + page_offset; page_g.compressed_data = ck_g.compressed_bfr + comp_page_offset; page_g.num_fragments = 0; page_g.page_type = PageType::DICTIONARY_PAGE; - page_g.dict_bits_plus1 = 0; page_g.chunk = &chunks[blockIdx.y][blockIdx.x]; page_g.chunk_id = blockIdx.y * num_columns + blockIdx.x; page_g.hdr_size = 0; page_g.max_hdr_size = 32; - page_g.max_data_size = ck_g.dictionary_size; + page_g.max_data_size = ck_g.uniq_data_size; page_g.start_row = cur_row; - page_g.num_rows = ck_g.total_dict_entries; - page_g.num_leaf_values = ck_g.total_dict_entries; - page_g.num_values = ck_g.total_dict_entries; + page_g.num_rows = ck_g.num_dict_entries; + page_g.num_leaf_values = ck_g.num_dict_entries; + page_g.num_values = ck_g.num_dict_entries; // TODO: shouldn't matter for dict page page_offset += page_g.max_hdr_size + page_g.max_data_size; comp_page_offset += page_g.max_hdr_size + GetMaxCompressedBfrSize(page_g.max_data_size); } @@ -483,7 +312,7 @@ __global__ void __launch_bounds__(128) // This doesn't actually deal with data. It's agnostic. It only cares about number of rows and // page size. do { - uint32_t fragment_data_size, max_page_size, minmax_len = 0; + uint32_t minmax_len = 0; __syncwarp(); if (num_rows < ck_g.num_rows) { if (t == 0) { frag_g = ck_g.fragments[fragments_in_chunk]; } @@ -496,50 +325,27 @@ __global__ void __launch_bounds__(128) frag_g.num_rows = 0; } __syncwarp(); - if (ck_g.has_dictionary && fragments_in_chunk < ck_g.num_dict_fragments) { - fragment_data_size = - frag_g.num_leaf_values * 2; // Assume worst-case of 2-bytes per dictionary index - } else { - fragment_data_size = frag_g.fragment_data_size; - } + uint32_t fragment_data_size = + (ck_g.use_dictionary) + ? frag_g.num_leaf_values * 2 // Assume worst-case of 2-bytes per dictionary index + : frag_g.fragment_data_size; // TODO (dm): this convoluted logic to limit page size needs refactoring - max_page_size = (values_in_page * 2 >= ck_g.num_values) ? 256 * 1024 - : (values_in_page * 3 >= ck_g.num_values) ? 384 * 1024 - : 512 * 1024; + uint32_t max_page_size = (values_in_page * 2 >= ck_g.num_values) ? 256 * 1024 + : (values_in_page * 3 >= ck_g.num_values) ? 384 * 1024 + : 512 * 1024; if (num_rows >= ck_g.num_rows || - (values_in_page > 0 && - (page_size + fragment_data_size > max_page_size || - (ck_g.has_dictionary && fragments_in_chunk == ck_g.num_dict_fragments)))) { - uint32_t dict_bits_plus1; - - if (ck_g.has_dictionary && page_start < ck_g.num_dict_fragments) { - uint32_t dict_bits; - if (num_dict_entries <= 2) { - dict_bits = 1; - } else if (num_dict_entries <= 4) { - dict_bits = 2; - } else if (num_dict_entries <= 16) { - dict_bits = 4; - } else if (num_dict_entries <= 256) { - dict_bits = 8; - } else if (num_dict_entries <= 4096) { - dict_bits = 12; - } else { - dict_bits = 16; - } - page_size = 1 + 5 + ((values_in_page * dict_bits + 7) >> 3) + (values_in_page >> 8); - dict_bits_plus1 = dict_bits + 1; - } else { - dict_bits_plus1 = 0; + (values_in_page > 0 && (page_size + fragment_data_size > max_page_size))) { + if (ck_g.use_dictionary) { + page_size = + 1 + 5 + ((values_in_page * ck_g.dict_rle_bits + 7) >> 3) + (values_in_page >> 8); } if (!t) { - page_g.num_fragments = fragments_in_chunk - page_start; - page_g.chunk = &chunks[blockIdx.y][blockIdx.x]; - page_g.chunk_id = blockIdx.y * num_columns + blockIdx.x; - page_g.page_type = PageType::DATA_PAGE; - page_g.dict_bits_plus1 = dict_bits_plus1; - page_g.hdr_size = 0; - page_g.max_hdr_size = 32; // Max size excluding statistics + page_g.num_fragments = fragments_in_chunk - page_start; + page_g.chunk = &chunks[blockIdx.y][blockIdx.x]; + page_g.chunk_id = blockIdx.y * num_columns + blockIdx.x; + page_g.page_type = PageType::DATA_PAGE; + page_g.hdr_size = 0; + page_g.max_hdr_size = 32; // Max size excluding statistics if (ck_g.stats) { uint32_t stats_hdr_len = 16; if (col_g.stats_dtype == dtype_string) { @@ -611,8 +417,8 @@ __global__ void __launch_bounds__(128) ck_g.num_pages = num_pages; ck_g.bfr_size = page_offset; ck_g.compressed_size = comp_page_offset; - pagestats_g.start_chunk = ck_g.first_page + ck_g.has_dictionary; // Exclude dictionary - pagestats_g.num_chunks = num_pages - ck_g.has_dictionary; + pagestats_g.start_chunk = ck_g.first_page + ck_g.use_dictionary; // Exclude dictionary + pagestats_g.num_chunks = num_pages - ck_g.use_dictionary; } } __syncthreads(); @@ -1069,7 +875,10 @@ __global__ void __launch_bounds__(128, 8) } else { dtype_len_in = dtype_len_out; } - dict_bits = (dtype == BOOLEAN) ? 1 : (s->page.dict_bits_plus1 - 1); + dict_bits = (dtype == BOOLEAN) ? 1 + : (s->ck.use_dictionary and s->page.page_type != PageType::DICTIONARY_PAGE) + ? s->ck.dict_rle_bits + : -1; if (t == 0) { uint8_t* dst = s->cur; s->rle_run = 0; @@ -1080,37 +889,56 @@ __global__ void __launch_bounds__(128, 8) dst[0] = dict_bits; s->rle_out = dst + 1; } - s->page_start_val = s->page.start_row; - if (s->col.parent_column != nullptr) { + s->page_start_val = s->page.start_row; // Dictionary page's start row is chunk's start row + auto chunk_start_val = s->ck.start_row; + if (s->col.parent_column != nullptr) { // TODO: remove this check. parent is now never nullptr auto col = *(s->col.parent_column); auto current_page_start_val = s->page_start_val; + // TODO: We do this so much. Add a global function that converts row idx to val idx while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) { if (col.type().id() == type_id::STRUCT) { current_page_start_val += col.offset(); + chunk_start_val += col.offset(); col = col.child(0); } else { - current_page_start_val = col.child(lists_column_view::offsets_column_index) - .element(current_page_start_val + col.offset()); - col = col.child(lists_column_view::child_column_index); + auto offset_col = col.child(lists_column_view::offsets_column_index); + current_page_start_val = + offset_col.element(current_page_start_val + col.offset()); + chunk_start_val = offset_col.element(chunk_start_val + col.offset()); + col = col.child(lists_column_view::child_column_index); } } - s->page_start_val = current_page_start_val; + s->page_start_val = current_page_start_val; + s->chunk_start_val = chunk_start_val; } } __syncthreads(); for (uint32_t cur_val_idx = 0; cur_val_idx < s->page.num_leaf_values;) { - uint32_t nvals = min(s->page.num_leaf_values - cur_val_idx, 128); - uint32_t val_idx = s->page_start_val + cur_val_idx + t; - uint32_t is_valid, len, pos; + uint32_t nvals = min(s->page.num_leaf_values - cur_val_idx, 128); + uint32_t len, pos; + + auto [is_valid, val_idx] = [&]() { + uint32_t val_idx; + uint32_t is_valid; + + size_type val_idx_in_block = cur_val_idx + t; + if (s->page.page_type == PageType::DICTIONARY_PAGE) { + val_idx = val_idx_in_block; + is_valid = (val_idx < s->page.num_leaf_values); + if (is_valid) { val_idx = s->ck.dict_data[val_idx]; } + } else { + size_type val_idx_in_leaf_col = s->page_start_val + val_idx_in_block; + + is_valid = (val_idx_in_leaf_col < s->col.leaf_column->size() && + val_idx_in_block < s->page.num_leaf_values) + ? s->col.leaf_column->is_valid(val_idx_in_leaf_col) + : 0; + val_idx = + (s->ck.use_dictionary) ? val_idx_in_leaf_col - s->chunk_start_val : val_idx_in_leaf_col; + } + return std::make_tuple(is_valid, val_idx); + }(); - if (s->page.page_type == PageType::DICTIONARY_PAGE) { - is_valid = (cur_val_idx + t < s->page.num_leaf_values); - val_idx = (is_valid) ? s->col.dict_data[val_idx] : val_idx; - } else { - is_valid = (val_idx < s->col.leaf_column->size() && cur_val_idx + t < s->page.num_leaf_values) - ? s->col.leaf_column->is_valid(val_idx) - : 0; - } cur_val_idx += nvals; if (dict_bits >= 0) { // Dictionary encoding @@ -1124,7 +952,7 @@ __global__ void __launch_bounds__(128, 8) if (dtype == BOOLEAN) { v = s->col.leaf_column->element(val_idx); } else { - v = s->col.dict_index[val_idx]; + v = s->ck.dict_index[val_idx]; } s->vals[(rle_numvals + pos) & (rle_buffer_size - 1)] = v; } @@ -1531,13 +1359,12 @@ __global__ void __launch_bounds__(128) // data pages (actual encoding is identical). Encoding encoding; if (enable_bool_rle) { - encoding = (col_g.physical_type != BOOLEAN) - ? (page_type == PageType::DICTIONARY_PAGE || page_g.dict_bits_plus1 != 0) - ? Encoding::PLAIN_DICTIONARY - : Encoding::PLAIN - : Encoding::RLE; + encoding = (col_g.physical_type == BOOLEAN) ? Encoding::RLE + : (page_type == PageType::DICTIONARY_PAGE || page_g.chunk->use_dictionary) + ? Encoding::PLAIN_DICTIONARY + : Encoding::PLAIN; } else { - encoding = (page_type == PageType::DICTIONARY_PAGE || page_g.dict_bits_plus1 != 0) + encoding = (page_type == PageType::DICTIONARY_PAGE || page_g.chunk->use_dictionary) ? Encoding::PLAIN_DICTIONARY : Encoding::PLAIN; } @@ -1562,7 +1389,7 @@ __global__ void __launch_bounds__(128) } else { // DictionaryPageHeader encoder.field_struct_begin(7); - encoder.field_int32(1, ck_g.total_dict_entries); // number of values in dictionary + encoder.field_int32(1, ck_g.num_dict_entries); // number of values in dictionary encoder.field_int32(2, encoding); encoder.field_struct_end(7); } @@ -1613,12 +1440,12 @@ __global__ void __launch_bounds__(1024) memcpy_block<1024, true>(dst, src, data_len, t); dst += data_len; __syncthreads(); - if (!t && page == 0 && ck_g.has_dictionary) { ck_g.dictionary_size = hdr_len + data_len; } + if (!t && page == 0 && ck_g.use_dictionary) { ck_g.dictionary_size = hdr_len + data_len; } } if (t == 0) { chunks[blockIdx.x].bfr_size = uncompressed_size; chunks[blockIdx.x].compressed_size = (dst - dst_base); - if (ck_g.has_dictionary) { chunks[blockIdx.x].dictionary_size = ck_g.dictionary_size; } + if (ck_g.use_dictionary) { chunks[blockIdx.x].dictionary_size = ck_g.dictionary_size; } } } diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 975d2545cd1..cdd7c6b6674 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -28,6 +28,8 @@ #include #include +#include + #include #include #include @@ -42,6 +44,10 @@ namespace parquet { using cudf::io::detail::string_index_pair; +// Total number of unsigned 16 bit values +constexpr size_type MAX_DICT_SIZE = + std::numeric_limits::max() - std::numeric_limits::min() + 1; + /** * @brief Struct representing an input column in the file. */ @@ -56,6 +62,11 @@ struct input_column_info { namespace gpu { +auto constexpr KEY_SENTINEL = size_type{-1}; +auto constexpr VALUE_SENTINEL = size_type{-1}; +using map_type = cuco::static_map; +using slot_type = map_type::pair_atomic_type; + /** * @brief Enums for the flags in the page header */ @@ -222,8 +233,6 @@ struct ColumnChunkDesc { * @brief Struct describing an encoder column */ struct parquet_column_device_view : stats_column_desc { - uint32_t* dict_index; //!< Dictionary index [row] - uint32_t* dict_data; //!< Dictionary data (unique row indices) uint8_t physical_type; //!< physical data type uint8_t converted_type; //!< logical data type uint8_t level_bits; //!< bits to encode max definition (lower nibble) & repetition (upper nibble) @@ -236,9 +245,9 @@ struct parquet_column_device_view : stats_column_desc { size_type const* level_offsets; //!< Offset array for per-row pre-calculated rep/def level values uint8_t const* rep_values; //!< Pre-calculated repetition level values uint8_t const* def_values; //!< Pre-calculated definition level values - uint8_t* nullability; //!< Array of nullability of each nesting level. e.g. nullable[0] is - //!< nullability of parent_column. May be different from col.nullable() in - //!< case of chunked writing. + uint8_t const* nullability; //!< Array of nullability of each nesting level. e.g. nullable[0] is + //!< nullability of parent_column. May be different from + //!< col.nullable() in case of chunked writing. }; constexpr int max_page_fragment_size = 5000; //!< Max number of rows in a page fragment @@ -253,7 +262,6 @@ struct PageFragment { uint32_t start_value_idx; uint32_t num_leaf_values; //!< Number of leaf values in fragment. Does not include nulls at //!< non-leaf level - uint32_t non_nulls; //!< Number of non-null values uint16_t num_rows; //!< Number of rows in fragment uint16_t num_dict_vals; //!< Number of unique dictionary entries }; @@ -292,26 +300,33 @@ struct EncPage; */ struct EncColumnChunk { parquet_column_device_view const* col_desc; //!< Column description - PageFragment* fragments; //!< First fragment in chunk - uint8_t* uncompressed_bfr; //!< Uncompressed page data - uint8_t* compressed_bfr; //!< Compressed page data - statistics_chunk const* stats; //!< Fragment statistics - uint32_t bfr_size; //!< Uncompressed buffer size - uint32_t compressed_size; //!< Compressed buffer size - uint32_t start_row; //!< First row of chunk - uint32_t num_rows; //!< Number of rows in chunk - uint32_t num_values; //!< Number of values in chunk. Different from num_rows for nested types + size_type col_desc_id; + PageFragment* fragments; //!< First fragment in chunk + uint8_t* uncompressed_bfr; //!< Uncompressed page data + uint8_t* compressed_bfr; //!< Compressed page data + statistics_chunk const* stats; //!< Fragment statistics + uint32_t bfr_size; //!< Uncompressed buffer size + uint32_t compressed_size; //!< Compressed buffer size + uint32_t start_row; //!< First row of chunk + uint32_t num_rows; //!< Number of rows in chunk + size_type num_values; //!< Number of values in chunk. Different from num_rows for nested types uint32_t first_fragment; //!< First fragment of chunk EncPage* pages; //!< Ptr to pages that belong to this chunk uint32_t first_page; //!< First page of chunk uint32_t num_pages; //!< Number of pages in chunk - uint32_t dictionary_id; //!< Dictionary id for this chunk uint8_t is_compressed; //!< Nonzero if the chunk uses compression - uint8_t has_dictionary; //!< Nonzero if the chunk uses dictionary encoding - uint16_t num_dict_fragments; //!< Number of fragments using dictionary - uint32_t dictionary_size; //!< Size of dictionary - uint32_t total_dict_entries; //!< Total number of entries in dictionary - uint32_t ck_stat_size; //!< Size of chunk-level statistics (included in 1st page header) + uint32_t dictionary_size; //!< Size of dictionary page including header + uint32_t ck_stat_size; //!< Size of chunk-level statistics (included in 1st page header) + slot_type* dict_map_slots; //!< Hash map storage for calculating dict encoding for this chunk + size_type dict_map_size; //!< Size of dict_map_slots + size_type num_dict_entries; //!< Total number of entries in dictionary + size_type + uniq_data_size; //!< Size of dictionary page (set of all unique values) if dict enc is used + size_type plain_data_size; //!< Size of data in this chunk if plain encoding is used + size_type* dict_data; //!< Dictionary data (unique row indices) + uint16_t* dict_index; //!< Index of value in dictionary page. column[dict_data[dict_index[row]]] + uint8_t dict_rle_bits; //!< Bit size for encoding dictionary indices + bool use_dictionary; //!< True if the chunk uses dictionary encoding }; /** @@ -322,7 +337,6 @@ struct EncPage { uint8_t* compressed_data; //!< Ptr to compressed page uint16_t num_fragments; //!< Number of fragments in page PageType page_type; //!< Page type - uint8_t dict_bits_plus1; //!< 0=plain, nonzero:bits to encoding dictionary indices + 1 EncColumnChunk* chunk; //!< Chunk that this page belongs to uint32_t chunk_id; //!< Index in chunk array uint32_t hdr_size; //!< Size of page header @@ -449,7 +463,7 @@ dremel_data get_dremel_data(column_view h_col, * @param[in] num_columns Number of columns * @param[in] fragment_size Number of rows per fragment * @param[in] num_rows Number of rows per column - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use */ void InitPageFragments(cudf::detail::device_2dspan frag, device_span col_desc, @@ -463,13 +477,57 @@ void InitPageFragments(cudf::detail::device_2dspan frag, * @param[out] groups Statistics groups [num_columns x num_fragments] * @param[in] fragments Page fragments [num_columns x num_fragments] * @param[in] col_desc Column description [num_columns] - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use */ void InitFragmentStatistics(cudf::detail::device_2dspan groups, cudf::detail::device_2dspan fragments, device_span col_desc, rmm::cuda_stream_view stream); +/** + * @brief Initialize per-chunk hash maps used for dictionary with sentinel values + * + * @param chunks Flat span of chunks to intialize hash maps for + * @param stream CUDA stream to use + */ +void initialize_chunk_hash_maps(device_span chunks, rmm::cuda_stream_view stream); + +/** + * @brief Insert chunk values into their respective hash maps + * + * @param chunks Column chunks [rowgroup][column] + * @param num_rows Number of rows per column + * @param stream CUDA stream to use + */ +void populate_chunk_hash_maps(cudf::detail::device_2dspan chunks, + size_type num_rows, + rmm::cuda_stream_view stream); + +/** + * @brief Compact dictionary hash map entries into chunk.dict_data + * + * @param chunks Flat span of chunks to compact hash maps for + * @param stream CUDA stream to use + */ +void collect_map_entries(device_span chunks, rmm::cuda_stream_view stream); + +/** + * @brief Get the Dictionary Indices for each row + * + * For each row of a chunk, gets the indices into chunk.dict_data which contains the value otherwise + * stored in input column [row]. Stores these indices into chunk.dict_index. + * + * Since dict_data itself contains indices into the original cudf column, this means that + * col[row] == col[dict_data[dict_index[row - chunk.start_row]]] + * + * @param chunks Column chunks [rowgroup][column] + * @param num_rows Number of rows per column + * @param stream CUDA stream to use + */ +void get_dictionary_indices(cudf::detail::device_2dspan chunks, + size_type num_rows, + rmm::cuda_stream_view stream); + /** * @brief Launches kernel for initializing encoder data pages * @@ -538,17 +596,6 @@ void GatherPages(device_span chunks, device_span pages, rmm::cuda_stream_view stream); -/** - * @brief Launches kernel for building chunk dictionaries - * - * @param[in] chunks Column chunks - * @param[in] dev_scratch Device scratch data (kDictScratchSize bytes per dictionary) - * @param[in] stream CUDA stream to use, default 0 - */ -void BuildChunkDictionaries(device_span chunks, - uint32_t* dev_scratch, - rmm::cuda_stream_view stream); - } // namespace gpu } // namespace parquet } // namespace io diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 7c0ce03886d..780aa03506e 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -505,7 +505,7 @@ struct parquet_column_view { rmm::cuda_stream_view stream); column_view leaf_column_view() const; - gpu::parquet_column_device_view get_device_view(rmm::cuda_stream_view stream); + gpu::parquet_column_device_view get_device_view(rmm::cuda_stream_view stream) const; column_view cudf_column_view() const { return cudf_col; } parquet::Type physical_type() const { return schema_node.type; } @@ -517,26 +517,6 @@ struct parquet_column_view { uint8_t max_rep_level() const noexcept { return _max_rep_level; } bool is_list() const noexcept { return _is_list; } - // Dictionary related member functions - uint32_t* get_dict_data() { return (_dict_data.size()) ? _dict_data.data() : nullptr; } - uint32_t* get_dict_index() { return (_dict_index.size()) ? _dict_index.data() : nullptr; } - void use_dictionary(bool use_dict) { _dictionary_used = use_dict; } - void alloc_dictionary(size_t max_num_rows, rmm::cuda_stream_view stream) - { - _dict_data.resize(max_num_rows, stream); - _dict_index.resize(max_num_rows, stream); - } - bool check_dictionary_used(rmm::cuda_stream_view stream) - { - if (!_dictionary_used) { - _dict_data.resize(0, stream); - _dict_data.shrink_to_fit(stream); - _dict_index.resize(0, stream); - _dict_index.shrink_to_fit(stream); - } - return _dictionary_used; - } - private: // Schema related members schema_tree_node schema_node; @@ -556,11 +536,6 @@ struct parquet_column_view { rmm::device_uvector _def_level; std::vector _nullability; size_type _data_count = 0; - - // Dictionary related members - bool _dictionary_used = false; - rmm::device_uvector _dict_data; - rmm::device_uvector _dict_index; }; parquet_column_view::parquet_column_view(schema_tree_node const& schema_node, @@ -570,9 +545,7 @@ parquet_column_view::parquet_column_view(schema_tree_node const& schema_node, _d_nullability(0, stream), _dremel_offsets(0, stream), _rep_level(0, stream), - _def_level(0, stream), - _dict_data(0, stream), - _dict_index(0, stream) + _def_level(0, stream) { // Construct single inheritance column_view from linked_column_view auto curr_col = schema_node.leaf_column.get(); @@ -683,21 +656,14 @@ column_view parquet_column_view::leaf_column_view() const return col; } -gpu::parquet_column_device_view parquet_column_view::get_device_view(rmm::cuda_stream_view stream) +gpu::parquet_column_device_view parquet_column_view::get_device_view( + rmm::cuda_stream_view stream) const { column_view col = leaf_column_view(); auto desc = gpu::parquet_column_device_view{}; // Zero out all fields desc.stats_dtype = schema_node.stats_dtype; desc.ts_scale = schema_node.ts_scale; - // TODO (dm): Enable dictionary for list and struct after refactor - if (physical_type() != BOOLEAN && physical_type() != UNDEFINED_TYPE && - !is_nested(cudf_col.type())) { - alloc_dictionary(_data_count, stream); - desc.dict_index = get_dict_index(); - desc.dict_data = get_dict_data(); - } - if (is_list()) { desc.level_offsets = _dremel_offsets.data(); desc.rep_values = _rep_level.data(); @@ -705,15 +671,9 @@ gpu::parquet_column_device_view parquet_column_view::get_device_view(rmm::cuda_s } desc.num_rows = cudf_col.size(); desc.physical_type = static_cast(physical_type()); - auto count_bits = [](uint16_t number) { - int16_t nbits = 0; - while (number > 0) { - nbits++; - number >>= 1; - } - return nbits; - }; - desc.level_bits = count_bits(max_rep_level()) << 4 | count_bits(max_def_level()); + + desc.level_bits = CompactProtocolReader::NumRequiredBits(max_rep_level()) << 4 | + CompactProtocolReader::NumRequiredBits(max_def_level()); desc.nullability = _d_nullability.data(); return desc; } @@ -744,22 +704,99 @@ void writer::impl::gather_fragment_statistics( stream.synchronize(); } -void writer::impl::build_chunk_dictionaries( - hostdevice_2dvector& chunks, - device_span col_desc, - uint32_t num_columns, - uint32_t num_dictionaries) +void writer::impl::init_page_sizes(hostdevice_2dvector& chunks, + device_span col_desc, + uint32_t num_columns) { chunks.host_to_device(stream); - if (num_dictionaries > 0) { - size_t dict_scratch_size = (size_t)num_dictionaries * gpu::kDictScratchSize; - auto dict_scratch = cudf::detail::make_zeroed_device_uvector_async( - dict_scratch_size / sizeof(uint32_t), stream); + gpu::InitEncoderPages(chunks, {}, col_desc, num_columns, nullptr, nullptr, stream); + chunks.device_to_host(stream, true); +} + +auto build_chunk_dictionaries(hostdevice_2dvector& chunks, + host_span col_desc, + uint32_t num_rows, + rmm::cuda_stream_view stream) +{ + // At this point, we know all chunks and their sizes. We want to allocate dictionaries for each + // chunk that can have dictionary + + auto h_chunks = chunks.host_view().flat_view(); + + std::vector> dict_data; + std::vector> dict_index; + + if (h_chunks.size() == 0) { return std::make_pair(std::move(dict_data), std::move(dict_index)); } - gpu::BuildChunkDictionaries(chunks.device_view().flat_view(), dict_scratch.data(), stream); + // Allocate slots for each chunk + std::vector> hash_maps_storage; + hash_maps_storage.reserve(h_chunks.size()); + for (auto& chunk : h_chunks) { + if (col_desc[chunk.col_desc_id].physical_type == Type::BOOLEAN) { + chunk.use_dictionary = false; + } else { + chunk.use_dictionary = true; + auto& inserted_map = hash_maps_storage.emplace_back(chunk.num_values, stream); + chunk.dict_map_slots = inserted_map.data(); + chunk.dict_map_size = inserted_map.size(); + } } - gpu::InitEncoderPages(chunks, {}, col_desc, num_columns, nullptr, nullptr, stream); + + chunks.host_to_device(stream); + + gpu::initialize_chunk_hash_maps(chunks.device_view().flat_view(), stream); + gpu::populate_chunk_hash_maps(chunks, num_rows, stream); + chunks.device_to_host(stream, true); + + // Make decision about which chunks have dictionary + for (auto& ck : h_chunks) { + if (not ck.use_dictionary) { continue; } + std::tie(ck.use_dictionary, ck.dict_rle_bits) = [&]() { + // calculate size of chunk if dictionary is used + + // If we have N unique values then the idx for the last value is N - 1 and nbits is the number + // of bits required to encode indices into the dictionary + auto max_dict_index = (ck.num_dict_entries > 0) ? ck.num_dict_entries - 1 : 0; + auto nbits = CompactProtocolReader::NumRequiredBits(max_dict_index); + + // We don't use dictionary if the indices are > 16 bits because that's the maximum bitpacking + // bitsize we efficiently support + if (nbits > 16) { return std::make_pair(false, 0); } + + // Only these bit sizes are allowed for RLE encoding because it's compute optimized + constexpr auto allowed_bitsizes = std::array{1, 2, 4, 8, 12, 16}; + + // ceil to (1/2/4/8/12/16) + auto rle_bits = *std::lower_bound(allowed_bitsizes.begin(), allowed_bitsizes.end(), nbits); + auto rle_byte_size = util::div_rounding_up_safe(ck.num_values * rle_bits, 8); + + auto dict_enc_size = ck.uniq_data_size + rle_byte_size; + + bool use_dict = (ck.plain_data_size > dict_enc_size); + if (not use_dict) { rle_bits = 0; } + return std::make_pair(use_dict, rle_bits); + }(); + } + + // TODO: (enh) Deallocate hash map storage for chunks that don't use dict and clear pointers. + + dict_data.reserve(h_chunks.size()); + dict_index.reserve(h_chunks.size()); + for (auto& chunk : h_chunks) { + if (not chunk.use_dictionary) { continue; } + + size_t dict_data_size = std::min(MAX_DICT_SIZE, chunk.dict_map_size); + auto& inserted_dict_data = dict_data.emplace_back(dict_data_size, stream); + auto& inserted_dict_index = dict_index.emplace_back(chunk.num_values, stream); + chunk.dict_data = inserted_dict_data.data(); + chunk.dict_index = inserted_dict_index.data(); + } + chunks.host_to_device(stream); + gpu::collect_map_entries(chunks.device_view().flat_view(), stream); + gpu::get_dictionary_indices(chunks.device_view(), num_rows, stream); + + return std::make_pair(std::move(dict_data), std::move(dict_index)); } void writer::impl::init_encoder_pages(hostdevice_2dvector& chunks, @@ -959,10 +996,8 @@ void writer::impl::write(table_view const& table) // Initialize column description hostdevice_vector col_desc(parquet_columns.size(), stream); - // This should've been `auto const&` but isn't since dictionary space is allocated when calling - // get_device_view(). Fix during dictionary refactor. std::transform( - parquet_columns.begin(), parquet_columns.end(), col_desc.host_ptr(), [&](auto& pcol) { + parquet_columns.begin(), parquet_columns.end(), col_desc.host_ptr(), [&](auto const& pcol) { return pcol.get_device_view(stream); }); @@ -973,11 +1008,9 @@ void writer::impl::write(table_view const& table) // ideally want the page size to be below 1MB so as to have enough pages to get good // compression/decompression performance). using cudf::io::parquet::gpu::max_page_fragment_size; - constexpr uint32_t fragment_size = 5000; - static_assert(fragment_size <= max_page_fragment_size, - "fragment size cannot be greater than max_page_fragment_size"); - uint32_t num_fragments = (uint32_t)((num_rows + fragment_size - 1) / fragment_size); + uint32_t num_fragments = + (uint32_t)((num_rows + max_page_fragment_size - 1) / max_page_fragment_size); cudf::detail::hostdevice_2dvector fragments( num_columns, num_fragments, stream); @@ -987,7 +1020,7 @@ void writer::impl::write(table_view const& table) leaf_column_views = create_leaf_column_device_views( col_desc, *parent_column_table_device_view, stream); - init_page_fragments(fragments, col_desc, num_rows, fragment_size); + init_page_fragments(fragments, col_desc, num_rows, max_page_fragment_size); } size_t global_rowgroup_base = md.row_groups.size(); @@ -1002,11 +1035,12 @@ void writer::impl::write(table_view const& table) for (auto i = 0; i < num_columns; i++) { fragment_data_size += fragments[i][f].fragment_data_size; } - if (f > rowgroup_start && (rowgroup_size + fragment_data_size > max_rowgroup_size_ || - (f + 1 - rowgroup_start) * fragment_size > max_rowgroup_rows_)) { + if (f > rowgroup_start && + (rowgroup_size + fragment_data_size > max_rowgroup_size_ || + (f + 1 - rowgroup_start) * max_page_fragment_size > max_rowgroup_rows_)) { // update schema md.row_groups.resize(md.row_groups.size() + 1); - md.row_groups[global_r++].num_rows = (f - rowgroup_start) * fragment_size; + md.row_groups[global_r++].num_rows = (f - rowgroup_start) * max_page_fragment_size; num_rowgroups++; rowgroup_start = f; rowgroup_size = 0; @@ -1015,7 +1049,7 @@ void writer::impl::write(table_view const& table) if (f + 1 == num_fragments) { // update schema md.row_groups.resize(md.row_groups.size() + 1); - md.row_groups[global_r++].num_rows = num_rows - rowgroup_start * fragment_size; + md.row_groups[global_r++].num_rows = num_rows - rowgroup_start * max_page_fragment_size; num_rowgroups++; } } @@ -1033,20 +1067,19 @@ void writer::impl::write(table_view const& table) // Initialize row groups and column chunks uint32_t num_chunks = num_rowgroups * num_columns; hostdevice_2dvector chunks(num_rowgroups, num_columns, stream); - uint32_t num_dictionaries = 0; for (uint32_t r = 0, global_r = global_rowgroup_base, f = 0, start_row = 0; r < num_rowgroups; r++, global_r++) { - uint32_t fragments_in_chunk = - (uint32_t)((md.row_groups[global_r].num_rows + fragment_size - 1) / fragment_size); + uint32_t fragments_in_chunk = (uint32_t)( + (md.row_groups[global_r].num_rows + max_page_fragment_size - 1) / max_page_fragment_size); md.row_groups[global_r].total_byte_size = 0; md.row_groups[global_r].columns.resize(num_columns); for (int i = 0; i < num_columns; i++) { gpu::EncColumnChunk* ck = &chunks[r][i]; - bool dict_enable = false; - *ck = {}; - ck->col_desc = col_desc.device_ptr() + i; - ck->fragments = &fragments.device_view()[i][f]; + *ck = {}; + ck->col_desc = col_desc.device_ptr() + i; + ck->col_desc_id = i; + ck->fragments = &fragments.device_view()[i][f]; ck->stats = (frag_stats.size() != 0) ? frag_stats.data() + i * num_fragments + f : nullptr; ck->start_row = start_row; ck->num_rows = (uint32_t)md.row_groups[global_r].num_rows; @@ -1056,30 +1089,12 @@ void writer::impl::write(table_view const& table) std::accumulate(chunk_fragments.begin(), chunk_fragments.end(), 0, [](uint32_t l, auto r) { return l + r.num_values; }); - ck->dictionary_id = num_dictionaries; - if (col_desc[i].dict_data) { - size_t plain_size = 0; - size_t dict_size = 1; - uint32_t num_dict_vals = 0; - for (uint32_t j = 0; j < fragments_in_chunk && num_dict_vals < 65536; j++) { - plain_size += chunk_fragments[j].fragment_data_size; - dict_size += chunk_fragments[j].dict_data_size + - ((num_dict_vals > 256) ? 2 : 1) * chunk_fragments[j].non_nulls; - num_dict_vals += chunk_fragments[j].num_dict_vals; - } - if (dict_size < plain_size) { - parquet_columns[i].use_dictionary(true); - dict_enable = true; - num_dictionaries++; - } - } - ck->has_dictionary = dict_enable; + ck->plain_data_size = std::accumulate( + chunk_fragments.begin(), chunk_fragments.end(), 0, [](int sum, gpu::PageFragment frag) { + return sum + frag.fragment_data_size; + }); md.row_groups[global_r].columns[i].meta_data.type = parquet_columns[i].physical_type(); md.row_groups[global_r].columns[i].meta_data.encodings = {Encoding::PLAIN, Encoding::RLE}; - if (dict_enable) { - md.row_groups[global_r].columns[i].meta_data.encodings.push_back( - Encoding::PLAIN_DICTIONARY); - } md.row_groups[global_r].columns[i].meta_data.path_in_schema = parquet_columns[i].get_path_in_schema(); md.row_groups[global_r].columns[i].meta_data.codec = UNCOMPRESSED; @@ -1089,15 +1104,18 @@ void writer::impl::write(table_view const& table) start_row += (uint32_t)md.row_groups[global_r].num_rows; } - // Free unused dictionaries - for (auto& col : parquet_columns) { - col.check_dictionary_used(stream); + auto dict_info_owner = build_chunk_dictionaries(chunks, col_desc, num_rows, stream); + for (uint32_t rg = 0, global_rg = global_rowgroup_base; rg < num_rowgroups; rg++, global_rg++) { + for (int col = 0; col < num_columns; col++) { + if (chunks.host_view()[rg][col].use_dictionary) { + md.row_groups[global_rg].columns[col].meta_data.encodings.push_back( + Encoding::PLAIN_DICTIONARY); + } + } } // Build chunk dictionaries and count pages - if (num_chunks != 0) { - build_chunk_dictionaries(chunks, col_desc, num_columns, num_dictionaries); - } + if (num_chunks != 0) { init_page_sizes(chunks, col_desc, num_columns); } // Initialize batches of rowgroups to encode (mainly to limit peak memory usage) std::vector batch_list; @@ -1247,9 +1265,9 @@ void writer::impl::write(table_view const& table) } md.row_groups[global_r].total_byte_size += ck->compressed_size; md.row_groups[global_r].columns[i].meta_data.data_page_offset = - current_chunk_offset + ((ck->has_dictionary) ? ck->dictionary_size : 0); + current_chunk_offset + ((ck->use_dictionary) ? ck->dictionary_size : 0); md.row_groups[global_r].columns[i].meta_data.dictionary_page_offset = - (ck->has_dictionary) ? current_chunk_offset : 0; + (ck->use_dictionary) ? current_chunk_offset : 0; md.row_groups[global_r].columns[i].meta_data.total_uncompressed_size = ck->bfr_size; md.row_groups[global_r].columns[i].meta_data.total_compressed_size = ck->compressed_size; current_chunk_offset += ck->compressed_size; diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index 8d9bdc8adbd..8fb1a8294fb 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -153,12 +153,11 @@ class writer::impl { * @param chunks column chunk array * @param col_desc column description array * @param num_columns Total number of columns - * @param num_dictionaries Total number of dictionaries */ - void build_chunk_dictionaries(hostdevice_2dvector& chunks, - device_span col_desc, - uint32_t num_columns, - uint32_t num_dictionaries); + void init_page_sizes(hostdevice_2dvector& chunks, + device_span col_desc, + uint32_t num_columns); + /** * @brief Initialize encoder pages * diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 8fdfc6f9165..70b4bd1d873 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -275,10 +275,10 @@ inline auto random_values(size_t size) TYPED_TEST(ParquetWriterNumericTypeTest, SingleColumn) { auto sequence = - cudf::detail::make_counting_transform_iterator(0, [](auto i) { return TypeParam(i); }); + cudf::detail::make_counting_transform_iterator(0, [](auto i) { return TypeParam(i % 400); }); auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return true; }); - constexpr auto num_rows = 100; + constexpr auto num_rows = 800; column_wrapper col(sequence, sequence + num_rows, validity); std::vector> cols;