Fix writing of Parquet files with many fragments (#11869)
This PR fixes an error that can occur when Parquet files are written with very small page sizes. #11551 changed the page fragment size from a fixed 5000 rows to a value scaled to the requested maximum page size. For small page sizes, the number of fragments to process can exceed 64k. The fragment count is used as the `y` grid dimension when launching `gpuInitPageFragments`, and when it exceeds 64k the kernel fails to launch, ultimately leading to an invalid memory access.
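For context, a minimal sketch of the pattern this change applies: clamp the grid's `y` dimension to the 64k - 1 hardware limit and have each block stride over fragments so every fragment is still processed. Names such as `process_fragments` and `launch` are illustrative only, not the actual cuDF kernels.

```cuda
#include <algorithm>
#include <cstdint>

// 64k - 1: the CUDA hardware limit for the grid's y (and z) dimension.
constexpr uint32_t MAX_GRID_Y_SIZE = (1 << 16) - 1;

// Each block strides over fragments in y, so fragments beyond gridDim.y are
// still processed even when the launch's y dimension has been clamped.
__global__ void process_fragments(uint32_t num_fragments_per_column)
{
  for (uint32_t frag_y = blockIdx.y; frag_y < num_fragments_per_column; frag_y += gridDim.y) {
    // ... initialize fragment (blockIdx.x, frag_y) ...
  }
}

void launch(uint32_t num_columns, uint32_t num_fragments_per_column)
{
  // Before the fix the launch used dim3 grid(num_columns, num_fragments_per_column),
  // which fails once num_fragments_per_column exceeds 65535.
  auto const grid_y = std::min(num_fragments_per_column, MAX_GRID_Y_SIZE);
  dim3 const grid(num_columns, grid_y);
  process_fragments<<<grid, 512>>>(num_fragments_per_column);
}
```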

Authors:
  - Ed Seidl (https://github.com/etseidl)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Bradley Dice (https://github.com/bdice)
  - Karthikeyan (https://github.com/karthikeyann)

URL: #11869
etseidl authored Oct 20, 2022
1 parent 536ddd0 commit 98185fe
Showing 2 changed files with 117 additions and 85 deletions.
184 changes: 99 additions & 85 deletions cpp/src/io/parquet/page_enc.cu
@@ -61,6 +61,12 @@ constexpr int32_t NO_TRUNC_STATS = 0;
// minimum scratch space required for encoding statistics
constexpr size_t MIN_STATS_SCRATCH_SIZE = sizeof(__int128_t);

// mask to determine lane id
constexpr uint32_t WARP_MASK = cudf::detail::warp_size - 1;

// currently 64k - 1
constexpr uint32_t MAX_GRID_Y_SIZE = (1 << 16) - 1;

struct frag_init_state_s {
parquet_column_device_view col;
PageFragment frag;
@@ -116,82 +122,87 @@ __global__ void __launch_bounds__(block_size)
using block_reduce = cub::BlockReduce<uint32_t, block_size>;
__shared__ typename block_reduce::TempStorage reduce_storage;

frag_init_state_s* const s = &state_g;
uint32_t t = threadIdx.x;
int frag_y = blockIdx.y;
auto const physical_type = col_desc[blockIdx.x].physical_type;
frag_init_state_s* const s = &state_g;
uint32_t const t = threadIdx.x;
auto const physical_type = col_desc[blockIdx.x].physical_type;
uint32_t const num_fragments_per_column = frag.size().second;

if (t == 0) s->col = col_desc[blockIdx.x];
if (t == 0) { s->col = col_desc[blockIdx.x]; }
__syncthreads();
if (!t) {
// Find which partition this fragment came from
auto it =
thrust::upper_bound(thrust::seq, part_frag_offset.begin(), part_frag_offset.end(), frag_y);
int p = it - part_frag_offset.begin() - 1;
int part_end_row = partitions[p].start_row + partitions[p].num_rows;
s->frag.start_row = (frag_y - part_frag_offset[p]) * fragment_size + partitions[p].start_row;

// frag.num_rows = fragment_size except for the last fragment in partition which can be smaller.
// num_rows is fixed but fragment size could be larger if the data is strings or nested.
s->frag.num_rows = min(fragment_size, part_end_row - s->frag.start_row);
s->frag.num_dict_vals = 0;
s->frag.fragment_data_size = 0;
s->frag.dict_data_size = 0;

s->frag.start_value_idx = row_to_value_idx(s->frag.start_row, s->col);
size_type end_value_idx = row_to_value_idx(s->frag.start_row + s->frag.num_rows, s->col);
s->frag.num_leaf_values = end_value_idx - s->frag.start_value_idx;

if (s->col.level_offsets != nullptr) {
// For nested schemas, the number of values in a fragment is not directly related to the
// number of encoded data elements or the number of rows. It is simply the number of
// repetition/definition values which together encode validity and nesting information.
size_type first_level_val_idx = s->col.level_offsets[s->frag.start_row];
size_type last_level_val_idx = s->col.level_offsets[s->frag.start_row + s->frag.num_rows];
s->frag.num_values = last_level_val_idx - first_level_val_idx;
} else {
s->frag.num_values = s->frag.num_rows;
}
}

auto const leaf_type = s->col.leaf_column->type().id();
auto const dtype_len = physical_type_len(physical_type, leaf_type);
__syncthreads();

size_type nvals = s->frag.num_leaf_values;
size_type start_value_idx = s->frag.start_value_idx;

for (uint32_t i = 0; i < nvals; i += block_size) {
uint32_t val_idx = start_value_idx + i + t;
uint32_t is_valid = (i + t < nvals && val_idx < s->col.leaf_column->size())
? s->col.leaf_column->is_valid(val_idx)
: 0;
uint32_t len;
if (is_valid) {
len = dtype_len;
if (physical_type == BYTE_ARRAY) {
switch (leaf_type) {
case type_id::STRING: {
auto str = s->col.leaf_column->element<string_view>(val_idx);
len += str.size_bytes();
} break;
case type_id::LIST: {
auto list_element =
get_element<statistics::byte_array_view>(*s->col.leaf_column, val_idx);
len += list_element.size_bytes();
} break;
default: CUDF_UNREACHABLE("Unsupported data type for leaf column");
}
for (uint32_t frag_y = blockIdx.y; frag_y < num_fragments_per_column; frag_y += gridDim.y) {
if (t == 0) {
// Find which partition this fragment came from
auto it =
thrust::upper_bound(thrust::seq, part_frag_offset.begin(), part_frag_offset.end(), frag_y);
int p = it - part_frag_offset.begin() - 1;
int part_end_row = partitions[p].start_row + partitions[p].num_rows;
s->frag.start_row = (frag_y - part_frag_offset[p]) * fragment_size + partitions[p].start_row;

// frag.num_rows = fragment_size except for the last fragment in partition which can be
// smaller. num_rows is fixed but fragment size could be larger if the data is strings or
// nested.
s->frag.num_rows = min(fragment_size, part_end_row - s->frag.start_row);
s->frag.num_dict_vals = 0;
s->frag.fragment_data_size = 0;
s->frag.dict_data_size = 0;

s->frag.start_value_idx = row_to_value_idx(s->frag.start_row, s->col);
size_type end_value_idx = row_to_value_idx(s->frag.start_row + s->frag.num_rows, s->col);
s->frag.num_leaf_values = end_value_idx - s->frag.start_value_idx;

if (s->col.level_offsets != nullptr) {
// For nested schemas, the number of values in a fragment is not directly related to the
// number of encoded data elements or the number of rows. It is simply the number of
// repetition/definition values which together encode validity and nesting information.
size_type first_level_val_idx = s->col.level_offsets[s->frag.start_row];
size_type last_level_val_idx = s->col.level_offsets[s->frag.start_row + s->frag.num_rows];
s->frag.num_values = last_level_val_idx - first_level_val_idx;
} else {
s->frag.num_values = s->frag.num_rows;
}
} else {
len = 0;
}
__syncthreads();

len = block_reduce(reduce_storage).Sum(len);
if (!t) { s->frag.fragment_data_size += len; }
size_type nvals = s->frag.num_leaf_values;
size_type start_value_idx = s->frag.start_value_idx;

for (uint32_t i = 0; i < nvals; i += block_size) {
uint32_t val_idx = start_value_idx + i + t;
uint32_t is_valid = (i + t < nvals && val_idx < s->col.leaf_column->size())
? s->col.leaf_column->is_valid(val_idx)
: 0;
uint32_t len;
if (is_valid) {
len = dtype_len;
if (physical_type == BYTE_ARRAY) {
switch (leaf_type) {
case type_id::STRING: {
auto str = s->col.leaf_column->element<string_view>(val_idx);
len += str.size_bytes();
} break;
case type_id::LIST: {
auto list_element =
get_element<statistics::byte_array_view>(*s->col.leaf_column, val_idx);
len += list_element.size_bytes();
} break;
default: CUDF_UNREACHABLE("Unsupported data type for leaf column");
}
}
} else {
len = 0;
}

len = block_reduce(reduce_storage).Sum(len);
if (t == 0) { s->frag.fragment_data_size += len; }
__syncthreads();
}
__syncthreads();
if (t == 0) { frag[blockIdx.x][frag_y] = s->frag; }
}
__syncthreads();
if (t == 0) frag[blockIdx.x][blockIdx.y] = s->frag;
}

// blockDim {128,1,1}
@@ -200,21 +211,21 @@ __global__ void __launch_bounds__(128)
device_2dspan<PageFragment const> fragments,
device_span<parquet_column_device_view const> col_desc)
{
// TODO: why not 1 block per warp?
__shared__ __align__(8) statistics_group group_g[4];

uint32_t lane_id = threadIdx.x & 0x1f;
uint32_t frag_id = blockIdx.y * 4 + (threadIdx.x >> 5);
uint32_t column_id = blockIdx.x;
auto num_fragments_per_column = fragments.size().second;
statistics_group* const g = &group_g[threadIdx.x >> 5];
if (!lane_id && frag_id < num_fragments_per_column) {
g->col = &col_desc[column_id];
g->start_row = fragments[column_id][frag_id].start_value_idx;
g->num_rows = fragments[column_id][frag_id].num_leaf_values;
uint32_t const lane_id = threadIdx.x & WARP_MASK;
uint32_t const column_id = blockIdx.x;
uint32_t const num_fragments_per_column = fragments.size().second;

uint32_t frag_id = blockIdx.y * 4 + (threadIdx.x / cudf::detail::warp_size);
while (frag_id < num_fragments_per_column) {
if (lane_id == 0) {
statistics_group g;
g.col = &col_desc[column_id];
g.start_row = fragments[column_id][frag_id].start_value_idx;
g.num_rows = fragments[column_id][frag_id].num_leaf_values;
groups[column_id][frag_id] = g;
}
frag_id += gridDim.y * 4;
}
__syncthreads();
if (frag_id < num_fragments_per_column and lane_id == 0) groups[column_id][frag_id] = *g;
}

// blockDim {128,1,1}
@@ -2017,9 +2028,10 @@ void InitPageFragments(device_2dspan<PageFragment> frag,
uint32_t fragment_size,
rmm::cuda_stream_view stream)
{
auto num_columns = frag.size().first;
auto num_fragments_per_column = frag.size().second;
dim3 dim_grid(num_columns, num_fragments_per_column); // 1 threadblock per fragment
auto const num_columns = frag.size().first;
auto const num_fragments_per_column = frag.size().second;
auto const grid_y = std::min(static_cast<uint32_t>(num_fragments_per_column), MAX_GRID_Y_SIZE);
dim3 const dim_grid(num_columns, grid_y); // 1 threadblock per fragment
gpuInitPageFragments<512><<<dim_grid, 512, 0, stream.value()>>>(
frag, col_desc, partitions, part_frag_offset, fragment_size);
}
@@ -2031,8 +2043,10 @@ void InitFragmentStatistics(device_2dspan<statistics_group> groups,
{
int const num_columns = col_desc.size();
int const num_fragments_per_column = fragments.size().second;
auto grid_y = util::div_rounding_up_safe(num_fragments_per_column, 128 / cudf::detail::warp_size);
dim3 dim_grid(num_columns, grid_y); // 1 warp per fragment
auto const y_dim =
util::div_rounding_up_safe(num_fragments_per_column, 128 / cudf::detail::warp_size);
auto const grid_y = std::min(static_cast<uint32_t>(y_dim), MAX_GRID_Y_SIZE);
dim3 const dim_grid(num_columns, grid_y); // 1 warp per fragment
gpuInitFragmentStats<<<dim_grid, 128, 0, stream.value()>>>(groups, fragments, col_desc);
}

18 changes: 18 additions & 0 deletions cpp/tests/io/parquet_test.cpp
@@ -1048,6 +1048,24 @@ TEST_F(ParquetWriterTest, HostBuffer)
cudf::test::expect_metadata_equal(expected_metadata, result.metadata);
}

TEST_F(ParquetWriterTest, ManyFragments)
{
srand(31337);
auto const expected = create_random_fixed_table<int>(10, 6'000'000, false);

auto const filepath = temp_env->get_temp_filepath("ManyFragments.parquet");
cudf::io::parquet_writer_options const args =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, *expected)
.max_page_size_bytes(8 * 1024);
cudf::io::write_parquet(args);

cudf::io::parquet_reader_options const read_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
auto const result = cudf::io::read_parquet(read_opts);

CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, *expected);
}

TEST_F(ParquetWriterTest, NonNullable)
{
srand(31337);
