
Optimized fsspec data transfer for remote file-systems #9265

Merged: 29 commits merged into rapidsai:branch-21.10 on Sep 22, 2021

Conversation

@rjzamora (Member) commented Sep 21, 2021:

This PR strips the pyarrow NativeFile component out of #9225 (since those changes are not yet stable). I feel it is reasonable to start by merging these fsspec-specific optimizations for 21.10, because they are stable and already result in a significant performance boost over the existing approach to remote storage. I still think it is very important that we eventually plumb NativeFile support into Python (cudf and dask_cudf), but we will likely need to target 21.12 for that improvement.

@rjzamora rjzamora requested review from a team as code owners September 21, 2021 15:47
@rjzamora rjzamora self-assigned this Sep 21, 2021
@github-actions bot added the "Python" label (Affects Python cuDF API) Sep 21, 2021
@rjzamora added the "dask", "improvement", "non-breaking", and "Performance" labels Sep 21, 2021
@rjzamora (Member, Author) left a comment:

Adding some notes.

Comment on lines 63 to 67
    iotypes=(BytesIO, StringIO),
    byte_ranges=[byte_range] if byte_range else None,
    clip_dummy_buffer=True if byte_range else False,
    **kwargs,
)
@rjzamora (Member, Author):
Notes:

  • When byte_range is specified in a read_csv call, we only need to transfer that byte range from remote storage (see the usage sketch after this list).
  • By default, the byte ranges that are transferred from remote storage will be copied into a local "dummy buffer". We call this a dummy buffer because it is likely to contain many "empty" bytes that libcudf will ultimately ignore.
  • We use clip_dummy_buffer=True to avoid actually allocating those "empty" bytes when byte_range= is specified. This option informs get_filepath_or_buffer that the local dummy buffer does not need to be the size of the entire remote file, and can be clipped down to the exact byte_range size.
  • When clip_dummy_buffer=True, the byte_range argument passed down to libcudf must be adjusted to a zero offset (see the code block below).
  • The clip_dummy_buffer=True optimization cannot be used for parquet (for now), because the footer metadata includes many file-specific column-chunk offsets.
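
A minimal usage sketch (the bucket/object below are hypothetical, and s3fs is assumed to be installed): with byte_range specified, only that slice of the remote CSV should be transferred into the local dummy buffer, and clip_dummy_buffer keeps that buffer at exactly the requested size.

    import cudf

    # Read 1 MiB starting at offset 0 of a (hypothetical) remote file;
    # byte_range is (offset, size) in bytes.
    df = cudf.read_csv(
        "s3://my-bucket/data.csv",       # hypothetical bucket/object
        byte_range=(0, 1_048_576),
        storage_options={"anon": True},  # fsspec-style credentials/options
    )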

Comment on lines 69 to 73
# Adjust byte_range for clipped dummy buffers
use_byte_range = byte_range
if byte_range and isinstance(filepath_or_buffer, BytesIO):
    if byte_range[1] == filepath_or_buffer.getbuffer().nbytes:
        use_byte_range = (0, byte_range[1])
@rjzamora (Member, Author):
As discussed above, this is where we reset byte_range to a zero offset when we are using a clipped local dummy buffer. For example, if the user requested byte_range=(1000, 500) and the dummy buffer was clipped to 500 bytes, libcudf must be passed (0, 500).

Comment on lines 165 to 205
def _process_row_groups(paths, fs, filters=None, row_groups=None):

    # Deal with case that the user passed in a directory name
    file_list = paths
    if len(paths) == 1 and ioutils.is_directory(paths[0]):
        paths = ioutils.stringify_pathlike(paths[0])

    # Convert filters to ds.Expression
    if filters is not None:
        filters = pq._filters_to_expression(filters)

    # Initialize ds.FilesystemDataset
    dataset = ds.dataset(
        paths, filesystem=fs, format="parquet", partitioning="hive",
    )
    file_list = dataset.files
    if len(file_list) == 0:
        raise FileNotFoundError(f"{paths} could not be resolved to any files")

    if filters is not None:
        # Load IDs of filtered row groups for each file in dataset
        filtered_rg_ids = defaultdict(list)
        for fragment in dataset.get_fragments(filter=filters):
            for rg_fragment in fragment.split_by_row_group(filters):
                for rg_info in rg_fragment.row_groups:
                    filtered_rg_ids[rg_fragment.path].append(rg_info.id)

        # Initialize row_groups to be selected
        if row_groups is None:
            row_groups = [None for _ in dataset.files]

        # Store IDs of selected row groups for each file
        for i, file in enumerate(dataset.files):
            if row_groups[i] is None:
                row_groups[i] = filtered_rg_ids[file]
            else:
                # Materialize the intersection eagerly; a lazy filter()
                # would capture row_groups[i] after it is reassigned
                selected = set(row_groups[i])
                row_groups[i] = [
                    rg_id
                    for rg_id in filtered_rg_ids[file]
                    if rg_id in selected
                ]

    return file_list, row_groups
@rjzamora (Member, Author):
This PR copies the existing filtering logic into a dedicated helper function. The general purpose of this function is to (1) expand directory input into a list of paths (using the pyarrow dataset API), and (2) apply row-group filters. A hypothetical usage sketch follows below.

Main take-away: This is just a re-organization of existing logic.
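
A minimal usage sketch of the helper above (the remote paths, filesystem, and filter values are illustrative assumptions, not taken from this PR):

    import fsspec

    fs = fsspec.filesystem("s3", anon=True)  # hypothetical remote filesystem
    file_list, row_groups = _process_row_groups(
        ["my-bucket/dataset-dir"],           # a single directory input
        fs,
        filters=[("year", "==", 2020)],      # DNF-like list of tuples
    )
    # file_list: every parquet file discovered under the directory
    # row_groups: per-file lists of row-group IDs that pass the filter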

Contributor:
The general purpose of this function is to (1) expand directory input into a list of paths (using the pyarrow dataset API), and (2) to apply row-group filters.

Perhaps this could be a helpful comment to add at the top of this function?

Comment on lines 208 to 269
def _get_byte_ranges(file_list, row_groups, columns, fs):

    if row_groups is None:
        if columns is None:
            return None, None, None  # No reason to construct this
        row_groups = [None for path in file_list]

    # Construct a list of required byte-ranges for every file
    all_byte_ranges, all_footers, all_sizes = [], [], []
    for path, rgs in zip(file_list, row_groups):

        # Step 0 - Get size of file
        if fs is None:
            file_size = path.size
        else:
            file_size = fs.size(path)

        # Step 1 - Get 32 KB from tail of file.
        #
        # This "sample size" can be tunable, but should
        # always be >= 8 bytes (so we can read the footer size)
        tail_size = min(32_000, file_size)
        if fs is None:
            path.seek(file_size - tail_size)
            footer_sample = path.read(tail_size)
        else:
            footer_sample = fs.tail(path, tail_size)

        # Step 2 - Read the footer size and re-read a larger
        # tail if necessary
        footer_size = int.from_bytes(footer_sample[-8:-4], "little")
        if tail_size < (footer_size + 8):
            if fs is None:
                path.seek(file_size - (footer_size + 8))
                footer_sample = path.read(footer_size + 8)
            else:
                footer_sample = fs.tail(path, footer_size + 8)

        # Step 3 - Collect required byte ranges
        byte_ranges = []
        md = pq.ParquetFile(io.BytesIO(footer_sample)).metadata
        for r in range(md.num_row_groups):
            # Skip this row-group if we are targeting
            # specific row-groups
            if rgs is None or r in rgs:
                row_group = md.row_group(r)
                for c in range(row_group.num_columns):
                    column = row_group.column(c)
                    name = column.path_in_schema
                    # Skip this column if we are targeting
                    # specific columns
                    if columns is None or name in columns:
                        file_offset0 = column.dictionary_page_offset
                        if file_offset0 is None:
                            file_offset0 = column.data_page_offset
                        num_bytes = column.total_uncompressed_size
                        byte_ranges.append((file_offset0, num_bytes))

        all_byte_ranges.append(byte_ranges)
        all_footers.append(footer_sample)
        all_sizes.append(file_size)

    return all_byte_ranges, all_footers, all_sizes
@rjzamora (Member, Author):
The _get_byte_ranges utility is new logic. This is where we collect a footer-metadata sample from each parquet file, and then use that metadata to define the exact byte ranges needed to read the target column-chunks from the file. This utility is only used for remote storage. The calculated byte-range information is used within cudf.io.ioutils.get_filepath_or_buffer (which uses _fsspec_data_transfer to convert non-local fsspec file objects into local byte buffers). A sketch of the footer layout this relies on follows below.
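
For illustration, a condensed restatement of Steps 1-3 above (footer_metadata is a hypothetical helper name; the layout itself is standard parquet):

    import io
    import pyarrow.parquet as pq

    def footer_metadata(footer_sample: bytes):
        # The last 8 bytes of a parquet file are a 4-byte little-endian
        # footer length followed by the 4-byte magic b"PAR1".
        assert footer_sample[-4:] == b"PAR1", "not a parquet tail"
        footer_size = int.from_bytes(footer_sample[-8:-4], "little")
        assert len(footer_sample) >= footer_size + 8, "tail sample too small"
        return pq.ParquetFile(io.BytesIO(footer_sample)).metadata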

Contributor:

Ditto as above -- this is super helpful and could perhaps be useful in the code as a comment!

codecov bot commented Sep 21, 2021

Codecov Report

Merging #9265 (13c8f5b) into branch-21.10 (3ee3ecf) will increase coverage by 0.01%.
The diff coverage is 11.96%.


@@               Coverage Diff                @@
##           branch-21.10    #9265      +/-   ##
================================================
+ Coverage         10.85%   10.87%   +0.01%     
================================================
  Files               115      116       +1     
  Lines             19158    19328     +170     
================================================
+ Hits               2080     2102      +22     
- Misses            17078    17226     +148     
Impacted Files Coverage Δ
python/cudf/cudf/__init__.py 0.00% <ø> (ø)
python/cudf/cudf/_lib/__init__.py 0.00% <ø> (ø)
python/cudf/cudf/io/__init__.py 0.00% <0.00%> (ø)
python/cudf/cudf/io/csv.py 0.00% <0.00%> (ø)
python/cudf/cudf/io/parquet.py 0.00% <0.00%> (ø)
python/cudf/cudf/io/text.py 0.00% <0.00%> (ø)
python/cudf/cudf/utils/ioutils.py 0.00% <0.00%> (ø)
python/dask_cudf/dask_cudf/io/parquet.py 89.61% <89.28%> (+0.34%) ⬆️
python/dask_cudf/dask_cudf/accessors.py 89.74% <0.00%> (-1.93%) ⬇️
python/cudf/cudf/utils/dtypes.py 0.00% <0.00%> (ø)
... and 3 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

python/cudf/cudf/io/csv.py: outdated review thread (resolved)

# Convert filters to ds.Expression
if filters is not None:
    filters = pq._filters_to_expression(filters)
Contributor:

Are we OK using a non-public arrow API here? Is there a public alternative, or is it worth vendoring?

@rjzamora (Member, Author):

Good question! This same issue came up in Dask a few times (where we are also using this "private" utility).
@jorisvandenbossche - Is this still the recommended way to translate filters?

Note that this particular line was not actually added in this PR, but it would be good to change it if there is a new "public" API.

@jorisvandenbossche:
Although private, it's knowingly being used in several places (such as Dask), so we won't be removing it without an alternative or a deprecation cycle, just as with a public API. So I think it is fine to use.
There is an issue about making this public (https://issues.apache.org/jira/browse/ARROW-9672); I can look into that for the coming release, but you will still need to keep using the private one for some time if you want to support older versions.
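
For reference, a minimal sketch of the private helper under discussion (the filter values are hypothetical, and the call is an assumption about current pyarrow behavior rather than a stable public API):

    import pyarrow.parquet as pq

    # Translate a DNF-like list of (column, op, value) tuples into a
    # pyarrow dataset Expression, e.g. (year == 2020)
    expr = pq._filters_to_expression([("year", "==", 2020)])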

@rjzamora (Member, Author):

I appreciate the confirmation, Joris. I'll keep my eye on that issue in case something changes. Note that the ideal solution from the perspective of Dask and RAPIDS is for pyarrow to start recognizing filters specified as the DNF-like list of tuples :)

@shwina (Contributor) left a comment:

Looks great! A couple of minor changes requested.

@rjzamora added the "5 - Ready to Merge" label (Testing and reviews complete, ready to merge) Sep 22, 2021
@rjzamora (Member, Author):

@gpucibot merge

@rjzamora (Member, Author):

rerun tests

@rapids-bot rapids-bot bot merged commit 8dea0b1 into rapidsai:branch-21.10 Sep 22, 2021
@rjzamora rjzamora deleted the fsspec-optimized-transfer branch September 23, 2021 13:26
rapids-bot bot pushed a commit that referenced this pull request Oct 6, 2021
This is a simple follow-up to #9304 and #9265 meant to achieve the following:

- After this PR, the default behavior of `cudf.read_csv` will be to convert fsspec-based `AbstractBufferedFile` objects to Arrow `PythonFile` objects for non-local file systems (see the sketch after this commit message). Since `PythonFile` objects inherit from `NativeFile` objects, libcudf can seek/read distinct byte ranges without requiring the entire file to be read into host memory (i.e. the default behavior enables proper partial IO from remote storage).

- #9265 recently added an fsspec-based optimization for transferring csv byte ranges into local memory. That optimization already allowed us to avoid a full file transfer when a specific `byte_range` is specified in the `cudf.read_csv` call. However, the simpler approach introduced in this PR is (1) more general, (2) easier to maintain, and (3) demonstrates comparable performance. Therefore, this PR also rolls back one of the less-maintainable optimizations added in #9265 (local buffer clipping).

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - https://github.com/brandon-b-miller

URL: #9376
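
A rough sketch of the AbstractBufferedFile-to-PythonFile conversion described in that follow-up (the bucket/object are hypothetical, and s3fs is assumed to be installed):

    import fsspec
    import pyarrow as pa

    # Open a remote file as an fsspec AbstractBufferedFile, then wrap it
    # as a pyarrow NativeFile so a reader can seek/read distinct byte ranges.
    with fsspec.open("s3://my-bucket/data.csv", mode="rb", anon=True) as f:
        arrow_file = pa.PythonFile(f, mode="r")
        arrow_file.seek(0)
        header = arrow_file.read(4096)  # partial read; no full-file transfer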