Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use fsspec.parquet for improved read_parquet performance from remote storage #9589

Merged
merged 31 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0d542d9
use fsspec.parquet.open_parquet_file
rjzamora Nov 3, 2021
acf3d08
fix use_python_file_object default
rjzamora Nov 4, 2021
6d29bf6
Hopefully address 9599
rjzamora Nov 4, 2021
62fd911
Merge remote-tracking branch 'upstream/branch-21.12' into use-fsspec-…
rjzamora Nov 8, 2021
fd5f594
update s3 test
rjzamora Nov 8, 2021
6ed95d8
remove use_python_file_object default for now
rjzamora Nov 8, 2021
69b87fd
Merge remote-tracking branch 'upstream/branch-21.12' into use-fsspec-…
rjzamora Nov 9, 2021
be013aa
use new use_fsspec_parquet argument to turn on experimental fsspec.pa…
rjzamora Nov 9, 2021
8eb23be
consolidate new logic in ioutils
rjzamora Nov 9, 2021
9ccaf96
fix bugs in default remote-storage optimization
rjzamora Nov 9, 2021
f502671
Merge branch 'branch-22.02' into use-fsspec-parquet
rjzamora Nov 29, 2021
7260617
update defaults to reflect latest fsspec features
rjzamora Nov 30, 2021
c992d0d
fix bug in _handle_fsspec_parquet
rjzamora Nov 30, 2021
c1f541f
Merge remote-tracking branch 'upstream/branch-22.02' into use-fsspec-…
rjzamora Dec 9, 2021
7b5ae53
align changes with dask#8339 and remove unnecessary use_python_file_o…
rjzamora Dec 9, 2021
b9bd156
remove excessive use_python_file_object removal
rjzamora Dec 10, 2021
5747df0
revise api to make other format-specific optimizations easier (I hope)
rjzamora Dec 10, 2021
39cc97f
fix typos
rjzamora Dec 10, 2021
2cc389c
fix row-group bug
rjzamora Dec 10, 2021
05caadf
add open_file_cb test coverage
rjzamora Dec 14, 2021
83b3862
Merge remote-tracking branch 'upstream/branch-22.02' into use-fsspec-…
rjzamora Jan 10, 2022
6e22f74
align new API with latest Dask design
rjzamora Jan 12, 2022
fe319a9
tweak comment to trigger formatting
rjzamora Jan 12, 2022
435e432
reformat ioutils.py
rjzamora Jan 12, 2022
84ba4b3
Merge remote-tracking branch 'upstream/branch-22.02' into use-fsspec-…
rjzamora Jan 14, 2022
c9cfe94
allow use_python_file_object=False, but fall back on old/slow behavior
rjzamora Jan 18, 2022
f300d01
Merge remote-tracking branch 'upstream/branch-22.02' into use-fsspec-…
rjzamora Jan 18, 2022
739c2d4
Merge remote-tracking branch 'upstream/branch-22.02' into use-fsspec-…
rjzamora Jan 19, 2022
895e71e
address some code review
rjzamora Jan 19, 2022
d527873
more code-review changes
rjzamora Jan 19, 2022
51ebe83
add missing word to warning message
rjzamora Jan 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions python/cudf/cudf/io/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,10 @@ def read_csv(
"`read_csv` does not yet support reading multiple files"
)

# Only need to pass byte_ranges to get_filepath_or_buffer
# if `use_python_file_object=False`
byte_ranges = None
if not use_python_file_object and byte_range:
byte_ranges = [byte_range]
rjzamora marked this conversation as resolved.
Show resolved Hide resolved

filepath_or_buffer, compression = ioutils.get_filepath_or_buffer(
path_or_data=filepath_or_buffer,
compression=compression,
iotypes=(BytesIO, StringIO, NativeFile),
byte_ranges=byte_ranges,
use_python_file_object=use_python_file_object,
**kwargs,
)
Expand Down
181 changes: 55 additions & 126 deletions python/cudf/cudf/io/parquet.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.

