diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu
index cdee066a06a..7c5651b1ef8 100644
--- a/cpp/src/io/parquet/page_enc.cu
+++ b/cpp/src/io/parquet/page_enc.cu
@@ -61,6 +61,12 @@ constexpr int32_t NO_TRUNC_STATS = 0;
 // minimum scratch space required for encoding statistics
 constexpr size_t MIN_STATS_SCRATCH_SIZE = sizeof(__int128_t);
 
+// mask to determine lane id
+constexpr uint32_t WARP_MASK = cudf::detail::warp_size - 1;
+
+// currently 64k - 1
+constexpr uint32_t MAX_GRID_Y_SIZE = (1 << 16) - 1;
+
 struct frag_init_state_s {
   parquet_column_device_view col;
   PageFragment frag;
@@ -116,82 +122,87 @@ __global__ void __launch_bounds__(block_size)
   using block_reduce = cub::BlockReduce<uint32_t, block_size>;
   __shared__ typename block_reduce::TempStorage reduce_storage;
 
-  frag_init_state_s* const s = &state_g;
-  uint32_t t                 = threadIdx.x;
-  int frag_y                 = blockIdx.y;
-  auto const physical_type   = col_desc[blockIdx.x].physical_type;
+  frag_init_state_s* const s              = &state_g;
+  uint32_t const t                        = threadIdx.x;
+  auto const physical_type                = col_desc[blockIdx.x].physical_type;
+  uint32_t const num_fragments_per_column = frag.size().second;
 
-  if (t == 0) s->col = col_desc[blockIdx.x];
+  if (t == 0) { s->col = col_desc[blockIdx.x]; }
   __syncthreads();
-  if (!t) {
-    // Find which partition this fragment came from
-    auto it =
-      thrust::upper_bound(thrust::seq, part_frag_offset.begin(), part_frag_offset.end(), frag_y);
-    int p             = it - part_frag_offset.begin() - 1;
-    int part_end_row  = partitions[p].start_row + partitions[p].num_rows;
-    s->frag.start_row = (frag_y - part_frag_offset[p]) * fragment_size + partitions[p].start_row;
-
-    // frag.num_rows = fragment_size except for the last fragment in partition which can be smaller.
-    // num_rows is fixed but fragment size could be larger if the data is strings or nested.
-    s->frag.num_rows           = min(fragment_size, part_end_row - s->frag.start_row);
-    s->frag.num_dict_vals      = 0;
-    s->frag.fragment_data_size = 0;
-    s->frag.dict_data_size     = 0;
-
-    s->frag.start_value_idx = row_to_value_idx(s->frag.start_row, s->col);
-    size_type end_value_idx = row_to_value_idx(s->frag.start_row + s->frag.num_rows, s->col);
-    s->frag.num_leaf_values = end_value_idx - s->frag.start_value_idx;
-
-    if (s->col.level_offsets != nullptr) {
-      // For nested schemas, the number of values in a fragment is not directly related to the
-      // number of encoded data elements or the number of rows. It is simply the number of
-      // repetition/definition values which together encode validity and nesting information.
-      size_type first_level_val_idx = s->col.level_offsets[s->frag.start_row];
-      size_type last_level_val_idx  = s->col.level_offsets[s->frag.start_row + s->frag.num_rows];
-      s->frag.num_values            = last_level_val_idx - first_level_val_idx;
-    } else {
-      s->frag.num_values = s->frag.num_rows;
-    }
-  }
+
   auto const leaf_type = s->col.leaf_column->type().id();
   auto const dtype_len = physical_type_len(physical_type, leaf_type);
-  __syncthreads();
 
-  size_type nvals           = s->frag.num_leaf_values;
-  size_type start_value_idx = s->frag.start_value_idx;
-
-  for (uint32_t i = 0; i < nvals; i += block_size) {
-    uint32_t val_idx  = start_value_idx + i + t;
-    uint32_t is_valid = (i + t < nvals && val_idx < s->col.leaf_column->size())
-                          ? s->col.leaf_column->is_valid(val_idx)
-                          : 0;
-    uint32_t len;
-    if (is_valid) {
-      len = dtype_len;
-      if (physical_type == BYTE_ARRAY) {
-        switch (leaf_type) {
-          case type_id::STRING: {
-            auto str = s->col.leaf_column->element<string_view>(val_idx);
-            len += str.size_bytes();
-          } break;
-          case type_id::LIST: {
-            auto list_element =
-              get_element<statistics::byte_array_view>(*s->col.leaf_column, val_idx);
-            len += list_element.size_bytes();
-          } break;
-          default: CUDF_UNREACHABLE("Unsupported data type for leaf column");
-        }
+  for (uint32_t frag_y = blockIdx.y; frag_y < num_fragments_per_column; frag_y += gridDim.y) {
+    if (t == 0) {
+      // Find which partition this fragment came from
+      auto it =
+        thrust::upper_bound(thrust::seq, part_frag_offset.begin(), part_frag_offset.end(), frag_y);
+      int p             = it - part_frag_offset.begin() - 1;
+      int part_end_row  = partitions[p].start_row + partitions[p].num_rows;
+      s->frag.start_row = (frag_y - part_frag_offset[p]) * fragment_size + partitions[p].start_row;
+
+      // frag.num_rows = fragment_size except for the last fragment in partition which can be
+      // smaller. num_rows is fixed but fragment size could be larger if the data is strings or
+      // nested.
+      s->frag.num_rows           = min(fragment_size, part_end_row - s->frag.start_row);
+      s->frag.num_dict_vals      = 0;
+      s->frag.fragment_data_size = 0;
+      s->frag.dict_data_size     = 0;
+
+      s->frag.start_value_idx = row_to_value_idx(s->frag.start_row, s->col);
+      size_type end_value_idx = row_to_value_idx(s->frag.start_row + s->frag.num_rows, s->col);
+      s->frag.num_leaf_values = end_value_idx - s->frag.start_value_idx;
+
+      if (s->col.level_offsets != nullptr) {
+        // For nested schemas, the number of values in a fragment is not directly related to the
+        // number of encoded data elements or the number of rows. It is simply the number of
+        // repetition/definition values which together encode validity and nesting information.
+        size_type first_level_val_idx = s->col.level_offsets[s->frag.start_row];
+        size_type last_level_val_idx  = s->col.level_offsets[s->frag.start_row + s->frag.num_rows];
+        s->frag.num_values            = last_level_val_idx - first_level_val_idx;
+      } else {
+        s->frag.num_values = s->frag.num_rows;
       }
-    } else {
-      len = 0;
     }
+    __syncthreads();
 
-    len = block_reduce(reduce_storage).Sum(len);
-    if (!t) { s->frag.fragment_data_size += len; }
+    size_type nvals           = s->frag.num_leaf_values;
+    size_type start_value_idx = s->frag.start_value_idx;
+
+    for (uint32_t i = 0; i < nvals; i += block_size) {
+      uint32_t val_idx  = start_value_idx + i + t;
+      uint32_t is_valid = (i + t < nvals && val_idx < s->col.leaf_column->size())
+                            ? s->col.leaf_column->is_valid(val_idx)
+                            : 0;
+      uint32_t len;
+      if (is_valid) {
+        len = dtype_len;
+        if (physical_type == BYTE_ARRAY) {
+          switch (leaf_type) {
+            case type_id::STRING: {
+              auto str = s->col.leaf_column->element<string_view>(val_idx);
+              len += str.size_bytes();
+            } break;
+            case type_id::LIST: {
+              auto list_element =
+                get_element<statistics::byte_array_view>(*s->col.leaf_column, val_idx);
+              len += list_element.size_bytes();
+            } break;
+            default: CUDF_UNREACHABLE("Unsupported data type for leaf column");
+          }
+        }
+      } else {
+        len = 0;
+      }
+
+      len = block_reduce(reduce_storage).Sum(len);
+      if (t == 0) { s->frag.fragment_data_size += len; }
+      __syncthreads();
+    }
+    __syncthreads();
+    if (t == 0) { frag[blockIdx.x][frag_y] = s->frag; }
   }
-  __syncthreads();
-  if (t == 0) frag[blockIdx.x][blockIdx.y] = s->frag;
 }
 
 // blockDim {128,1,1}
@@ -200,21 +211,21 @@ __global__ void __launch_bounds__(128)
                        device_2dspan<PageFragment const> fragments,
                        device_span<parquet_column_device_view const> col_desc)
 {
-  // TODO: why not 1 block per warp?
-  __shared__ __align__(8) statistics_group group_g[4];
-
-  uint32_t lane_id              = threadIdx.x & 0x1f;
-  uint32_t frag_id              = blockIdx.y * 4 + (threadIdx.x >> 5);
-  uint32_t column_id            = blockIdx.x;
-  auto num_fragments_per_column = fragments.size().second;
-  statistics_group* const g     = &group_g[threadIdx.x >> 5];
-  if (!lane_id && frag_id < num_fragments_per_column) {
-    g->col       = &col_desc[column_id];
-    g->start_row = fragments[column_id][frag_id].start_value_idx;
-    g->num_rows  = fragments[column_id][frag_id].num_leaf_values;
+  uint32_t const lane_id                  = threadIdx.x & WARP_MASK;
+  uint32_t const column_id                = blockIdx.x;
+  uint32_t const num_fragments_per_column = fragments.size().second;
+
+  uint32_t frag_id = blockIdx.y * 4 + (threadIdx.x / cudf::detail::warp_size);
+  while (frag_id < num_fragments_per_column) {
+    if (lane_id == 0) {
+      statistics_group g;
+      g.col       = &col_desc[column_id];
+      g.start_row = fragments[column_id][frag_id].start_value_idx;
+      g.num_rows  = fragments[column_id][frag_id].num_leaf_values;
+      groups[column_id][frag_id] = g;
+    }
+    frag_id += gridDim.y * 4;
   }
-  __syncthreads();
-  if (frag_id < num_fragments_per_column and lane_id == 0) groups[column_id][frag_id] = *g;
 }
 
 // blockDim {128,1,1}
@@ -2017,9 +2028,10 @@ void InitPageFragments(device_2dspan<PageFragment> frag,
                        uint32_t fragment_size,
                        rmm::cuda_stream_view stream)
 {
-  auto num_columns              = frag.size().first;
-  auto num_fragments_per_column = frag.size().second;
-  dim3 dim_grid(num_columns, num_fragments_per_column);  // 1 threadblock per fragment
+  auto const num_columns              = frag.size().first;
+  auto const num_fragments_per_column = frag.size().second;
+  auto const grid_y = std::min(static_cast<uint32_t>(num_fragments_per_column), MAX_GRID_Y_SIZE);
+  dim3 const dim_grid(num_columns, grid_y);  // 1 threadblock per fragment
   gpuInitPageFragments<512><<<dim_grid, 512, 0, stream.value()>>>(
     frag, col_desc, partitions, part_frag_offset, fragment_size);
 }
@@ -2031,8 +2043,10 @@ void InitFragmentStatistics(device_2dspan<statistics_group> groups,
 {
   int const num_columns              = col_desc.size();
   int const num_fragments_per_column = fragments.size().second;
-  auto grid_y = util::div_rounding_up_safe(num_fragments_per_column, 128 / cudf::detail::warp_size);
-  dim3 dim_grid(num_columns, grid_y);  // 1 warp per fragment
+  auto const y_dim =
+    util::div_rounding_up_safe(num_fragments_per_column, 128 / cudf::detail::warp_size);
+  auto const grid_y = std::min(static_cast<uint32_t>(y_dim), MAX_GRID_Y_SIZE);
+  dim3 const dim_grid(num_columns, grid_y);  // 1 warp per fragment
   gpuInitFragmentStats<<<dim_grid, 128, 0, stream.value()>>>(groups, fragments, col_desc);
 }
 
diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp
index 134eff54144..c85c8fe7105 100644
--- a/cpp/tests/io/parquet_test.cpp
+++ b/cpp/tests/io/parquet_test.cpp
@@ -1048,6 +1048,24 @@ TEST_F(ParquetWriterTest, HostBuffer)
   cudf::test::expect_metadata_equal(expected_metadata, result.metadata);
 }
 
+TEST_F(ParquetWriterTest, ManyFragments)
+{
+  srand(31337);
+  auto const expected = create_random_fixed_table<int>(10, 6'000'000, false);
+
+  auto const filepath = temp_env->get_temp_filepath("ManyFragments.parquet");
+  cudf::io::parquet_writer_options const args =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, *expected)
+      .max_page_size_bytes(8 * 1024);
+  cudf::io::write_parquet(args);
+
+  cudf::io::parquet_reader_options const read_opts =
+    cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
+  auto const result = cudf::io::read_parquet(read_opts);
+
+  CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, *expected);
+}
+
 TEST_F(ParquetWriterTest, NonNullable)
 {
   srand(31337);
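
Note on the pattern used in this patch: CUDA caps gridDim.y (and gridDim.z) at 65535, which is why both kernels above switch to grid-stride loops over blockIdx.y while the host side clamps the launch with MAX_GRID_Y_SIZE. Below is a minimal, self-contained sketch of the same clamp-plus-stride idea, not code from this patch; the kernel name process_items, the output buffer, and the constants are illustrative stand-ins.

// sketch.cu -- illustrative only; compile with nvcc.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cuda_runtime.h>

// CUDA limits gridDim.y to 2^16 - 1 = 65535 (same clamp as MAX_GRID_Y_SIZE above).
constexpr uint32_t kMaxGridY = (1u << 16) - 1;

// One "work item" per y index. The grid-stride loop makes the kernel correct
// even when num_items exceeds the gridDim.y limit: each block handles items
// blockIdx.y, blockIdx.y + gridDim.y, blockIdx.y + 2 * gridDim.y, ...
__global__ void process_items(uint32_t* out, uint32_t num_items)
{
  for (uint32_t y = blockIdx.y; y < num_items; y += gridDim.y) {
    if (threadIdx.x == 0) { out[y] = y; }
  }
}

int main()
{
  constexpr uint32_t num_items = 200'000;  // more items than one launch row per item allows
  uint32_t* d_out;
  cudaMalloc(&d_out, num_items * sizeof(uint32_t));

  // Clamp the launch dimension; the stride in the kernel covers the remainder.
  dim3 const grid{1, std::min(num_items, kMaxGridY)};
  process_items<<<grid, 128>>>(d_out, num_items);
  cudaDeviceSynchronize();

  uint32_t last = 0;
  cudaMemcpy(&last, d_out + num_items - 1, sizeof(uint32_t), cudaMemcpyDeviceToHost);
  std::printf("last item = %u\n", last);  // expect 199999

  cudaFree(d_out);
  return 0;
}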