Fix writing of Parquet files with many fragments #11869

Merged (12 commits, Oct 20, 2022)
cpp/src/io/parquet/page_enc.cu (93 additions, 85 deletions)

@@ -116,82 +116,87 @@ __global__ void __launch_bounds__(block_size)
using block_reduce = cub::BlockReduce<uint32_t, block_size>;
__shared__ typename block_reduce::TempStorage reduce_storage;

frag_init_state_s* const s = &state_g;
uint32_t t = threadIdx.x;
int frag_y = blockIdx.y;
auto const physical_type = col_desc[blockIdx.x].physical_type;
frag_init_state_s* const s = &state_g;
uint32_t const t = threadIdx.x;
auto const physical_type = col_desc[blockIdx.x].physical_type;
uint32_t const num_fragments_per_column = frag.size().second;

if (t == 0) s->col = col_desc[blockIdx.x];
if (t == 0) { s->col = col_desc[blockIdx.x]; }
__syncthreads();
if (!t) {
// Find which partition this fragment came from
auto it =
thrust::upper_bound(thrust::seq, part_frag_offset.begin(), part_frag_offset.end(), frag_y);
int p = it - part_frag_offset.begin() - 1;
int part_end_row = partitions[p].start_row + partitions[p].num_rows;
s->frag.start_row = (frag_y - part_frag_offset[p]) * fragment_size + partitions[p].start_row;

// frag.num_rows = fragment_size except for the last fragment in partition which can be smaller.
// num_rows is fixed but fragment size could be larger if the data is strings or nested.
s->frag.num_rows = min(fragment_size, part_end_row - s->frag.start_row);
s->frag.num_dict_vals = 0;
s->frag.fragment_data_size = 0;
s->frag.dict_data_size = 0;

s->frag.start_value_idx = row_to_value_idx(s->frag.start_row, s->col);
size_type end_value_idx = row_to_value_idx(s->frag.start_row + s->frag.num_rows, s->col);
s->frag.num_leaf_values = end_value_idx - s->frag.start_value_idx;

if (s->col.level_offsets != nullptr) {
// For nested schemas, the number of values in a fragment is not directly related to the
// number of encoded data elements or the number of rows. It is simply the number of
// repetition/definition values which together encode validity and nesting information.
size_type first_level_val_idx = s->col.level_offsets[s->frag.start_row];
size_type last_level_val_idx = s->col.level_offsets[s->frag.start_row + s->frag.num_rows];
s->frag.num_values = last_level_val_idx - first_level_val_idx;
} else {
s->frag.num_values = s->frag.num_rows;
}
}

auto const leaf_type = s->col.leaf_column->type().id();
auto const dtype_len = physical_type_len(physical_type, leaf_type);
__syncthreads();

size_type nvals = s->frag.num_leaf_values;
size_type start_value_idx = s->frag.start_value_idx;

for (uint32_t i = 0; i < nvals; i += block_size) {
uint32_t val_idx = start_value_idx + i + t;
uint32_t is_valid = (i + t < nvals && val_idx < s->col.leaf_column->size())
? s->col.leaf_column->is_valid(val_idx)
: 0;
uint32_t len;
if (is_valid) {
len = dtype_len;
if (physical_type == BYTE_ARRAY) {
switch (leaf_type) {
case type_id::STRING: {
auto str = s->col.leaf_column->element<string_view>(val_idx);
len += str.size_bytes();
} break;
case type_id::LIST: {
auto list_element =
get_element<statistics::byte_array_view>(*s->col.leaf_column, val_idx);
len += list_element.size_bytes();
} break;
default: CUDF_UNREACHABLE("Unsupported data type for leaf column");
}
for (uint32_t frag_y = blockIdx.y; frag_y < num_fragments_per_column; frag_y += gridDim.y) {
if (t == 0) {
// Find which partition this fragment came from
auto it =
thrust::upper_bound(thrust::seq, part_frag_offset.begin(), part_frag_offset.end(), frag_y);
int p = it - part_frag_offset.begin() - 1;
int part_end_row = partitions[p].start_row + partitions[p].num_rows;
s->frag.start_row = (frag_y - part_frag_offset[p]) * fragment_size + partitions[p].start_row;

// frag.num_rows = fragment_size except for the last fragment in partition which can be
// smaller. num_rows is fixed but fragment size could be larger if the data is strings or
// nested.
s->frag.num_rows = min(fragment_size, part_end_row - s->frag.start_row);
s->frag.num_dict_vals = 0;
s->frag.fragment_data_size = 0;
s->frag.dict_data_size = 0;

s->frag.start_value_idx = row_to_value_idx(s->frag.start_row, s->col);
size_type end_value_idx = row_to_value_idx(s->frag.start_row + s->frag.num_rows, s->col);
s->frag.num_leaf_values = end_value_idx - s->frag.start_value_idx;

if (s->col.level_offsets != nullptr) {
// For nested schemas, the number of values in a fragment is not directly related to the
// number of encoded data elements or the number of rows. It is simply the number of
// repetition/definition values which together encode validity and nesting information.
size_type first_level_val_idx = s->col.level_offsets[s->frag.start_row];
size_type last_level_val_idx = s->col.level_offsets[s->frag.start_row + s->frag.num_rows];
s->frag.num_values = last_level_val_idx - first_level_val_idx;
} else {
s->frag.num_values = s->frag.num_rows;
}
} else {
len = 0;
}
__syncthreads();

size_type nvals = s->frag.num_leaf_values;
size_type start_value_idx = s->frag.start_value_idx;

len = block_reduce(reduce_storage).Sum(len);
if (!t) { s->frag.fragment_data_size += len; }
for (uint32_t i = 0; i < nvals; i += block_size) {
uint32_t val_idx = start_value_idx + i + t;
uint32_t is_valid = (i + t < nvals && val_idx < s->col.leaf_column->size())
? s->col.leaf_column->is_valid(val_idx)
: 0;
uint32_t len;
if (is_valid) {
len = dtype_len;
if (physical_type == BYTE_ARRAY) {
switch (leaf_type) {
case type_id::STRING: {
auto str = s->col.leaf_column->element<string_view>(val_idx);
len += str.size_bytes();
} break;
case type_id::LIST: {
auto list_element =
get_element<statistics::byte_array_view>(*s->col.leaf_column, val_idx);
len += list_element.size_bytes();
} break;
default: CUDF_UNREACHABLE("Unsupported data type for leaf column");
}
}
} else {
len = 0;
}

len = block_reduce(reduce_storage).Sum(len);
if (t == 0) { s->frag.fragment_data_size += len; }
__syncthreads();
}
__syncthreads();
if (t == 0) { frag[blockIdx.x][frag_y] = s->frag; }
}
__syncthreads();
if (t == 0) frag[blockIdx.x][blockIdx.y] = s->frag;
}

// blockDim {128,1,1}
@@ -200,21 +205,21 @@ __global__ void __launch_bounds__(128)
device_2dspan<PageFragment const> fragments,
device_span<parquet_column_device_view const> col_desc)
{
// TODO: why not 1 block per warp?
__shared__ __align__(8) statistics_group group_g[4];

uint32_t lane_id = threadIdx.x & 0x1f;
uint32_t frag_id = blockIdx.y * 4 + (threadIdx.x >> 5);
uint32_t column_id = blockIdx.x;
auto num_fragments_per_column = fragments.size().second;
statistics_group* const g = &group_g[threadIdx.x >> 5];
if (!lane_id && frag_id < num_fragments_per_column) {
g->col = &col_desc[column_id];
g->start_row = fragments[column_id][frag_id].start_value_idx;
g->num_rows = fragments[column_id][frag_id].num_leaf_values;
uint32_t const lane_id = threadIdx.x & 0x1f;
bdice (Contributor), Oct 17, 2022:

There are some rather innocuous-seeming magic values that all relate to cudf::detail::warp_size in this function. I'll point them out, but I am fine with doing nothing if we feel the current code is better not to change.

Suggested change:
-    uint32_t const lane_id = threadIdx.x & 0x1f;
+    uint32_t const lane_id = threadIdx.x % cudf::detail::warp_size;

etseidl (Contributor, author):

Sounds good (although using the mod operator makes my teeth itch 🤣). Does anyone happen to know if there are constants anywhere for the max threadblock dimensions? Or are those per-card values?

bdice (Contributor), Oct 18, 2022:

I believe there are no constants for that, and that's why we defined cudf::detail::warp_size. It is a constant for all NVIDIA GPUs as far as I am aware.

These two snippets should compile out roughly the same. Compilers can recognize that unsigned modulo by $2^N$ is equivalent to bitwise-and with $2^N - 1$. Evidence: https://godbolt.org/z/r4c41va5P
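
For reference, a minimal host-side sketch (not part of the PR; the helper names are made up for illustration) showing that the two spellings agree for unsigned operands:

#include <cassert>
#include <cstdint>

// Two spellings of "lane within a 32-wide warp"; for unsigned values and a
// power-of-two divisor the compiler lowers the modulo to the same bitwise-and.
inline uint32_t lane_via_mask(uint32_t tid) { return tid & 0x1f; }
inline uint32_t lane_via_mod(uint32_t tid) { return tid % 32; }

int main()
{
  for (uint32_t tid = 0; tid < 1024; ++tid) {
    assert(lane_via_mask(tid) == lane_via_mod(tid));
  }
  return 0;
}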

etseidl (Contributor, author), Oct 18, 2022:

I went ahead and added a constexpr for the warp mask (before I read your reply)...there are several other instances of 0x1f sprinkled about in this file that can be replaced later.

Thanks for the link @bdice! Should I get rid of my mask constexpr and just use cudf::detail::warp_size everywhere?

Another contributor:

To be a bit more precise here, CUDA does provide warpSize, which is available inside device code, and the cudaGetDeviceProperties host function, which returns a struct containing the warp size. However, neither of them is a compile-time constant and therefore cannot be used in constant expressions (e.g. for declaring a C-style array or std::array). The warp size is indeed constant across all current compute capabilities. In theory that's not something that we promise, so the technically correct answer is that we can't use a compile-time constant, because in theory someone could run on a new architecture with a different answer. In practice, NVIDIA has no plans to change the warp size AFAIK, and many examples of GPU code (even lots of code written by NVIDIA) define a warp_size constant. Lots of places use it assuming that it is in fact a compile-time constant and would have to be rewritten if we ever had any cards with a different warp size, so that's a much bigger problem to deal with another day if that ever changes :)
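
As a rough sketch of the kind of constant being discussed (illustrative only; the actual definition lives in cudf's detail headers and may be spelled differently):

// A compile-time warp-size constant usable in constant expressions,
// unlike the built-in warpSize, which is only a device-side value.
namespace cudf {
namespace detail {
constexpr int warp_size = 32;
}  // namespace detail
}  // namespace cudf

// Typical uses inside a kernel:
//   lane within warp : threadIdx.x % cudf::detail::warp_size
//   warp within block: threadIdx.x / cudf::detail::warp_size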

uint32_t const column_id = blockIdx.x;
uint32_t const num_fragments_per_column = fragments.size().second;

uint32_t frag_id = blockIdx.y * 4 + (threadIdx.x >> 5);
Contributor:

Suggested change:
-    uint32_t frag_id = blockIdx.y * 4 + (threadIdx.x >> 5);
+    uint32_t frag_id = blockIdx.y * 4 + (threadIdx.x / cudf::detail::warp_size);

while (frag_id < num_fragments_per_column) {
if (lane_id == 0) {
statistics_group g;
g.col = &col_desc[column_id];
g.start_row = fragments[column_id][frag_id].start_value_idx;
g.num_rows = fragments[column_id][frag_id].num_leaf_values;
groups[column_id][frag_id] = g;
}
frag_id += gridDim.y * 4;
}
__syncthreads();
if (frag_id < num_fragments_per_column and lane_id == 0) groups[column_id][frag_id] = *g;
}
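
A standalone sketch (hypothetical kernel, not the cudf code) of the warp-per-fragment mapping used above: a 128-thread block holds 4 warps, warp w of block y starts at fragment blockIdx.y * 4 + w, and then strides by the total number of warps spanned by the grid's y dimension:

#include <cuda_runtime.h>

__global__ void per_warp_over_fragments(int num_fragments)
{
  constexpr int warp_size       = 32;
  constexpr int warps_per_block = 128 / warp_size;  // 4 warps per 128-thread block

  int const lane_id = threadIdx.x % warp_size;
  int frag_id       = blockIdx.y * warps_per_block + threadIdx.x / warp_size;

  while (frag_id < num_fragments) {
    if (lane_id == 0) {
      // one thread per warp would record the per-fragment statistics here
    }
    frag_id += gridDim.y * warps_per_block;  // stride by all warps in the grid's y extent
  }
}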

// blockDim {128,1,1}
Expand Down Expand Up @@ -2017,9 +2022,10 @@ void InitPageFragments(device_2dspan<PageFragment> frag,
uint32_t fragment_size,
rmm::cuda_stream_view stream)
{
auto num_columns = frag.size().first;
auto num_fragments_per_column = frag.size().second;
dim3 dim_grid(num_columns, num_fragments_per_column); // 1 threadblock per fragment
int const num_columns = frag.size().first;
int const num_fragments_per_column = frag.size().second;
auto const grid_y = std::min(num_fragments_per_column, (1 << 16) - 1);
dim3 const dim_grid(num_columns, grid_y); // 1 threadblock per fragment
gpuInitPageFragments<512><<<dim_grid, 512, 0, stream.value()>>>(
frag, col_desc, partitions, part_frag_offset, fragment_size);
}
@@ -2031,8 +2037,10 @@ void InitFragmentStatistics(device_2dspan<statistics_group> groups,
{
int const num_columns = col_desc.size();
int const num_fragments_per_column = fragments.size().second;
auto grid_y = util::div_rounding_up_safe(num_fragments_per_column, 128 / cudf::detail::warp_size);
dim3 dim_grid(num_columns, grid_y); // 1 warp per fragment
auto const y_dim =
util::div_rounding_up_safe(num_fragments_per_column, 128 / cudf::detail::warp_size);
auto const grid_y = std::min(y_dim, (1 << 16) - 1);
dim3 const dim_grid(num_columns, grid_y); // 1 warp per fragment
gpuInitFragmentStats<<<dim_grid, 128, 0, stream.value()>>>(groups, fragments, col_desc);
}
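
Taken together, the two launch functions above follow the same pattern: clamp the grid's y extent to the CUDA limit of 65535 and let the kernel loop over any remaining fragments. A minimal sketch under hypothetical names (not the cudf code):

#include <algorithm>
#include <cuda_runtime.h>

__global__ void process_fragments(int num_fragments)
{
  // Each (column, y) block walks every fragment whose index is congruent to
  // blockIdx.y modulo gridDim.y, so no fragment is skipped when the grid is clamped.
  for (int frag = blockIdx.y; frag < num_fragments; frag += gridDim.y) {
    // ... per-fragment work for column blockIdx.x ...
  }
}

void launch(int num_columns, int num_fragments, cudaStream_t stream)
{
  int const grid_y = std::min(num_fragments, (1 << 16) - 1);  // grid y/z dims max out at 65535
  dim3 const grid(num_columns, grid_y);
  process_fragments<<<grid, 512, 0, stream>>>(num_fragments);
}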

cpp/tests/io/parquet_test.cpp (18 additions, 0 deletions)

@@ -1048,6 +1048,24 @@ TEST_F(ParquetWriterTest, HostBuffer)
cudf::test::expect_metadata_equal(expected_metadata, result.metadata);
}

TEST_F(ParquetWriterTest, ManyFragments)
{
srand(31337);
auto const expected = create_random_fixed_table<int>(10, 6'000'000, false);

auto const filepath = temp_env->get_temp_filepath("ManyFragments.parquet");
cudf::io::parquet_writer_options const args =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, *expected)
.max_page_size_bytes(8 * 1024);
cudf::io::write_parquet(args);

cudf::io::parquet_reader_options const read_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
auto const result = cudf::io::read_parquet(read_opts);

CUDF_TEST_EXPECT_TABLES_EQUAL(*result.tbl, *expected);
}

TEST_F(ParquetWriterTest, NonNullable)
{
srand(31337);