Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Variable fragment sizes for Parquet writer #12685

Merged
merged 55 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 49 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
42cf39e
get column sizes
etseidl Feb 1, 2023
3612d88
defer calculation of fragment stats
etseidl Feb 1, 2023
b2c8ef4
working without stats
etseidl Feb 2, 2023
1ce28e7
stats working now
etseidl Feb 2, 2023
af4f12b
fix some TODOs
etseidl Feb 2, 2023
c0c0882
fix fragment sizes for LargeColumnIndex test
etseidl Feb 2, 2023
7a7e8f3
move calc of frags per column up to remove some redundancy
etseidl Feb 2, 2023
f934e4c
rework so new fragments are hostdevice_vector
etseidl Feb 2, 2023
f4eaf99
add docstrings
etseidl Feb 2, 2023
de842d1
formatting
etseidl Feb 2, 2023
a016ee3
move function
etseidl Feb 2, 2023
78b4c4b
refactor some redundant code
etseidl Feb 2, 2023
779ab4e
consts
etseidl Feb 2, 2023
18ee7c1
safer fragment size calculation
etseidl Feb 3, 2023
cd9bbcf
target multiple fragments per page to attempt to stay under page size…
etseidl Feb 3, 2023
61a64ee
clean up some synch calls
etseidl Feb 3, 2023
138d5f4
fix structs of lists test
etseidl Feb 3, 2023
4031f68
change non-default fragment size detection and allow for variable row…
etseidl Feb 6, 2023
9980e4c
fix for fixed-length columns
etseidl Feb 6, 2023
f0cd465
Merge branch 'rapidsai:branch-23.04' into feature/frag_sizev4
etseidl Feb 6, 2023
8cb5972
need to calculate global frag size before individual columns
etseidl Feb 6, 2023
608676a
need device pointer
etseidl Feb 7, 2023
c3aa00d
preserve frag chunk, even though it is no longer used
etseidl Feb 7, 2023
5e6e595
fix comment per review
etseidl Feb 7, 2023
34303ce
check for fixed-width structs
etseidl Feb 7, 2023
f705150
Merge branch 'feature/frag_sizev4' of github.com:etseidl/cudf into fe…
etseidl Feb 7, 2023
245754f
simplify the fragment resize logic some
etseidl Feb 7, 2023
79f8b30
only need one version of gather_fragment_statistics() now
etseidl Feb 7, 2023
ad168b2
remove unneeded vector size check
etseidl Feb 7, 2023
cc9c47a
skip loop if neither resizing nor gathering frag stats
etseidl Feb 7, 2023
9b78643
Merge branch 'rapidsai:branch-23.04' into feature/frag_sizev4
etseidl Feb 8, 2023
b32f835
Merge branch 'rapidsai:branch-23.04' into feature/frag_sizev4
etseidl Feb 8, 2023
da5f41c
Merge branch 'rapidsai:branch-23.04' into feature/frag_sizev4
etseidl Feb 9, 2023
490720b
Merge branch 'branch-23.04' into feature/frag_sizev4
vuule Feb 10, 2023
ca23943
suggestions from review and formatting changes
etseidl Feb 10, 2023
5e13e86
Merge branch 'feature/frag_sizev4' of github.com:etseidl/cudf into fe…
etseidl Feb 10, 2023
c814d74
always recalculate fragments per review suggestion
etseidl Feb 10, 2023
78da29e
make page fragment size std::optional
etseidl Feb 10, 2023
20fb273
more renaming
etseidl Feb 10, 2023
12e3e7b
Merge branch 'rapidsai:branch-23.04' into feature/frag_sizev4
etseidl Feb 10, 2023
57b6859
rename some methods to make their purpose more clear, and update
etseidl Feb 10, 2023
4d851d6
Merge branch 'rapidsai:branch-23.04' into feature/frag_sizev4
etseidl Feb 10, 2023
16e19be
Merge branch 'branch-23.04' into feature/frag_sizev4
vuule Feb 13, 2023
61a5c02
fix for pages exceeding requested number of rows
etseidl Feb 13, 2023
45a20b7
Merge branch 'feature/frag_sizev4' of github.com:etseidl/cudf into fe…
etseidl Feb 13, 2023
e2fd476
suggestion from review
etseidl Feb 15, 2023
727cb6d
suggestion from review
etseidl Feb 15, 2023
53b3610
more changes from review
etseidl Feb 15, 2023
9d6a58c
Merge branch 'rapidsai:branch-23.04' into feature/frag_sizev4
etseidl Feb 15, 2023
f5baf18
Merge branch 'rapidsai:branch-23.04' into feature/frag_sizev4
etseidl Feb 16, 2023
2150d2c
implement suggestions from review
etseidl Feb 16, 2023
70d0d8e
Merge branch 'branch-23.04' into feature/frag_sizev4
vuule Feb 16, 2023
fe025e4
Merge branch 'branch-23.04' into feature/frag_sizev4
vuule Feb 16, 2023
051f5e4
Merge branch 'branch-23.04' into feature/frag_sizev4
hyperbolic2346 Feb 22, 2023
5be1676
Merge branch 'branch-23.04' into feature/frag_sizev4
hyperbolic2346 Feb 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ class parquet_writer_options {
// Maximum size of column chunk dictionary (in bytes)
size_t _max_dictionary_size = default_max_dictionary_size;
// Maximum number of rows in a page fragment
size_type _max_page_fragment_size = default_max_page_fragment_size;
std::optional<size_type> _max_page_fragment_size;

/**
* @brief Constructor from sink and table.
Expand Down Expand Up @@ -1076,7 +1076,7 @@ class chunked_parquet_writer_options {
// Maximum size of column chunk dictionary (in bytes)
size_t _max_dictionary_size = default_max_dictionary_size;
// Maximum number of rows in a page fragment
size_type _max_page_fragment_size = default_max_page_fragment_size;
std::optional<size_type> _max_page_fragment_size;

/**
* @brief Constructor from sink.
Expand Down
251 changes: 144 additions & 107 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -126,129 +126,164 @@ constexpr size_t underflow_safe_subtract(size_t a, size_t b)
return a - b;
}

void __device__ init_frag_state(frag_init_state_s* const s,
uint32_t fragment_size,
int part_end_row)
{
// frag.num_rows = fragment_size except for the last fragment in partition which can be
// smaller. num_rows is fixed but fragment size could be larger if the data is strings or
// nested.
s->frag.num_rows = min(fragment_size, part_end_row - s->frag.start_row);
s->frag.num_dict_vals = 0;
s->frag.fragment_data_size = 0;
s->frag.dict_data_size = 0;

s->frag.start_value_idx = row_to_value_idx(s->frag.start_row, s->col);
auto const end_value_idx = row_to_value_idx(s->frag.start_row + s->frag.num_rows, s->col);
s->frag.num_leaf_values = end_value_idx - s->frag.start_value_idx;

if (s->col.level_offsets != nullptr) {
// For nested schemas, the number of values in a fragment is not directly related to the
// number of encoded data elements or the number of rows. It is simply the number of
// repetition/definition values which together encode validity and nesting information.
auto const first_level_val_idx = s->col.level_offsets[s->frag.start_row];
auto const last_level_val_idx = s->col.level_offsets[s->frag.start_row + s->frag.num_rows];
s->frag.num_values = last_level_val_idx - first_level_val_idx;
} else {
s->frag.num_values = s->frag.num_rows;
}
}

template <int block_size>
void __device__ calculate_frag_size(frag_init_state_s* const s, int t)
{
using block_reduce = cub::BlockReduce<uint32_t, block_size>;
__shared__ typename block_reduce::TempStorage reduce_storage;

auto const physical_type = s->col.physical_type;
auto const leaf_type = s->col.leaf_column->type().id();
auto const dtype_len = physical_type_len(physical_type, leaf_type);
auto const nvals = s->frag.num_leaf_values;
auto const start_value_idx = s->frag.start_value_idx;

for (uint32_t i = 0; i < nvals; i += block_size) {
auto const val_idx = start_value_idx + i + t;
auto const is_valid = i + t < nvals && val_idx < s->col.leaf_column->size() &&
s->col.leaf_column->is_valid(val_idx);
uint32_t len;
if (is_valid) {
len = dtype_len;
if (physical_type == BYTE_ARRAY) {
switch (leaf_type) {
case type_id::STRING: {
auto str = s->col.leaf_column->element<string_view>(val_idx);
len += str.size_bytes();
} break;
case type_id::LIST: {
auto list_element =
get_element<statistics::byte_array_view>(*s->col.leaf_column, val_idx);
len += list_element.size_bytes();
} break;
default: CUDF_UNREACHABLE("Unsupported data type for leaf column");
}
}
} else {
len = 0;
}

len = block_reduce(reduce_storage).Sum(len);
if (t == 0) { s->frag.fragment_data_size += len; }
__syncthreads();
// page fragment size must fit in a 32-bit signed integer
if (s->frag.fragment_data_size > std::numeric_limits<int32_t>::max()) {
CUDF_UNREACHABLE("page fragment size exceeds maximum for i32");
}
}
}

} // anonymous namespace

// blockDim {512,1,1}
template <int block_size>
__global__ void __launch_bounds__(block_size)
gpuInitPageFragments(device_2dspan<PageFragment> frag,
device_span<parquet_column_device_view const> col_desc,
device_span<partition_info const> partitions,
device_span<int const> part_frag_offset,
uint32_t fragment_size)
gpuInitRowGroupFragments(device_2dspan<PageFragment> frag,
device_span<parquet_column_device_view const> col_desc,
device_span<partition_info const> partitions,
device_span<int const> part_frag_offset,
uint32_t fragment_size)
{
__shared__ __align__(16) frag_init_state_s state_g;

using block_reduce = cub::BlockReduce<uint32_t, block_size>;
__shared__ typename block_reduce::TempStorage reduce_storage;

frag_init_state_s* const s = &state_g;
uint32_t const t = threadIdx.x;
auto const physical_type = col_desc[blockIdx.x].physical_type;
uint32_t const num_fragments_per_column = frag.size().second;

if (t == 0) { s->col = col_desc[blockIdx.x]; }
__syncthreads();

auto const leaf_type = s->col.leaf_column->type().id();
auto const dtype_len = physical_type_len(physical_type, leaf_type);

for (uint32_t frag_y = blockIdx.y; frag_y < num_fragments_per_column; frag_y += gridDim.y) {
if (t == 0) {
// Find which partition this fragment came from
auto it =
thrust::upper_bound(thrust::seq, part_frag_offset.begin(), part_frag_offset.end(), frag_y);
int p = it - part_frag_offset.begin() - 1;
int part_end_row = partitions[p].start_row + partitions[p].num_rows;
int const p = it - part_frag_offset.begin() - 1;
int const part_end_row = partitions[p].start_row + partitions[p].num_rows;
s->frag.start_row = (frag_y - part_frag_offset[p]) * fragment_size + partitions[p].start_row;

// frag.num_rows = fragment_size except for the last fragment in partition which can be
// smaller. num_rows is fixed but fragment size could be larger if the data is strings or
// nested.
s->frag.num_rows = min(fragment_size, part_end_row - s->frag.start_row);
s->frag.num_dict_vals = 0;
s->frag.fragment_data_size = 0;
s->frag.dict_data_size = 0;

s->frag.start_value_idx = row_to_value_idx(s->frag.start_row, s->col);
size_type end_value_idx = row_to_value_idx(s->frag.start_row + s->frag.num_rows, s->col);
s->frag.num_leaf_values = end_value_idx - s->frag.start_value_idx;

if (s->col.level_offsets != nullptr) {
// For nested schemas, the number of values in a fragment is not directly related to the
// number of encoded data elements or the number of rows. It is simply the number of
// repetition/definition values which together encode validity and nesting information.
size_type first_level_val_idx = s->col.level_offsets[s->frag.start_row];
size_type last_level_val_idx = s->col.level_offsets[s->frag.start_row + s->frag.num_rows];
s->frag.num_values = last_level_val_idx - first_level_val_idx;
} else {
s->frag.num_values = s->frag.num_rows;
}
s->frag.chunk = frag[blockIdx.x][frag_y].chunk;
init_frag_state(s, fragment_size, part_end_row);
}
__syncthreads();

size_type nvals = s->frag.num_leaf_values;
size_type start_value_idx = s->frag.start_value_idx;

for (uint32_t i = 0; i < nvals; i += block_size) {
uint32_t val_idx = start_value_idx + i + t;
uint32_t is_valid = (i + t < nvals && val_idx < s->col.leaf_column->size())
? s->col.leaf_column->is_valid(val_idx)
: 0;
uint32_t len;
if (is_valid) {
len = dtype_len;
if (physical_type == BYTE_ARRAY) {
switch (leaf_type) {
case type_id::STRING: {
auto str = s->col.leaf_column->element<string_view>(val_idx);
len += str.size_bytes();
} break;
case type_id::LIST: {
auto list_element =
get_element<statistics::byte_array_view>(*s->col.leaf_column, val_idx);
len += list_element.size_bytes();
} break;
default: CUDF_UNREACHABLE("Unsupported data type for leaf column");
}
}
} else {
len = 0;
}

len = block_reduce(reduce_storage).Sum(len);
if (t == 0) { s->frag.fragment_data_size += len; }
__syncthreads();
// page fragment size must fit in a 32-bit signed integer
if (s->frag.fragment_data_size > std::numeric_limits<int32_t>::max()) {
CUDF_UNREACHABLE("page fragment size exceeds maximum for i32");
}
}
calculate_frag_size<block_size>(s, t);
__syncthreads();
if (t == 0) { frag[blockIdx.x][frag_y] = s->frag; }
}
}

// blockDim {512,1,1}
template <int block_size>
__global__ void __launch_bounds__(block_size)
gpuCalculatePageFragments(device_span<PageFragment> frag,
device_span<size_type const> column_frag_sizes)
{
__shared__ __align__(16) frag_init_state_s state_g;

EncColumnChunk* const ck_g = frag[blockIdx.x].chunk;
frag_init_state_s* const s = &state_g;
uint32_t const t = threadIdx.x;
auto const fragment_size = column_frag_sizes[ck_g->col_desc_id];

if (t == 0) { s->col = *ck_g->col_desc; }
__syncthreads();

if (t == 0) {
int const part_end_row = ck_g->start_row + ck_g->num_rows;
s->frag.start_row = ck_g->start_row + (blockIdx.x - ck_g->first_fragment) * fragment_size;
s->frag.chunk = ck_g;
init_frag_state(s, fragment_size, part_end_row);
}
__syncthreads();

calculate_frag_size<block_size>(s, t);
if (t == 0) { frag[blockIdx.x] = s->frag; }
}

// blockDim {128,1,1}
__global__ void __launch_bounds__(128)
gpuInitFragmentStats(device_2dspan<statistics_group> groups,
device_2dspan<PageFragment const> fragments,
device_span<parquet_column_device_view const> col_desc)
gpuInitFragmentStats(device_span<statistics_group> groups,
device_span<PageFragment const> fragments)
{
uint32_t const lane_id = threadIdx.x & WARP_MASK;
uint32_t const column_id = blockIdx.x;
uint32_t const num_fragments_per_column = fragments.size().second;

uint32_t frag_id = blockIdx.y * 4 + (threadIdx.x / cudf::detail::warp_size);
while (frag_id < num_fragments_per_column) {
uint32_t const lane_id = threadIdx.x & WARP_MASK;
uint32_t const frag_id = blockIdx.x * 4 + (threadIdx.x / cudf::detail::warp_size);
if (frag_id < fragments.size()) {
if (lane_id == 0) {
statistics_group g;
g.col = &col_desc[column_id];
g.start_row = fragments[column_id][frag_id].start_value_idx;
g.num_rows = fragments[column_id][frag_id].num_leaf_values;
groups[column_id][frag_id] = g;
auto* const ck_g = fragments[frag_id].chunk;
g.col = ck_g->col_desc;
g.start_row = fragments[frag_id].start_value_idx;
g.num_rows = fragments[frag_id].num_leaf_values;
groups[frag_id] = g;
}
frag_id += gridDim.y * 4;
}
}

Expand Down Expand Up @@ -389,7 +424,7 @@ __global__ void __launch_bounds__(128)

if (num_rows >= ck_g.num_rows ||
(values_in_page > 0 && (page_size + fragment_data_size > this_max_page_size)) ||
rows_in_page >= max_page_size_rows) {
rows_in_page + frag_g.num_rows > max_page_size_rows) {
if (ck_g.use_dictionary) {
// Additional byte to store entry bit width
page_size = 1 + max_RLE_page_size(ck_g.dict_rle_bits, values_in_page);
Expand Down Expand Up @@ -2057,33 +2092,35 @@ __global__ void __launch_bounds__(1)
ck_g->column_index_size = static_cast<uint32_t>(col_idx_end - ck_g->column_index_blob);
}

void InitPageFragments(device_2dspan<PageFragment> frag,
device_span<parquet_column_device_view const> col_desc,
device_span<partition_info const> partitions,
device_span<int const> part_frag_offset,
uint32_t fragment_size,
rmm::cuda_stream_view stream)
void InitRowGroupFragments(device_2dspan<PageFragment> frag,
device_span<parquet_column_device_view const> col_desc,
device_span<partition_info const> partitions,
device_span<int const> part_frag_offset,
uint32_t fragment_size,
rmm::cuda_stream_view stream)
{
auto const num_columns = frag.size().first;
auto const num_fragments_per_column = frag.size().second;
auto const grid_y = std::min(static_cast<uint32_t>(num_fragments_per_column), MAX_GRID_Y_SIZE);
dim3 const dim_grid(num_columns, grid_y); // 1 threadblock per fragment
gpuInitPageFragments<512><<<dim_grid, 512, 0, stream.value()>>>(
gpuInitRowGroupFragments<512><<<dim_grid, 512, 0, stream.value()>>>(
frag, col_desc, partitions, part_frag_offset, fragment_size);
}

void InitFragmentStatistics(device_2dspan<statistics_group> groups,
device_2dspan<PageFragment const> fragments,
device_span<parquet_column_device_view const> col_desc,
void CalculatePageFragments(device_span<PageFragment> frag,
device_span<size_type const> column_frag_sizes,
rmm::cuda_stream_view stream)
{
gpuCalculatePageFragments<512><<<frag.size(), 512, 0, stream>>>(frag, column_frag_sizes);
hyperbolic2346 marked this conversation as resolved.
Show resolved Hide resolved
}

void InitFragmentStatistics(device_span<statistics_group> groups,
device_span<PageFragment const> fragments,
rmm::cuda_stream_view stream)
{
int const num_columns = col_desc.size();
int const num_fragments_per_column = fragments.size().second;
auto const y_dim =
util::div_rounding_up_safe(num_fragments_per_column, 128 / cudf::detail::warp_size);
auto const grid_y = std::min(static_cast<uint32_t>(y_dim), MAX_GRID_Y_SIZE);
dim3 const dim_grid(num_columns, grid_y); // 1 warp per fragment
gpuInitFragmentStats<<<dim_grid, 128, 0, stream.value()>>>(groups, fragments, col_desc);
int const num_fragments = fragments.size();
int const dim = util::div_rounding_up_safe(num_fragments, 128 / cudf::detail::warp_size);
gpuInitFragmentStats<<<dim, 128, 0, stream.value()>>>(groups, fragments);
}

void InitEncoderPages(device_2dspan<EncColumnChunk> chunks,
Expand Down
Loading