
Row-group-level partitioning for Parquet #9849

Closed · wants to merge 11 commits

Conversation

calebwin
Contributor

@calebwin calebwin commented Dec 6, 2021

This PR introduces a row_group_cols parameter for cudf.to_parquet that groups data by the given columns and writes each group to a separate row group (row groups are groups of rows within a Parquet file). This is similar to partition_cols, except that instead of separate groups being written to separate files, separate groups are written to separate row groups within a single file.

You should use row_group_cols when you want to partition data on a column but there are so many groups (or combinations of groups, if you are partitioning on more than one column) that writing each group to its own file would produce too many small files.
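A minimal sketch of the proposed usage (the DataFrame and column names are made up for illustration; only the row_group_cols keyword comes from this PR):

import cudf

df = cudf.DataFrame(
    {
        "store": ["a", "c", "b", "a", "b", "c"],
        "sales": [1, 2, 3, 4, 5, 6],
    }
)

# With partition_cols, each store would be written to its own file. With
# row_group_cols, rows are grouped by "store" and each group is written to
# its own row group inside the single file sales.parquet.
df.to_parquet("sales.parquet", row_group_cols=["store"])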

What's left:

  • Initial implementation in cudf
  • Integration in dask_cudf
  • Testing, tests
  • Benchmarking

Notes to reviewers:

  • There are changes in cudf-cpp, cudf-python, dask-cudf-python.
  • If dask_cudf.to_parquet accepts partition_on and row_group_cols, should row_group_cols be renamed to row_group_on for API consistency?
  • Is the way I am launching the populate_chunk_hash_maps and get_dictionary_indices kernels correct?
  • There is an extra index column written when specifying row_group_cols - is this a bug we should fix before merging? It seems to be related to [1] and [2].

[1] https://issues.apache.org/jira/browse/ARROW-9136
[2] pandas-dev/pandas#34790.

@github-actions github-actions bot added the Python (Affects Python cuDF API) and libcudf (Affects libcudf (C++/CUDA) code) labels Dec 6, 2021
@calebwin calebwin marked this pull request as ready for review December 10, 2021 04:21
@calebwin calebwin requested review from a team as code owners December 10, 2021 04:21
@calebwin calebwin requested review from vyasr, vuule and isVoid December 10, 2021 04:21
@jakirkham
Member

cc @rjzamora

@devavret
Contributor

  • Is the way I am launching the populate_chunk_hash_maps and get_dictionary_indices kernels correct?

Yes. In fact, that's exactly how I did it in the partitioning PR #9810:

dim3 const dim_grid(frags.size().second, frags.size().first);

Contributor

@bdice bdice left a comment

Nice feature @calebwin! I have some comments below. I'm not deeply familiar with this code so let me know if anything I suggested seems off base.

Comment on lines +105 to +108
auto start_row = 0;
for (auto i = 0; i < block_x; i++) {
start_row += fragments[0][i].num_rows;
}
Contributor

Perhaps these offsets should be pre-computed with a scan and then passed into the kernel? I'm not sure how many row groups we expect. The difference between 10 and 1M would indicate whether this should be a host or device computation.

If we shouldn't use a scan and pass in the precomputed offsets, then this could use std::accumulate. It might look something like this snippet (untested):

Suggested change
auto start_row = 0;
for (auto i = 0; i < block_x; i++) {
start_row += fragments[0][i].num_rows;
}
auto row_counter = thrust::make_transform_iterator(
  fragments[0].begin(), [] __device__(auto const& page) { return page.num_rows; });
auto start_row = std::accumulate(row_counter, row_counter + block_x, 0);

(Note: page might not be the right name for the function argument, I am just guessing from device_2dspan<PageFragment>)

Contributor

Do you mean like this:

size_type start_row = frag.start_row;

Contributor

@devavret Looks about right! I'm just trying to avoid a loop on each thread when we could use a single-pass scan ahead of time. I see you've worked on this in #9810. That logic should be used here. Does #9810 need to be merged first?

Contributor

Does #9810 need to be merged first?

Actually, that's what I was wondering just now. #9810 is close to completion, and if it is merged first, this PR will run into a lot of merge conflicts. I'm fine with merging #9810 later, or with taking over this one if it remains unmerged due to merge conflicts.

cc @quasiben @vuule

Contributor

@devavret do you have an ETA for addressing current comments on #9810?

Contributor

My 2 cents:
Let's aim to merge this first so Caleb has a chance to get the PR as close to the finish line as possible. If #9810 already addresses some of the comments here, maybe those pieces can be applied to this PR (which also reduces merge conflicts).

I'm not sure what the best approach is; I'm inclined to leave the decision up to @devavret and @calebwin.

Contributor Author

Thanks @bdice @hyperbolic2346 @devavret @vuule for reviews and comments. I just ran into a subtle CUDA bug in this PR when I was in the middle of writing a benchmark. I looked through #9810 and it looks like there are changes in the CUDA code that may handle edge cases I didn't consider here.

So I'm going to go ahead and try to merge #9810 into this and make appropriate changes. I will see if that resolves the issue I came across when benchmarking. I will then try to address other reviews here.

Should I convert this PR to a draft in the meantime?

Comment on lines +257 to +260
auto start_row = 0;
for (auto i = 0; i < block_x; i++) {
start_row += fragments[0][i].num_rows;
}
Contributor

Same as previous comment.

@@ -20,6 +20,7 @@
*/

#include <io/statistics/column_statistics.cuh>
#include "io/parquet/parquet_gpu.hpp"
Contributor

This should probably use angle brackets (<>).

Suggested change
#include "io/parquet/parquet_gpu.hpp"
#include <io/parquet/parquet_gpu.hpp>


#include <iostream>
Contributor

Looks like this was left in from some print debugging? If it is needed, it should go in the section of stdlib headers (like #include <algorithm> below), rather than with the rmm includes.

cudf::detail::hostdevice_2dvector<gpu::PageFragment> fragments(
num_columns, num_fragments, stream);

if (row_group_sizes_specified) {
// auto fragments_span = host_2dspan<gpu::PageFragment>{fragments};
Contributor

Why is this commented out?

Comment on lines +133 to +138
write_df.to_parquet(
fil,
index=preserve_index,
row_group_cols=row_group_cols,
**kwargs,
)
Contributor

I think this section of code can be written to only call to_parquet once. Something roughly like this, which updates **kwargs and optionally keeps the result:

if return_metadata:
    kwargs["metadata_file_path"] = fs.sep.join([subdir, filename])

metadata_result = write_df.to_parquet(
    fil,
    index=preserve_index,
    row_group_cols=row_group_cols,
    **kwargs,
)

if return_metadata:
    metadata.append(metadata_result)

Comment on lines +448 to +449
for a, b in zip(col_names, df.columns):
assert a == b
Contributor

@bdice bdice Dec 10, 2021

Is this equivalent?

Suggested change
for a, b in zip(col_names, df.columns):
assert a == b
assert col_names == df.columns

Aside from being shorter, it is preferable to compare all the column names at once because it produces a nicer error message if it fails.

num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname)

assert num_rows == len(df.index)
assert row_groups == len(row_group_sizes)
Contributor

This line verifies the number of row groups, but I don't think this test is checking the number of rows in each row group. That seems important to test here.
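For example, a sketch of such a check using pyarrow (row_group_sizes is assumed to be the list of expected per-row-group row counts used when writing fname):

import pyarrow.parquet as pq

# Hypothetical addition to the test: verify that every row group contains
# exactly the number of rows that was requested for it.
meta = pq.ParquetFile(fname).metadata
assert meta.num_row_groups == len(row_group_sizes)
for i, expected_rows in enumerate(row_group_sizes):
    assert meta.row_group(i).num_rows == expected_rows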

Comment on lines +219 to +221
Column names by which to partition the dataset across row groups in the
resulting Parquet file
Columns are partitioned in the order they are given
Contributor

@bdice bdice Dec 10, 2021

Sentences in docstrings should end in a period. Line breaks should be avoided except to wrap at the column limit.

Suggested change
Column names by which to partition the dataset across row groups in the
resulting Parquet file
Columns are partitioned in the order they are given
Column names by which to partition the dataset across row groups in the
resulting Parquet file. Columns are partitioned in the order they are
given.

tmpdir, partition_on=partition_on, row_group_cols=row_group_cols
)
ddf_read = dask_cudf.read_parquet(tmpdir)
assert_eq(len(ddf), len(ddf_read))
Contributor

assert_eq is intended for more complicated assertions about dataframes being equivalent. Comparing lengths should be done with a plain assert. However, it is probably a good idea to also make sure the dataframe written/read is equivalent to the source dataframe in memory:

Suggested change
assert_eq(len(ddf), len(ddf_read))
assert len(ddf) == len(ddf_read)
assert_eq(ddf, ddf_read)

Contributor

@hyperbolic2346 hyperbolic2346 left a comment

Thanks for working on this. I have some comments below and am interested to see this progress.

Comment on lines +133 to +137
if (fragment_size != -1) {
s->frag.num_rows = min(fragment_size, max_num_rows - min(start_row, max_num_rows));
} else {
s->frag.num_rows = frag[blockIdx.x][blockIdx.y].num_rows;
}
Contributor

Suggested change
if (fragment_size != -1) {
s->frag.num_rows = min(fragment_size, max_num_rows - min(start_row, max_num_rows));
} else {
s->frag.num_rows = frag[blockIdx.x][blockIdx.y].num_rows;
}
s->frag.num_rows = fragment_size != -1
                     ? min(fragment_size, max_num_rows - min(start_row, max_num_rows))
                     : frag[blockIdx.x][blockIdx.y].num_rows;

if (row_group_sizes_specified) {
num_fragments = 0;
for (std::size_t i = 0; i < row_group_sizes.size(); i++) {
num_fragments += (row_group_sizes[i] + max_page_fragment_size - 1) / max_page_fragment_size;
Contributor

This should be a std::accumulate as Bradley showed above.

@github-actions

github-actions bot commented Jan 9, 2022

This PR has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this PR if it is no longer required. Otherwise, please respond with a comment indicating any updates. This PR will be labeled inactive-90d if there is no activity in the next 60 days.

@shwina shwina changed the base branch from branch-22.02 to branch-22.04 January 20, 2022 21:15
@github-actions

github-actions bot commented Jun 8, 2022

This PR has been labeled inactive-90d due to no recent activity in the past 90 days. Please close this PR if it is no longer required. Otherwise, please respond with a comment indicating any updates.

@vyasr
Contributor

vyasr commented Jan 23, 2024

I'm going to close this since it's fairly out of date and it's not clear if we still want this as is. Feel free to reopen if work on this restarts.

@vyasr vyasr closed this Jan 23, 2024