import io
import json
import warnings
from collections import defaultdict
from contextlib import ExitStack
from typing import Dict, List, Tuple
from uuid import uuid4

import fsspec
import numpy as np
from pyarrow import dataset as ds, parquet as pq

Expand Down Expand Up @@ -310,103 +307,6 @@ def _process_dataset(
)


def _get_byte_ranges(file_list, row_groups, columns, fs, **kwargs):
rjzamora marked this conversation as resolved.
Show resolved Hide resolved

# This utility is used to collect the footer metadata
# from a parquet file. This metadata is used to define
# the exact byte-ranges that will be needed to read the
# target column-chunks from the file.
#
# This utility is only used for remote storage.
#
# The calculated byte-range information is used within
# cudf.io.ioutils.get_filepath_or_buffer (which uses
# _fsspec_data_transfer to convert non-local fsspec file
# objects into local byte buffers).

if row_groups is None:
if columns is None:
return None, None, None # No reason to construct this
row_groups = [None for path in file_list]

# Construct a list of required byte-ranges for every file
all_byte_ranges, all_footers, all_sizes = [], [], []
for path, rgs in zip(file_list, row_groups):

# Step 0 - Get size of file
if fs is None:
file_size = path.size
else:
file_size = fs.size(path)

# Step 1 - Get 32 KB from tail of file.
#
# This "sample size" can be tunable, but should
# always be >= 8 bytes (so we can read the footer size)
tail_size = min(kwargs.get("footer_sample_size", 32_000), file_size,)
if fs is None:
path.seek(file_size - tail_size)
footer_sample = path.read(tail_size)
else:
footer_sample = fs.tail(path, tail_size)

# Step 2 - Read the footer size and re-read a larger
# tail if necessary
footer_size = int.from_bytes(footer_sample[-8:-4], "little")
if tail_size < (footer_size + 8):
if fs is None:
path.seek(file_size - (footer_size + 8))
footer_sample = path.read(footer_size + 8)
else:
footer_sample = fs.tail(path, footer_size + 8)

# Step 3 - Collect required byte ranges
byte_ranges = []
md = pq.ParquetFile(io.BytesIO(footer_sample)).metadata
column_set = None if columns is None else set(columns)
if column_set is not None:
schema = md.schema.to_arrow_schema()
has_pandas_metadata = (
schema.metadata is not None and b"pandas" in schema.metadata
)
if has_pandas_metadata:
md_index = [
ind
for ind in json.loads(
schema.metadata[b"pandas"].decode("utf8")
).get("index_columns", [])
# Ignore RangeIndex information
if not isinstance(ind, dict)
]
column_set |= set(md_index)
for r in range(md.num_row_groups):
# Skip this row-group if we are targetting
# specific row-groups
if rgs is None or r in rgs:
row_group = md.row_group(r)
for c in range(row_group.num_columns):
column = row_group.column(c)
name = column.path_in_schema
# Skip this column if we are targetting a
# specific columns
split_name = name.split(".")[0]
if (
column_set is None
or name in column_set
or split_name in column_set
):
file_offset0 = column.dictionary_page_offset
if file_offset0 is None:
file_offset0 = column.data_page_offset
num_bytes = column.total_compressed_size
byte_ranges.append((file_offset0, num_bytes))

all_byte_ranges.append(byte_ranges)
all_footers.append(footer_sample)
all_sizes.append(file_size)
return all_byte_ranges, all_footers, all_sizes


