Use fsspec.parquet for improved read_parquet performance from remote storage (#9589)

**Important Note**: ~~Marking this as WIP until the `fsspec.parquet` module is available in a filesystem_spec release~~ (the `fsspec.parquet` module is now available)

This PR modifies `cudf.read_parquet` and `dask_cudf.read_parquet` to leverage the new `fsspec.parquet.open_parquet_file` function for optimized data transfer and caching from remote storage. The ~~long-term~~ goal is to remove the temporary data-transfer optimizations currently used in `cudf.read_parquet`.
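
For context, `fsspec.parquet.open_parquet_file` reads the parquet footer, works out which byte ranges the requested columns and row-groups need, and transfers only those ranges before handing back a file-like object. Below is a minimal sketch of calling the helper directly; the credentials are placeholders, the path and column mirror the performance example below, and `fsspec>=2021.11.0` is assumed.

```python
# Minimal sketch of the fsspec.parquet helper this PR builds on.
# The credentials are placeholders; the path and column follow the
# performance example below.
import cudf
import fsspec.parquet

with fsspec.parquet.open_parquet_file(
    "gs://my-bucket/criteo-parquet/day_0.parquet",
    columns=["I10"],          # transfer only the column chunks for "I10"
    row_groups=None,          # None -> all row-groups
    engine="pyarrow",         # parser used to read the footer metadata
    storage_options={"token": "anon"},  # placeholder credentials
) as f:
    # `f` behaves like a local file: the footer and the required
    # column-chunk byte ranges are already cached in memory.
    df = cudf.read_parquet(f, columns=["I10"])
```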

**Performance Motivation**:

```python
In [1]: import cudf, dask_cudf
   ...: path = [
   ...:     "gs://my-bucket/criteo-parquet/day_0.parquet",
   ...:     "gs://my-bucket/criteo-parquet/day_1.parquet",
   ...: ]

# cudf BEFORE
In [2]: %time df = cudf.read_parquet(path, columns=["I10"], storage_options=…)
CPU times: user 11.1 s, sys: 11.5 s, total: 22.6 s
Wall time: 24.4 s

# cudf AFTER
In [2]: %time df = cudf.read_parquet(path, columns=["I10"], storage_options=…)
CPU times: user 3.48 s, sys: 722 ms, total: 4.2 s
Wall time: 6.32 s

# (Threaded) Dask-cudf BEFORE
In [2]: %time df = dask_cudf.read_parquet(path, columns=["I10"], storage_options=…).compute()
CPU times: user 27.1 s, sys: 15.5 s, total: 42.6 s
Wall time: 57.6 s

# (Threaded) Dask-cudf AFTER
In [2]: %time df = dask_cudf.read_parquet(path, columns=["I10"], storage_options=…).compute()
CPU times: user 3.43 s, sys: 851 ms, total: 4.28 s
Wall time: 13.1 s
```
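
With this change `use_python_file_object=True` becomes the default for `cudf.read_parquet`, and the pre-caching behaviour can be tuned through the new `open_file_options` argument. The sketch below shows how the options introduced in this diff could be passed from user code; the path and credentials are placeholders.

```python
import cudf

# The path and credentials are placeholders; the option names
# ("precache_options", "method", "engine") are the ones added in
# this PR via `_default_open_file_options`.
df = cudf.read_parquet(
    "gs://my-bucket/criteo-parquet/day_0.parquet",
    columns=["I10"],
    storage_options={"token": "anon"},
    use_python_file_object=True,  # new default in this PR
    open_file_options={
        # "method": "parquet" enables fsspec.parquet pre-caching
        # (the default); "method": None falls back to a plain fsspec open.
        "precache_options": {"method": "parquet", "engine": "pyarrow"},
    },
)
```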

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - https://github.com/brandon-b-miller
  - Benjamin Zaitlen (https://github.com/quasiben)

URL: #9589
rjzamora authored Jan 20, 2022
1 parent 12a0f59 commit 09035d6
Showing 7 changed files with 257 additions and 221 deletions.
7 changes: 0 additions & 7 deletions python/cudf/cudf/io/csv.py
@@ -59,17 +59,10 @@ def read_csv(
"`read_csv` does not yet support reading multiple files"
)

# Only need to pass byte_ranges to get_filepath_or_buffer
# if `use_python_file_object=False`
byte_ranges = None
if not use_python_file_object and byte_range:
byte_ranges = [byte_range]

filepath_or_buffer, compression = ioutils.get_filepath_or_buffer(
path_or_data=filepath_or_buffer,
compression=compression,
iotypes=(BytesIO, StringIO, NativeFile),
byte_ranges=byte_ranges,
use_python_file_object=use_python_file_object,
**kwargs,
)
181 changes: 55 additions & 126 deletions python/cudf/cudf/io/parquet.py
@@ -1,14 +1,11 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.

import io
import json
import warnings
from collections import defaultdict
from contextlib import ExitStack
from typing import Dict, List, Tuple
from uuid import uuid4

import fsspec
import numpy as np
from pyarrow import dataset as ds, parquet as pq

@@ -310,103 +307,6 @@ def _process_dataset(
)


def _get_byte_ranges(file_list, row_groups, columns, fs, **kwargs):

# This utility is used to collect the footer metadata
# from a parquet file. This metadata is used to define
# the exact byte-ranges that will be needed to read the
# target column-chunks from the file.
#
# This utility is only used for remote storage.
#
# The calculated byte-range information is used within
# cudf.io.ioutils.get_filepath_or_buffer (which uses
# _fsspec_data_transfer to convert non-local fsspec file
# objects into local byte buffers).

if row_groups is None:
if columns is None:
return None, None, None # No reason to construct this
row_groups = [None for path in file_list]

# Construct a list of required byte-ranges for every file
all_byte_ranges, all_footers, all_sizes = [], [], []
for path, rgs in zip(file_list, row_groups):

# Step 0 - Get size of file
if fs is None:
file_size = path.size
else:
file_size = fs.size(path)

# Step 1 - Get 32 KB from tail of file.
#
# This "sample size" can be tunable, but should
# always be >= 8 bytes (so we can read the footer size)
tail_size = min(kwargs.get("footer_sample_size", 32_000), file_size,)
if fs is None:
path.seek(file_size - tail_size)
footer_sample = path.read(tail_size)
else:
footer_sample = fs.tail(path, tail_size)

# Step 2 - Read the footer size and re-read a larger
# tail if necessary
footer_size = int.from_bytes(footer_sample[-8:-4], "little")
if tail_size < (footer_size + 8):
if fs is None:
path.seek(file_size - (footer_size + 8))
footer_sample = path.read(footer_size + 8)
else:
footer_sample = fs.tail(path, footer_size + 8)

# Step 3 - Collect required byte ranges
byte_ranges = []
md = pq.ParquetFile(io.BytesIO(footer_sample)).metadata
column_set = None if columns is None else set(columns)
if column_set is not None:
schema = md.schema.to_arrow_schema()
has_pandas_metadata = (
schema.metadata is not None and b"pandas" in schema.metadata
)
if has_pandas_metadata:
md_index = [
ind
for ind in json.loads(
schema.metadata[b"pandas"].decode("utf8")
).get("index_columns", [])
# Ignore RangeIndex information
if not isinstance(ind, dict)
]
column_set |= set(md_index)
for r in range(md.num_row_groups):
# Skip this row-group if we are targeting
# specific row-groups
if rgs is None or r in rgs:
row_group = md.row_group(r)
for c in range(row_group.num_columns):
column = row_group.column(c)
name = column.path_in_schema
# Skip this column if we are targeting
# specific columns
split_name = name.split(".")[0]
if (
column_set is None
or name in column_set
or split_name in column_set
):
file_offset0 = column.dictionary_page_offset
if file_offset0 is None:
file_offset0 = column.data_page_offset
num_bytes = column.total_compressed_size
byte_ranges.append((file_offset0, num_bytes))

all_byte_ranges.append(byte_ranges)
all_footers.append(footer_sample)
all_sizes.append(file_size)
return all_byte_ranges, all_footers, all_sizes


@ioutils.doc_read_parquet()
def read_parquet(
filepath_or_buffer,
@@ -418,13 +318,24 @@ def read_parquet(
num_rows=None,
strings_to_categorical=False,
use_pandas_metadata=True,
use_python_file_object=False,
use_python_file_object=True,
categorical_partitions=True,
open_file_options=None,
*args,
**kwargs,
):
"""{docstring}"""

# Do not allow the user to set file-opening options
# when `use_python_file_object=False` is specified
if use_python_file_object is False:
if open_file_options:
raise ValueError(
"open_file_options is not currently supported when "
"use_python_file_object is set to False."
)
open_file_options = {}

# Multiple sources are passed as a list. If a single source is passed,
# wrap it in a list for unified processing downstream.
if not is_list_like(filepath_or_buffer):
@@ -470,38 +381,18 @@
raise ValueError("cudf cannot apply filters to open file objects.")
filepath_or_buffer = paths if paths else filepath_or_buffer

# Check if we should calculate the specific byte-ranges
# needed for each parquet file. We always do this when we
# have a file-system object to work with and it is not a
# local filesystem object. We can also do it without a
# file-system object for `AbstractBufferedFile` buffers
byte_ranges, footers, file_sizes = None, None, None
if not use_python_file_object:
need_byte_ranges = fs is not None and not ioutils._is_local_filesystem(
fs
)
if need_byte_ranges or (
filepath_or_buffer
and isinstance(
filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile,
)
):
byte_ranges, footers, file_sizes = _get_byte_ranges(
filepath_or_buffer, row_groups, columns, fs, **kwargs
)

filepaths_or_buffers = []
if use_python_file_object:
open_file_options = _default_open_file_options(
open_file_options, columns, row_groups, fs=fs,
)
for i, source in enumerate(filepath_or_buffer):

tmp_source, compression = ioutils.get_filepath_or_buffer(
path_or_data=source,
compression=None,
fs=fs,
byte_ranges=byte_ranges[i] if byte_ranges else None,
footer=footers[i] if footers else None,
file_size=file_sizes[i] if file_sizes else None,
add_par1_magic=True,
use_python_file_object=use_python_file_object,
open_file_options=open_file_options,
**kwargs,
)

@@ -953,3 +844,41 @@ def __enter__(self):

def __exit__(self, *args):
self.close()


def _default_open_file_options(
open_file_options, columns, row_groups, fs=None
):
"""
Set default fields in open_file_options.

Copies and updates `open_file_options` to
include column and row-group information
under the "precache_options" key. By default,
we set "method" to "parquet", but precaching
will be disabled if the user chooses `method=None`.

Parameters
----------
open_file_options : dict or None
columns : list
row_groups : list
fs : fsspec.AbstractFileSystem, Optional
"""
if fs and ioutils._is_local_filesystem(fs):
# Quick return for local fs
return open_file_options or {}
# Assume remote storage if `fs` was not specified
open_file_options = (open_file_options or {}).copy()
precache_options = open_file_options.pop("precache_options", {}).copy()
if precache_options.get("method", "parquet") == "parquet":
precache_options.update(
{
"method": "parquet",
"engine": precache_options.get("engine", "pyarrow"),
"columns": columns,
"row_groups": row_groups,
}
)
open_file_options["precache_options"] = precache_options
return open_file_options
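
For reference, a hedged illustration of what `_default_open_file_options` returns for remote storage, following the defaults added above (the column list is a placeholder):

```python
# Illustration only: mirrors the defaults set by
# _default_open_file_options for a non-local filesystem.
from cudf.io.parquet import _default_open_file_options

opts = _default_open_file_options(
    open_file_options=None,
    columns=["I10"],   # placeholder column selection
    row_groups=None,
    fs=None,           # fs=None is treated as remote storage here
)
# Expected result:
# {
#     "precache_options": {
#         "method": "parquet",
#         "engine": "pyarrow",
#         "columns": ["I10"],
#         "row_groups": None,
#     }
# }
```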
13 changes: 10 additions & 3 deletions python/cudf/cudf/tests/test_parquet.py
@@ -748,19 +748,26 @@ def test_parquet_reader_arrow_nativefile(parquet_path_or_buf):
assert_eq(expect, got)


def test_parquet_reader_use_python_file_object(parquet_path_or_buf):
@pytest.mark.parametrize("use_python_file_object", [True, False])
def test_parquet_reader_use_python_file_object(
parquet_path_or_buf, use_python_file_object
):
# Check that reading with either `use_python_file_object=True`
# or `use_python_file_object=False` works as expected
expect = cudf.read_parquet(parquet_path_or_buf("filepath"))
fs, _, paths = get_fs_token_paths(parquet_path_or_buf("filepath"))

# Pass open fsspec file
with fs.open(paths[0], mode="rb") as fil:
got1 = cudf.read_parquet(fil, use_python_file_object=True)
got1 = cudf.read_parquet(
fil, use_python_file_object=use_python_file_object
)
assert_eq(expect, got1)

# Pass path only
got2 = cudf.read_parquet(paths[0], use_python_file_object=True)
got2 = cudf.read_parquet(
paths[0], use_python_file_object=use_python_file_object
)
assert_eq(expect, got2)


43 changes: 25 additions & 18 deletions python/cudf/cudf/tests/test_s3.py
@@ -131,6 +131,9 @@ def pdf_ext(scope="module"):
df["Integer"] = np.array([i for i in range(size)])
df["List"] = [[i] for i in range(size)]
df["Struct"] = [{"a": i} for i in range(size)]
df["String"] = (["Alpha", "Beta", "Gamma", "Delta"] * (-(size // -4)))[
:size
]
return df


@@ -225,9 +228,16 @@ def test_write_csv(s3_base, s3so, pdf, chunksize):

@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
@pytest.mark.parametrize("columns", [None, ["Float", "String"]])
@pytest.mark.parametrize("use_python_file_object", [False, True])
@pytest.mark.parametrize("precache", [None, "parquet"])
@pytest.mark.parametrize("use_python_file_object", [True, False])
def test_read_parquet(
s3_base, s3so, pdf, bytes_per_thread, columns, use_python_file_object
s3_base,
s3so,
pdf,
bytes_per_thread,
columns,
precache,
use_python_file_object,
):
fname = "test_parquet_reader.parquet"
bname = "parquet"
@@ -239,10 +249,15 @@ def test_read_parquet(
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
got1 = cudf.read_parquet(
"s3://{}/{}".format(bname, fname),
use_python_file_object=use_python_file_object,
open_file_options=(
{"precache_options": {"method": precache}}
if use_python_file_object
else None
),
storage_options=s3so,
bytes_per_thread=bytes_per_thread,
columns=columns,
use_python_file_object=use_python_file_object,
)
expect = pdf[columns] if columns else pdf
assert_eq(expect, got1)
@@ -256,25 +271,18 @@
with fs.open("s3://{}/{}".format(bname, fname), mode="rb") as f:
got2 = cudf.read_parquet(
f,
use_python_file_object=use_python_file_object,
bytes_per_thread=bytes_per_thread,
columns=columns,
use_python_file_object=use_python_file_object,
)
assert_eq(expect, got2)


@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
@pytest.mark.parametrize("columns", [None, ["List", "Struct"]])
@pytest.mark.parametrize("use_python_file_object", [False, True])
@pytest.mark.parametrize("index", [None, "Integer"])
def test_read_parquet_ext(
s3_base,
s3so,
pdf_ext,
bytes_per_thread,
columns,
use_python_file_object,
index,
s3_base, s3so, pdf_ext, bytes_per_thread, columns, index,
):
fname = "test_parquet_reader_ext.parquet"
bname = "parquet"
@@ -290,7 +298,6 @@ def test_read_parquet_ext(
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
got1 = cudf.read_parquet(
"s3://{}/{}".format(bname, fname),
use_python_file_object=use_python_file_object,
storage_options=s3so,
bytes_per_thread=bytes_per_thread,
footer_sample_size=3200,
@@ -326,24 +333,24 @@ def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns):
assert_eq(expect, got)


@pytest.mark.parametrize("python_file", [True, False])
def test_read_parquet_filters(s3_base, s3so, pdf, python_file):
@pytest.mark.parametrize("precache", [None, "parquet"])
def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache):
fname = "test_parquet_reader_filters.parquet"
bname = "parquet"
buffer = BytesIO()
pdf.to_parquet(path=buffer)
pdf_ext.to_parquet(path=buffer)
buffer.seek(0)
filters = [("String", "==", "Omega")]
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
got = cudf.read_parquet(
"s3://{}/{}".format(bname, fname),
storage_options=s3so,
filters=filters,
use_python_file_object=python_file,
open_file_options={"precache_options": {"method": precache}},
)

# All row-groups should be filtered out
assert_eq(pdf.iloc[:0], got.reset_index(drop=True))
assert_eq(pdf_ext.iloc[:0], got.reset_index(drop=True))


@pytest.mark.parametrize("partition_cols", [None, ["String"]])