
Refactor read_metadata in fastparquet engine #8092

Merged: 60 commits into dask:main on Oct 19, 2021

Conversation

rjzamora (Member)

This is a follow-up to #8072, and corresponds to the "short-term" FastParquetEngine component of the plan discussed in #8058.

Note that #8072 should be merged first.

@pentschev (Member)

GPU issues should be fixed now that rapidsai/cudf#9118 is in. Rerunning tests.

@rjzamora rjzamora marked this pull request as ready for review October 1, 2021 22:24
@rjzamora rjzamora marked this pull request as draft October 1, 2021 22:24
@rjzamora rjzamora marked this pull request as ready for review October 4, 2021 13:21
@martindurant (Member)

Is this ready for review?

@rjzamora (Member, Author) commented Oct 7, 2021

Is this ready for review?

Yes - I will take another look through it now, but you should feel free to review whenever you can find the time :D

@rjzamora mentioned this pull request Oct 7, 2021
@martindurant (Member) left a comment

I think I've gone through most of it, and don't see much awry. There is a lot, though! Probably you can push ahead and note some for follow-ups, such as profiling whether the remaining work in the client is expensive or not.

dask/dataframe/io/parquet/fastparquet.py (resolved review threads)
# Find all files if we are not using a _metadata file
if ignore_metadata_file or not _metadata_exists:
    # For now, we need to discover every file under paths[0]
    paths, base, fns = _sort_and_analyze_paths(fs.find(base), fs)
martindurant (Member):

All these path operations should be profiled; I wouldn't be surprised if they add up to a lot.

rjzamora (Member, Author):
Agree - We are not really "adding" anything new in this PR, but there are probably ways to reduce fs overhead.

    gather_statistics = True
else:
    # Use 0th file
    # Note that "_common_metadata" can cause issues for
martindurant (Member):

I don't understand this comment, I thought we had covered this case.

rjzamora (Member, Author):
We used to use the _common_metadata file to generate the ParquetFile object, but the code was not working for partitioned data (since the file name is used). This small change allows us to handle partitioned data when _metadata is missing.
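A minimal sketch of what that change amounts to (illustrative only, not the exact dask code; the dataset path below is hypothetical): open the fastparquet ParquetFile from the 0th data file instead of _common_metadata, so partition values encoded in the file path remain visible.

    # Sketch only: build the ParquetFile from the first data file rather than
    # "_common_metadata", so hive-style partition directories in the path are
    # preserved. The dataset location is a made-up example.
    import fsspec
    import fastparquet

    fs = fsspec.filesystem("file")
    base = "/data/mydataset"  # hypothetical dataset root
    data_files = sorted(p for p in fs.find(base) if p.endswith(".parquet"))
    pf = fastparquet.ParquetFile(data_files[0], open_with=fs.open)
    print(pf.columns)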

_metadata_exists = "_metadata" in fns
if _metadata_exists and ignore_metadata_file:
    fns.remove("_metadata")
    paths = [fs.sep.join([base, fn]) for fn in fns]
martindurant (Member):
We just extracted the base from the filenames and then here we add them back in again.

rjzamora (Member, Author):
Right - We keep track of base and use it elsewhere after this. We are not actually "adding them back in." We are only executing this last line to remove _metadata from paths. Would you prefer paths = [p for p in paths if not p.endswith("_metadata")]?
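For readers following along, a tiny self-contained illustration of the bookkeeping being discussed (names and paths are made up): base is retained for later use, and the final join only serves to drop _metadata from the data-file list.

    base = "s3://bucket/dataset"  # hypothetical
    sep = "/"
    fns = ["_metadata", "part.0.parquet", "part.1.parquet"]

    if "_metadata" in fns:
        fns.remove("_metadata")
    paths = [sep.join([base, fn]) for fn in fns]
    print(paths)  # only the two data files remain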

if getattr(dtypes.get(ind), "numpy_dtype", None):
    # index does not support masked types
    dtypes[ind] = dtypes[ind].numpy_dtype
meta = _meta_from_dtypes(all_columns, dtypes, index_cols, column_index_names)
martindurant (Member):
Note that fastparquet can do this too, via the preallocate stuff (just make one row's worth). It may well end up with a closer representation.
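For context on the snippet above, pandas nullable (masked) extension dtypes expose a numpy_dtype attribute, which is what the index column is downcast to here; a quick illustration:

    import pandas as pd

    masked = pd.Int64Dtype()   # nullable "masked" integer dtype
    print(masked.numpy_dtype)  # int64 (the plain NumPy dtype used for the index)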

if filters:
    # Filters may require us to gather statistics
    if gather_statistics is False and pf.info.get("partitions", None):
        warnings.warn(
martindurant (Member):

It feels like it should be an error if we pass a column we can't actually filter on.

rjzamora (Member, Author):
This code is not added in this PR, but I agree an error may make more sense than a warning. However, since there is at least a warning, can we leave the "fix" for a follow-up?
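If the warning were promoted to an error in a follow-up, the check might look roughly like this (a sketch under assumed names, not dask's actual code):

    def validate_filters(filters, gather_statistics, partitions):
        # Sketch only: raise instead of warn when filters cannot be applied
        # because statistics gathering is disabled.
        if filters and gather_statistics is False and partitions:
            raise ValueError(
                "Filters were provided, but statistics gathering is disabled, "
                "so the requested filters cannot be applied."
            )

    # validate_filters([("year", ">", 2020)], False, ["year"])  # would raise ValueError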

common_kwargs,
)

dataset_info_kwargs = {
martindurant (Member):
Mostly copied from dataset_info?

rjzamora (Member, Author):
Right - We don't want to pass all of dataset_info to each task in the metadata-processing graph, so we specify the required elements here.
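A rough sketch of the pattern being described, with hypothetical key names: pick out only the entries each metadata task needs rather than shipping the entire dataset_info dict into the graph.

    dataset_info = {
        "fs": "<filesystem object>",
        "paths": ["part.0.parquet", "part.1.parquet"],
        "categories": None,
        "index_cols": ["idx"],
        "large_client_only_state": "<not needed per task>",
    }

    # Only the per-task essentials are forwarded to the metadata-processing graph.
    dataset_info_kwargs = {
        key: dataset_info[key]
        for key in ("fs", "paths", "categories", "index_cols")
    }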

    or metadata_task_size > len(paths)
):
    # Use original `ParquetFile` object to construct plan,
    # since it is based on a global _metadata file
martindurant (Member):
This comment is outdated? Basically the "old" method.

rjzamora (Member, Author):
I can clarify the comment, but it is not really outdated (just incomplete). This code path means we have a _metadata file or the metadata_task_size setting has caused parallel metadata processing to be disabled.
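A sketch of the branch being described, with assumed variable names: fall back to serial, in-client metadata processing when a global _metadata file exists or when the metadata_task_size setting effectively disables the parallel path.

    def use_serial_metadata(have_global_metadata, metadata_task_size, paths):
        # Sketch only; a metadata_task_size of 0 is assumed to mean
        # "parallel metadata processing disabled".
        return (
            have_global_metadata
            or not metadata_task_size
            or metadata_task_size > len(paths)
        )

    print(use_serial_metadata(False, 128, ["a.parquet", "b.parquet"]))  # True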

):

    # Collect necessary information from dataset_info
    fs = dataset_info_kwargs["fs"]
martindurant (Member):
Is there any way to get around this repeat bundling and unbundling? Seems like unnecessary operations and unnecessary code.

rjzamora (Member, Author):

Yes, probably. To be honest, 90% of this PR is just moving existing code around (very little "new" logic). One of the only "new" changes is the general usage of a dataset_info dict. I was expecting a bit of pushback on this. However, I decided that explicitly packing and unpacking a dictionary this way makes it much easier to avoid breaking changes to read_metadata.

My preference is to follow this approach for now (especially since the pyarrow version uses the same logic), and only make the API more rigid once we are more confident that we are passing along exactly what we need.
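A short sketch of the unpacking side of that convention (the function and argument names are assumed): keeping a single dict parameter means new fields can be added later without changing the function signature.

    def _collect_file_metadata(dataset_info_kwargs):
        # Sketch only: unpack just what this step needs from the bundled dict.
        fs = dataset_info_kwargs["fs"]
        paths = dataset_info_kwargs["paths"]
        return {"n_paths": len(paths), "protocol": getattr(fs, "protocol", None)}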

@rjzamora (Member, Author)

@martindurant - Thank you for the thorough review here! It was very useful. I do want to make a general note that many of your comments/suggestions are focused on code that was simply moved in this PR (not actually added). I am certainly happy that you have pointed out possible problems and improvements within some of the relocated logic. Are you okay with me targeting most of these problems/improvements in follow-up PRs (to keep the changes here as minimal as possible)?

@martindurant (Member)

Are you okay with me targeting most of these problems/improvements in follow-up PRs

Certainly! Basically, the diff version was too hard for me to follow, so I read through the complete code, in the parts that seemed relevant. Things that might have been suboptimal and continue being exactly the same amount of suboptimal don't worry me too much :)

@rjzamora (Member, Author)

Basically, the diff version was too hard for me to follow

I don't think this PR is even possible to review from the diff :) So, I really appreciate that you took the time to look through everything!

@rjzamora (Member, Author)

I feel that this PR should be merged within the next day or so. Please feel free to speak up if you feel that any of the issues that were discussed in code review (or others that were not discussed) need to be addressed in this particular PR (cc @martindurant @jrbourbeau)

@martindurant (Member)

I don't intend to review further, so we can merge and iterate as usual.

@rjzamora rjzamora merged commit 07ee3b8 into dask:main Oct 19, 2021
@rjzamora rjzamora deleted the read-metadata-refactor-fastparquet branch October 19, 2021 22:16