@ioutils.doc_read_parquet()
def read_parquet(
filepath_or_buffer,
Expand All @@ -418,13 +318,24 @@ def read_parquet(
num_rows=None,
strings_to_categorical=False,
use_pandas_metadata=True,
use_python_file_object=False,
use_python_file_object=True,
categorical_partitions=True,
open_file_options=None,
*args,
**kwargs,
):
"""{docstring}"""

# Do not allow the user to set file-opening options
# when `use_python_file_object=False` is specified
if use_python_file_object is False:
if open_file_options:
raise ValueError(
"open_file_options is not currently supported when "
"use_python_file_object is set to False."
)
open_file_options = {}

# Multiple sources are passed as a list. If a single source is passed,
# wrap it in a list for unified processing downstream.
if not is_list_like(filepath_or_buffer):
Expand Down Expand Up @@ -470,38 +381,18 @@ def read_parquet(
raise ValueError("cudf cannot apply filters to open file objects.")
filepath_or_buffer = paths if paths else filepath_or_buffer

# Check if we should calculate the specific byte-ranges
# needed for each parquet file. We always do this when we
# have a file-system object to work with and it is not a
# local filesystem object. We can also do it without a
# file-system object for `AbstractBufferedFile` buffers
byte_ranges, footers, file_sizes = None, None, None
if not use_python_file_object:
need_byte_ranges = fs is not None and not ioutils._is_local_filesystem(
fs
)
if need_byte_ranges or (
filepath_or_buffer
and isinstance(
filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile,
)
):
byte_ranges, footers, file_sizes = _get_byte_ranges(
filepath_or_buffer, row_groups, columns, fs, **kwargs
)
rjzamora marked this conversation as resolved.
Show resolved Hide resolved

filepaths_or_buffers = []
if use_python_file_object:
open_file_options = _default_open_file_options(
open_file_options, columns, row_groups, fs=fs,
)
for i, source in enumerate(filepath_or_buffer):

tmp_source, compression = ioutils.get_filepath_or_buffer(
path_or_data=source,
compression=None,
fs=fs,
byte_ranges=byte_ranges[i] if byte_ranges else None,
footer=footers[i] if footers else None,
file_size=file_sizes[i] if file_sizes else None,
add_par1_magic=True,
use_python_file_object=use_python_file_object,
open_file_options=open_file_options,
**kwargs,
)

Expand Down Expand Up @@ -953,3 +844,41 @@ def __enter__(self):

def __exit__(self, *args):
self.close()


def _default_open_file_options(
open_file_options, columns, row_groups, fs=None
):
"""
Set default fields in open_file_options.

Copies and updates `open_file_options` to
include column and row-group information
under the "precache_options" key. By default,
we set "method" to "parquet", but precaching
will be disabled if the user chooses `method=None`

Parameters
----------
open_file_options : dict or None
columns : list
row_groups : list
fs : fsspec.AbstractFileSystem, Optional
"""
if fs and ioutils._is_local_filesystem(fs):
# Quick return for local fs
return open_file_options or {}
# Assume remote storage if `fs` was not specified
open_file_options = (open_file_options or {}).copy()
precache_options = open_file_options.pop("precache_options", {}).copy()
if precache_options.get("method", "parquet") == "parquet":
precache_options.update(
{
"method": "parquet",
"engine": precache_options.get("engine", "pyarrow"),
"columns": columns,
"row_groups": row_groups,
}
)
open_file_options["precache_options"] = precache_options
return open_file_options
brandon-b-miller marked this conversation as resolved.
Show resolved Hide resolved
13 changes: 10 additions & 3 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,19 +748,26 @@ def test_parquet_reader_arrow_nativefile(parquet_path_or_buf):
assert_eq(expect, got)


def test_parquet_reader_use_python_file_object(parquet_path_or_buf):
@pytest.mark.parametrize("use_python_file_object", [True, False])
def test_parquet_reader_use_python_file_object(
parquet_path_or_buf, use_python_file_object
):
# Check that the non-default `use_python_file_object=True`
# option works as expected
expect = cudf.read_parquet(parquet_path_or_buf("filepath"))
fs, _, paths = get_fs_token_paths(parquet_path_or_buf("filepath"))

# Pass open fsspec file
with fs.open(paths[0], mode="rb") as fil:
got1 = cudf.read_parquet(fil, use_python_file_object=True)
got1 = cudf.read_parquet(
fil, use_python_file_object=use_python_file_object
)
assert_eq(expect, got1)

# Pass path only
got2 = cudf.read_parquet(paths[0], use_python_file_object=True)
got2 = cudf.read_parquet(
paths[0], use_python_file_object=use_python_file_object
)
assert_eq(expect, got2)


Expand Down
43 changes: 25 additions & 18 deletions python/cudf/cudf/tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ def pdf_ext(scope="module"):
df["Integer"] = np.array([i for i in range(size)])
df["List"] = [[i] for i in range(size)]
df["Struct"] = [{"a": i} for i in range(size)]
df["String"] = (["Alpha", "Beta", "Gamma", "Delta"] * (-(size // -4)))[
:size
]
return df


Expand Down Expand Up @@ -225,9 +228,16 @@ def test_write_csv(s3_base, s3so, pdf, chunksize):

@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
@pytest.mark.parametrize("columns", [None, ["Float", "String"]])
@pytest.mark.parametrize("use_python_file_object", [False, True])
@pytest.mark.parametrize("precache", [None, "parquet"])
@pytest.mark.parametrize("use_python_file_object", [True, False])
def test_read_parquet(
s3_base, s3so, pdf, bytes_per_thread, columns, use_python_file_object
s3_base,
s3so,
pdf,
bytes_per_thread,
columns,
precache,
use_python_file_object,
):
fname = "test_parquet_reader.parquet"
bname = "parquet"
Expand All @@ -239,10 +249,15 @@ def test_read_parquet(
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
got1 = cudf.read_parquet(
"s3://{}/{}".format(bname, fname),
use_python_file_object=use_python_file_object,
open_file_options=(
{"precache_options": {"method": precache}}
if use_python_file_object
else None
),
storage_options=s3so,
bytes_per_thread=bytes_per_thread,
columns=columns,
use_python_file_object=use_python_file_object,
)
expect = pdf[columns] if columns else pdf
assert_eq(expect, got1)
Expand All @@ -256,25 +271,18 @@ def test_read_parquet(
with fs.open("s3://{}/{}".format(bname, fname), mode="rb") as f:
got2 = cudf.read_parquet(
f,
use_python_file_object=use_python_file_object,
bytes_per_thread=bytes_per_thread,
columns=columns,
use_python_file_object=use_python_file_object,
)
assert_eq(expect, got2)


@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
@pytest.mark.parametrize("columns", [None, ["List", "Struct"]])
@pytest.mark.parametrize("use_python_file_object", [False, True])
@pytest.mark.parametrize("index", [None, "Integer"])
def test_read_parquet_ext(
s3_base,
s3so,
pdf_ext,
bytes_per_thread,
columns,
use_python_file_object,
index,
s3_base, s3so, pdf_ext, bytes_per_thread, columns, index,
):
fname = "test_parquet_reader_ext.parquet"
bname = "parquet"
Expand All @@ -290,7 +298,6 @@ def test_read_parquet_ext(
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
got1 = cudf.read_parquet(
"s3://{}/{}".format(bname, fname),
use_python_file_object=use_python_file_object,
storage_options=s3so,
bytes_per_thread=bytes_per_thread,
footer_sample_size=3200,
Expand Down Expand Up @@ -326,24 +333,24 @@ def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns):
assert_eq(expect, got)


@pytest.mark.parametrize("python_file", [True, False])
def test_read_parquet_filters(s3_base, s3so, pdf, python_file):
@pytest.mark.parametrize("precache", [None, "parquet"])
def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache):
fname = "test_parquet_reader_filters.parquet"
bname = "parquet"
buffer = BytesIO()
pdf.to_parquet(path=buffer)
pdf_ext.to_parquet(path=buffer)
buffer.seek(0)
filters = [("String", "==", "Omega")]
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
got = cudf.read_parquet(
"s3://{}/{}".format(bname, fname),
storage_options=s3so,
filters=filters,
use_python_file_object=python_file,
open_file_options={"precache_options": {"method": precache}},
)

# All row-groups should be filtered out
assert_eq(pdf.iloc[:0], got.reset_index(drop=True))
assert_eq(pdf_ext.iloc[:0], got.reset_index(drop=True))


@pytest.mark.parametrize("partition_cols", [None, ["String"]])
Expand Down
Loading