Optimized fsspec data transfer for remote file-systems #9265

Merged · 29 commits merged on Sep 22, 2021

Commits (changes shown from 27 of 29 commits)
82ab31d
save work related to byte-range collection
rjzamora Sep 9, 2021
ef02f3d
enable byte_ranges optimization for open file-like
rjzamora Sep 10, 2021
a32a7ae
fix bug for no column or row-group selection
rjzamora Sep 10, 2021
26b96f3
Merge remote-tracking branch 'upstream/branch-21.10' into nativefile-…
rjzamora Sep 10, 2021
bd2e59a
add arrow_filesystem flag for dask_cudf
rjzamora Sep 10, 2021
53bb32e
use cat_ranges when available
rjzamora Sep 10, 2021
ada8451
expose arrow_filesystem and legacy_transfer
rjzamora Sep 11, 2021
661ae58
most tests passing with reasonable defaults - arrow_filesystem=True n…
rjzamora Sep 13, 2021
42c55c9
fix bug
rjzamora Sep 13, 2021
51021ab
Merge remote-tracking branch 'upstream/branch-21.10' into nativefile-…
rjzamora Sep 13, 2021
ddec4df
legacy_transfer fix
rjzamora Sep 15, 2021
36e4c52
fix test failures
rjzamora Sep 16, 2021
98efb58
plumb in csv support since most of the work is already done
rjzamora Sep 16, 2021
63dd615
remove unnecessary BytesIO usage for optimized code path
rjzamora Sep 16, 2021
40639c2
Merge remote-tracking branch 'upstream/branch-21.10' into nativefile-…
rjzamora Sep 20, 2021
5524538
avoid memory leaks in _read_byte_ranges
rjzamora Sep 20, 2021
fd2998a
avoid full-file transfer for read_csv with byte_range defined
rjzamora Sep 20, 2021
d1cb7a6
avoid seeking before beginning of file
rjzamora Sep 20, 2021
491c69f
remove arrow_filesystem option from dask (for now)
rjzamora Sep 21, 2021
5994fd9
save state
rjzamora Sep 21, 2021
d757715
strip nativefile changes from nativefile-parquet branch
rjzamora Sep 21, 2021
63a3bc9
fix byte_range bug in read_csv
rjzamora Sep 21, 2021
a1f44c5
update test_s3 to exercise fsspec transfer optimization
rjzamora Sep 21, 2021
cb78ba8
add parquet filter test with s3
rjzamora Sep 21, 2021
949036c
use bytes again by default
rjzamora Sep 21, 2021
8409e35
use BytesIO to be safe
rjzamora Sep 21, 2021
7f9154b
fix another copy-paste mistake
rjzamora Sep 21, 2021
e86f79b
change dummy to local
rjzamora Sep 22, 2021
13c8f5b
add comments
rjzamora Sep 22, 2021
10 changes: 9 additions & 1 deletion python/cudf/cudf/io/csv.py
@@ -61,9 +61,17 @@ def read_csv(
path_or_data=filepath_or_buffer,
compression=compression,
iotypes=(BytesIO, StringIO),
byte_ranges=[byte_range] if byte_range else None,
clip_dummy_buffer=True if byte_range else False,
**kwargs,
)
rjzamora (Member, Author) commented:

Notes:

  • When byte_range is specified in a read_csv call, we only need to transfer that byte range from remote storage (see the usage sketch after these notes).
  • By default, the byte ranges that are transferred from remote storage are copied into a local dummy buffer. We call this a dummy buffer because it is likely to contain many "empty" bytes that libcudf will ultimately ignore.
  • We use clip_dummy_buffer=True to avoid actually allocating those "empty" bytes when byte_range= is specified. This option informs get_filepath_or_buffer that the local dummy buffer does not need to be the size of the entire remote file and can be clipped down to the exact byte_range size.
  • When clip_dummy_buffer=True, the byte_range argument passed down to libcudf must be adjusted to a zero offset (see the code block below).
  • The clip_dummy_buffer=True optimization cannot be used for parquet (for now), because the footer metadata includes many absolute column-chunk offsets.
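A minimal usage sketch of the call pattern these notes describe (not part of this diff): the object path, byte_range values, and column names below are hypothetical; the point is only that a byte_range request should avoid a full-file transfer from remote storage.

import cudf

# Hypothetical remote CSV; only the requested slice of the object needs to be
# fetched into the local (clipped) buffer, rather than the whole file.
df = cudf.read_csv(
    "s3://my-bucket/logs/2021-09.csv",      # hypothetical path
    byte_range=(10_000_000, 1_000_000),     # (offset, size) in bytes
    header=None,                            # partial reads usually skip the header row
    names=["ts", "level", "msg"],           # hypothetical column names
    storage_options={"anon": True},         # assumption: anonymous access works here
)
print(len(df))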


# Adjust byte_range for clipped dummy buffers
use_byte_range = byte_range
if byte_range and isinstance(filepath_or_buffer, BytesIO):
if byte_range[1] == filepath_or_buffer.getbuffer().nbytes:
use_byte_range = (0, byte_range[1])
rjzamora (Member, Author) commented:

As discussed above, this is where we reset byte_range to a zero offset when we are using a local dummy buffer. For example, with a clipped buffer, a request like byte_range=(74, 73) is passed down to libcudf as (0, 73).


if na_values is not None and is_scalar(na_values):
na_values = [na_values]

@@ -91,7 +99,7 @@ def read_csv(
true_values=true_values,
false_values=false_values,
nrows=nrows,
byte_range=byte_range,
byte_range=use_byte_range,
skip_blank_lines=skip_blank_lines,
parse_dates=parse_dates,
comment=comment,
200 changes: 162 additions & 38 deletions python/cudf/cudf/io/parquet.py
@@ -1,9 +1,11 @@
# Copyright (c) 2019-2020, NVIDIA CORPORATION.

import io
import warnings
from collections import defaultdict
from uuid import uuid4

import fsspec
from pyarrow import dataset as ds, parquet as pq

import cudf
@@ -160,6 +162,113 @@ def read_parquet_metadata(path):
return num_rows, num_row_groups, col_names


def _process_row_groups(paths, fs, filters=None, row_groups=None):

# Deal with case that the user passed in a directory name
file_list = paths
if len(paths) == 1 and ioutils.is_directory(paths[0]):
paths = ioutils.stringify_pathlike(paths[0])

# Convert filters to ds.Expression
if filters is not None:
filters = pq._filters_to_expression(filters)
Contributor commented:

Are we OK using a non-public arrow API here? Is there a public alternative, or is it worth vendoring?

rjzamora (Member, Author) replied:

Good question! This same issue came up in Dask a few times (where we are also using this "private" utility).
@jorisvandenbossche - Is this still the recommended way to translate filters?

Note that this particular line was not actually added in this PR, but it would be good to change it if there is a new "public" API.

Reply:

Although private, it's knowingly being used in several places (such as dask), so we won't be removing it without an alternative / deprecation, just as with a public API. So I think it is fine to use.
There is an issue about making this public (https://issues.apache.org/jira/browse/ARROW-9672); I can look into that for the coming release, but you will still need to keep using the private one for some time if you want to support older versions.

rjzamora (Member, Author) replied:

I appreciate the confirmation, Joris. I'll keep my eye on that issue in case something changes. Note that the ideal solution from the perspective of Dask and RAPIDS is for pyarrow to start recognizing filters specified as the DNF-like list of tuples :)
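For reference, a minimal sketch of the DNF-like list-of-tuples filter form discussed in this thread, and of the private pyarrow helper that translates it into a ds.Expression (the file path and filter values are hypothetical):

import pyarrow.dataset as ds
import pyarrow.parquet as pq

# DNF-like filters: a list of (column, op, value) tuples is an implicit AND;
# a list of such lists expresses an OR of ANDs.
filters = [("String", "==", "Omega"), ("Integer", ">", 0)]

# Private pyarrow helper (discussed above) that converts the DNF form
# into a ds.Expression usable by the dataset API.
expr = pq._filters_to_expression(filters)

# Hypothetical local dataset; a remote filesystem would be passed via filesystem=fs.
dataset = ds.dataset("data.parquet", format="parquet")
table = dataset.to_table(filter=expr)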


# Initialize ds.FilesystemDataset
dataset = ds.dataset(
paths, filesystem=fs, format="parquet", partitioning="hive",
)
file_list = dataset.files
if len(file_list) == 0:
raise FileNotFoundError(f"{paths} could not be resolved to any files")

if filters is not None:
# Load IDs of filtered row groups for each file in dataset
filtered_rg_ids = defaultdict(list)
for fragment in dataset.get_fragments(filter=filters):
for rg_fragment in fragment.split_by_row_group(filters):
for rg_info in rg_fragment.row_groups:
filtered_rg_ids[rg_fragment.path].append(rg_info.id)

# Initialize row_groups to be selected
if row_groups is None:
row_groups = [None for _ in dataset.files]

# Store IDs of selected row groups for each file
for i, file in enumerate(dataset.files):
if row_groups[i] is None:
row_groups[i] = filtered_rg_ids[file]
else:
row_groups[i] = filter(
lambda id: id in row_groups[i], filtered_rg_ids[file]
)

return file_list, row_groups
rjzamora (Member, Author) commented:

This PR copies the existing filtering logic into a dedicated helper function. The general purpose of this function is to (1) expand directory input into a list of paths (using the pyarrow dataset API), and (2) apply row-group filters. (A standalone sketch of this pattern appears after this thread.)

Main takeaway: this is just a reorganization of existing logic.

Contributor replied:

The general purpose of this function is to (1) expand directory input into a list of paths (using the pyarrow dataset API), and (2) apply row-group filters.

Perhaps this could be a helpful comment to add at the top of this function?
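As a companion to the comments above, here is a standalone sketch of the pattern _process_row_groups implements: expand the input paths with the pyarrow dataset API, then record the surviving row-group IDs per file. The paths and filters are hypothetical, and fs is assumed to be an fsspec-compatible filesystem (or None for local paths).

from collections import defaultdict

import pyarrow.dataset as ds
import pyarrow.parquet as pq

def select_row_groups(paths, fs=None, filters=None):
    # Expand directories/globs into a concrete list of parquet files
    dataset = ds.dataset(
        paths, filesystem=fs, format="parquet", partitioning="hive",
    )
    file_list = dataset.files
    if filters is None:
        return file_list, None

    # Translate DNF filters and collect the row-group IDs that survive them
    expr = pq._filters_to_expression(filters)
    filtered_rg_ids = defaultdict(list)
    for fragment in dataset.get_fragments(filter=expr):
        for rg_fragment in fragment.split_by_row_group(expr):
            for rg_info in rg_fragment.row_groups:
                filtered_rg_ids[rg_fragment.path].append(rg_info.id)

    return file_list, [filtered_rg_ids[path] for path in file_list]

# Hypothetical usage:
# files, row_groups = select_row_groups(
#     "s3://bucket/dataset/", fs=my_fs, filters=[("String", "==", "Omega")]
# )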



def _get_byte_ranges(file_list, row_groups, columns, fs):

if row_groups is None:
if columns is None:
return None, None, None # No reason to construct this
row_groups = [None for path in file_list]

# Construct a list of required byte-ranges for every file
all_byte_ranges, all_footers, all_sizes = [], [], []
for path, rgs in zip(file_list, row_groups):

# Step 0 - Get size of file
if fs is None:
file_size = path.size
else:
file_size = fs.size(path)

# Step 1 - Get 32 KB from tail of file.
#
# This "sample size" can be tunable, but should
# always be >= 8 bytes (so we can read the footer size)
tail_size = min(32_000, file_size)
if fs is None:
path.seek(file_size - tail_size)
footer_sample = path.read(tail_size)
else:
footer_sample = fs.tail(path, tail_size)

# Step 2 - Read the footer size and re-read a larger
# tail if necessary
footer_size = int.from_bytes(footer_sample[-8:-4], "little")
if tail_size < (footer_size + 8):
if fs is None:
path.seek(file_size - (footer_size + 8))
footer_sample = path.read(footer_size + 8)
else:
footer_sample = fs.tail(path, footer_size + 8)

# Step 3 - Collect required byte ranges
byte_ranges = []
md = pq.ParquetFile(io.BytesIO(footer_sample)).metadata
for r in range(md.num_row_groups):
# Skip this row-group if we are targeting
# specific row-groups
if rgs is None or r in rgs:
row_group = md.row_group(r)
for c in range(row_group.num_columns):
column = row_group.column(c)
name = column.path_in_schema
# Skip this column if we are targeting
# specific columns
if columns is None or name in columns:
file_offset0 = column.dictionary_page_offset
if file_offset0 is None:
file_offset0 = column.data_page_offset
num_bytes = column.total_uncompressed_size
byte_ranges.append((file_offset0, num_bytes))

all_byte_ranges.append(byte_ranges)
all_footers.append(footer_sample)
all_sizes.append(file_size)
return all_byte_ranges, all_footers, all_sizes
rjzamora (Member, Author) commented:

The _get_byte_ranges utility corresponds to new logic. This is where we collect a footer-metadata sample from each parquet file, and then use that metadata to define the exact byte ranges that will be needed to read the target column-chunks from the file. This utility is only used for remote storage. The calculated byte-range information is used within cudf.io.ioutils.get_filepath_or_buffer (which uses _fsspec_data_transfer to convert non-local fsspec file objects into local byte buffers). A footer-parsing sketch follows this thread.

Contributor replied:

Ditto as above -- this is super helpful and could perhaps be useful in the code as a comment!
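To make the footer arithmetic in _get_byte_ranges easier to follow, here is a minimal local-file sketch of the same logic (assumptions: a well-formed parquet file on local disk, and the same 32 KB tail-sample heuristic used above). A parquet file ends with a 4-byte little-endian footer length followed by the b"PAR1" magic.

import io
import os

import pyarrow.parquet as pq

def read_footer_metadata(path, sample_size=32_000):
    """Read only the parquet footer metadata by sampling the file tail."""
    file_size = os.path.getsize(path)
    tail_size = min(sample_size, file_size)

    with open(path, "rb") as f:
        f.seek(file_size - tail_size)
        tail = f.read(tail_size)

        # Layout of the file tail: <footer><footer length: 4 bytes LE><b"PAR1">
        assert tail[-4:] == b"PAR1", "not a parquet file"
        footer_size = int.from_bytes(tail[-8:-4], "little")

        # Re-read a larger tail if the first sample did not cover the footer
        if tail_size < footer_size + 8:
            f.seek(file_size - (footer_size + 8))
            tail = f.read(footer_size + 8)

    # pyarrow only needs the footer bytes (at the end of the buffer) to parse metadata
    return pq.ParquetFile(io.BytesIO(tail)).metadata

# Hypothetical usage:
# md = read_footer_metadata("part.0.parquet")
# print(md.num_row_groups, md.row_group(0).column(0).total_uncompressed_size)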



@ioutils.doc_read_parquet()
def read_parquet(
filepath_or_buffer,
@@ -189,18 +298,66 @@ def read_parquet(
elif not is_list_like(row_groups[0]):
row_groups = [row_groups]

# Check columns input
if columns is not None:
if not is_list_like(columns):
raise ValueError("Expected list like for columns")

# Start by trying to construct a filesystem object, so we
# can apply filters on remote file-systems
fs, paths = ioutils._get_filesystem_and_paths(filepath_or_buffer, **kwargs)
filepath_or_buffer = paths if paths else filepath_or_buffer
if fs is None and filters is not None:
raise ValueError("cudf cannot apply filters to open file objects.")

# Apply filters now (before converting non-local paths to buffers).
# Note that `_process_row_groups` will also expand `filepath_or_buffer`
# into a full list of files if it is a directory.
if fs is not None:
filepath_or_buffer, row_groups = _process_row_groups(
filepath_or_buffer, fs, filters=filters, row_groups=row_groups,
)

# Check if we should calculate the specific byte-ranges
# needed for each parquet file. We always do this when we
# have a file-system object to work with and it is not a
# local filesystem object. We can also do it without a
# file-system object for `AbstractBufferedFile` buffers
byte_ranges, footers, file_sizes = None, None, None
need_byte_ranges = fs is not None and not ioutils._is_local_filesystem(fs)
if need_byte_ranges or (
filepath_or_buffer
and isinstance(
filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile,
)
):
byte_ranges, footers, file_sizes = _get_byte_ranges(
filepath_or_buffer, row_groups, columns, fs,
)

filepaths_or_buffers = []
for source in filepath_or_buffer:
for i, source in enumerate(filepath_or_buffer):

if ioutils.is_directory(source, **kwargs):
fs = ioutils._ensure_filesystem(
passed_filesystem=None, path=source
# Note: For now, we know `fs` is an fsspec filesystem
# object, but it may be an arrow object in the future
fsspec_fs = ioutils._ensure_filesystem(
passed_filesystem=fs, path=source
)
source = ioutils.stringify_pathlike(source)
source = fs.sep.join([source, "*.parquet"])
source = fsspec_fs.sep.join([source, "*.parquet"])

tmp_source, compression = ioutils.get_filepath_or_buffer(
path_or_data=source, compression=None, **kwargs,
path_or_data=source,
compression=None,
fs=fs,
byte_ranges=byte_ranges[i] if byte_ranges else None,
footer=footers[i] if footers else None,
file_size=file_sizes[i] if file_sizes else None,
add_par1_magic=True,
**kwargs,
)

if compression is not None:
raise ValueError(
"URL content-encoding decompression is not supported"
@@ -210,39 +367,6 @@ def read_parquet(
else:
filepaths_or_buffers.append(tmp_source)

if columns is not None:
if not is_list_like(columns):
raise ValueError("Expected list like for columns")

if filters is not None:
# Convert filters to ds.Expression
filters = pq._filters_to_expression(filters)

# Initialize ds.FilesystemDataset
dataset = ds.dataset(
filepaths_or_buffers, format="parquet", partitioning="hive"
)

# Load IDs of filtered row groups for each file in dataset
filtered_rg_ids = defaultdict(list)
for fragment in dataset.get_fragments(filter=filters):
for rg_fragment in fragment.split_by_row_group(filters):
for rg_info in rg_fragment.row_groups:
filtered_rg_ids[rg_fragment.path].append(rg_info.id)

# Initialize row_groups to be selected
if row_groups is None:
row_groups = [None for _ in dataset.files]

# Store IDs of selected row groups for each file
for i, file in enumerate(dataset.files):
if row_groups[i] is None:
row_groups[i] = filtered_rg_ids[file]
else:
row_groups[i] = filter(
lambda id: id in row_groups[i], filtered_rg_ids[file]
)

if engine == "cudf":
return libparquet.read_parquet(
filepaths_or_buffers,
6 changes: 5 additions & 1 deletion python/cudf/cudf/tests/test_gcs.py
@@ -34,10 +34,14 @@ def test_read_csv(pdf, monkeypatch):
fpath = TEST_BUCKET + "test_csv_reader.csv"
buffer = pdf.to_csv(index=False)

def mock_open(*args):
def mock_open(*args, **kwargs):
return io.BytesIO(buffer.encode())

def mock_size(*args):
return len(buffer.encode())

monkeypatch.setattr(gcsfs.core.GCSFileSystem, "open", mock_open)
monkeypatch.setattr(gcsfs.core.GCSFileSystem, "size", mock_size)
got = cudf.read_csv("gcs://{}".format(fpath))

assert_eq(pdf, got)
56 changes: 51 additions & 5 deletions python/cudf/cudf/tests/test_s3.py
@@ -122,19 +122,41 @@ def pdf(scope="module"):
return df


def test_read_csv(s3_base, s3so, pdf):
@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
def test_read_csv(s3_base, s3so, pdf, bytes_per_thread):
# Write to buffer
fname = "test_csv_reader.csv"
bname = "csv"
buffer = pdf.to_csv(index=False)
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
got = cudf.read_csv(
"s3://{}/{}".format(bname, fname), storage_options=s3so
"s3://{}/{}".format(bname, fname),
storage_options=s3so,
bytes_per_thread=bytes_per_thread,
)

assert_eq(pdf, got)


@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
def test_read_csv_byte_range(s3_base, s3so, pdf, bytes_per_thread):
# Write to buffer
fname = "test_csv_reader_byte_range.csv"
bname = "csv"
buffer = pdf.to_csv(index=False)
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
got = cudf.read_csv(
"s3://{}/{}".format(bname, fname),
storage_options=s3so,
byte_range=(74, 73),
bytes_per_thread=bytes_per_thread,
header=False,
names=["Integer", "Float", "Integer2", "String", "Boolean"],
)

assert_eq(pdf.iloc[-2:].reset_index(drop=True), got)


@pytest.mark.parametrize("chunksize", [None, 3])
def test_write_csv(s3_base, s3so, pdf, chunksize):
# Write to buffer
@@ -156,18 +178,42 @@ def test_write_csv(s3_base, s3so, pdf, chunksize):
assert_eq(pdf, got)


def test_read_parquet(s3_base, s3so, pdf):
@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
@pytest.mark.parametrize("columns", [None, ["Float", "String"]])
def test_read_parquet(s3_base, s3so, pdf, bytes_per_thread, columns):
fname = "test_parquet_reader.parquet"
bname = "parquet"
buffer = BytesIO()
pdf.to_parquet(path=buffer)
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
got = cudf.read_parquet(
"s3://{}/{}".format(bname, fname), storage_options=s3so
"s3://{}/{}".format(bname, fname),
storage_options=s3so,
bytes_per_thread=bytes_per_thread,
columns=columns,
)

assert_eq(pdf, got)
expect = pdf[columns] if columns else pdf
assert_eq(expect, got)


def test_read_parquet_filters(s3_base, s3so, pdf):
fname = "test_parquet_reader_filters.parquet"
bname = "parquet"
buffer = BytesIO()
pdf.to_parquet(path=buffer)
buffer.seek(0)
filters = [("String", "==", "Omega")]
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
got = cudf.read_parquet(
"s3://{}/{}".format(bname, fname),
storage_options=s3so,
filters=filters,
)

# All row-groups should be filtered out
assert_eq(pdf.iloc[:0], got.reset_index(drop=True))


def test_write_parquet(s3_base, s3so, pdf):