Merge pull request #5075 from rjzamora/opt-parquet
Add simple row-group-aggregating parquet reader
Keith Kraus authored May 7, 2020
2 parents 6383db8 + 8a46d97 commit 17ad431
Showing 4 changed files with 280 additions and 0 deletions.
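
For context, a minimal usage sketch of the new option (not part of the diff; the dataset path and index column name are placeholders), modeled on the test added below. Each output partition reads up to `row_groups_per_part` consecutive row groups from a single parquet file, and the resulting collection has unknown divisions.

import dask_cudf

# Read a parquet dataset directory (must contain a _metadata file), mapping
# every 3 row groups of each file to one dask_cudf partition.
ddf = dask_cudf.read_parquet(
    "/path/to/dataset",   # placeholder directory path
    row_groups_per_part=3,
    index="index",        # optional; column to use as the index
)
print(ddf.npartitions)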
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -145,6 +145,7 @@
- PR #5034 Use loc to apply boolmask to frame efficiently when constructing query result
- PR #5039 Make `annotate` picklable
- PR #5045 Remove call to `unique()` in concat when `axis=1`
- PR #5075 Add simple row-group aggregation mechanism in dask_cudf read_parquet
- PR #5084 Improve downcasting in `Series.label_encoding()` to reduce memory usage
- PR #5085 Print more precise numerical strings in unit tests
- PR #5028 Add Docker 19 support to local gpuci build
212 changes: 212 additions & 0 deletions python/dask_cudf/dask_cudf/io/opt_parquet.py
@@ -0,0 +1,212 @@
# Copyright (c) 2020, NVIDIA CORPORATION.

import warnings

import pyarrow.parquet as pq
from fsspec.core import get_fs_token_paths
from fsspec.utils import stringify_path

from dask.base import tokenize
from dask.dataframe.io.parquet.core import set_index_columns
from dask.dataframe.io.parquet.utils import (
    _normalize_index_columns,
    _parse_pandas_metadata,
)
from dask.dataframe.io.utils import _get_pyarrow_dtypes, _meta_from_dtypes

import cudf
from cudf.core.column import as_column

from dask_cudf import DataFrame

try:
    import ujson as json
except ImportError:
    import json


def _get_dataset_and_parts(data_path, fs, row_groups_per_part):
    parts = []
    dataset = pq.ParquetDataset(data_path, filesystem=fs)
    if dataset.metadata:
        # Use the global _metadata file to map row groups to files and
        # aggregate up to `row_groups_per_part` row groups per partition.
        fpath_last = None
        rgi = 0
        rg_list = []
        for rg in range(dataset.metadata.num_row_groups):

            fpath = dataset.metadata.row_group(rg).column(0).file_path

            if fpath_last and fpath_last != fpath:
                # New file - flush the current row-group list
                rgi = 0
                full_path = fs.sep.join([data_path, fpath_last])
                parts.append(tuple([full_path, rg_list]))
                rg_list = []
            elif len(rg_list) >= row_groups_per_part:
                # Aggregation limit reached - flush the current row-group list
                full_path = fs.sep.join([data_path, fpath_last])
                parts.append(tuple([full_path, rg_list]))
                rg_list = []

            if fpath is None:
                raise ValueError("_metadata file is missing file_path string.")

            fpath_last = fpath
            rg_list.append(rgi)
            rgi += 1
        if rg_list:
            # Flush the final row-group list
            full_path = fs.sep.join([data_path, fpath_last])
            parts.append(tuple([full_path, rg_list]))
    else:
        warnings.warn(
            "Must have metadata file to split by row group. "
            "Using full file for each partition."
        )
        for piece in dataset.pieces:
            parts.append(tuple([piece.path, None]))

    return dataset, parts


def _read_metadata(fs, path, row_groups_per_part, index=None):
    dataset, parts = _get_dataset_and_parts(path, fs, row_groups_per_part)
    if not dataset.metadata:
        raise ValueError("_metadata file is missing.")

    schema = dataset.metadata.schema.to_arrow_schema()
    columns = None
    has_pandas_metadata = (
        schema.metadata is not None and b"pandas" in schema.metadata
    )
    categories = None
    if has_pandas_metadata:
        pandas_metadata = json.loads(schema.metadata[b"pandas"].decode("utf8"))
        (
            index_names,
            column_names,
            storage_name_mapping,
            column_index_names,
        ) = _parse_pandas_metadata(pandas_metadata)
        categories = []
        for col in pandas_metadata["columns"]:
            if (col["pandas_type"] == "categorical") and (
                col["name"] not in categories
            ):
                categories.append(col["name"])
    else:
        index_names = []
        column_names = schema.names
        storage_name_mapping = {k: k for k in column_names}
        column_index_names = [None]

    if index is None and index_names:
        index = index_names

    column_names, index_names = _normalize_index_columns(
        columns, column_names, index, index_names
    )
    all_columns = index_names + column_names

    dtypes = _get_pyarrow_dtypes(schema, categories)
    dtypes = {storage_name_mapping.get(k, k): v for k, v in dtypes.items()}

    index_cols = index or ()
    meta = _meta_from_dtypes(
        all_columns, dtypes, index_cols, column_index_names
    )

    return meta, parts


def _read_partition(part, index, columns, strings_to_cats):
    # Read dataset part
    path, row_groups = part
    if columns is not None:
        columns = [c for c in columns]
    if isinstance(index, list):
        columns += index

    if row_groups:
        df = cudf.io.read_parquet(
            path,
            row_group=row_groups[0],
            row_group_count=len(row_groups),
            columns=columns,
            strings_to_cats=strings_to_cats,
        )
    else:
        df = cudf.io.read_parquet(
            path, columns=columns, strings_to_cats=strings_to_cats
        )

    if index and (index[0] in df.columns):
        df = df.set_index(index[0])
    return df


def parquet_reader(
    path,
    columns=None,
    row_groups_per_part=None,
    index=None,
    storage_options=None,
    **kwargs,
):

    name = "opt-read-parquet-" + tokenize(
        path, columns, index, storage_options, row_groups_per_part
    )

    if hasattr(path, "name"):
        path = stringify_path(path)
    fs, _, paths = get_fs_token_paths(
        path, mode="rb", storage_options=storage_options
    )
    if len(paths) > 1 or not fs.isdir(paths[0]):
        raise ValueError(
            "Must pass in a directory path to use `row_groups_per_part`."
        )

    auto_index_allowed = False
    if index is None:
        # User is allowing auto-detected index
        auto_index_allowed = True
    if index and isinstance(index, str):
        index = [index]

    dd_meta, parts = _read_metadata(fs, path, row_groups_per_part, index=index)
    strings_to_cats = kwargs.get("strings_to_categorical", False)
    meta = cudf.DataFrame(index=dd_meta.index)
    for col in dd_meta.columns:
        if dd_meta[col].dtype == "O":
            meta[col] = as_column(
                dd_meta[col], dtype="int32" if strings_to_cats else "object"
            )
        else:
            meta[col] = as_column(dd_meta[col])

    if meta.index.name is not None:
        index = meta.index.name

    # Account for index and columns arguments.
    # Modify `meta` dataframe accordingly.
    index_in_columns = False
    meta, index, columns = set_index_columns(
        meta, index, columns, index_in_columns, auto_index_allowed
    )

    dsk = {}
    for p, part in enumerate(parts):
        read_key = (name, p)
        dsk[read_key] = (
            _read_partition,
            part,
            index,
            columns,
            strings_to_cats,
        )

    # Set the index that was previously treated as a column
    if index_in_columns:
        meta = meta.set_index(index)

    divisions = [None] * (len(parts) + 1)
    return DataFrame(dsk, name, meta, divisions)
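
To illustrate the row-group aggregation performed by `_get_dataset_and_parts` above, here is a standalone sketch (a hypothetical helper, not part of this file): consecutive row groups within each file are chunked into lists of at most `row_groups_per_part`, and each (file, row-group list) pair becomes one partition.

def group_row_groups(files, row_groups_per_part):
    """Chunk each file's row-group indices into lists of at most
    `row_groups_per_part`; a new file always starts a new partition."""
    parts = []
    for path, num_row_groups in files.items():
        for start in range(0, num_row_groups, row_groups_per_part):
            stop = min(start + row_groups_per_part, num_row_groups)
            parts.append((path, list(range(start, stop))))
    return parts

# Two files with 10 row groups each, 3 row groups per partition
# -> [('part.0.parquet', [0, 1, 2]), ('part.0.parquet', [3, 4, 5]), ...]
# -> 4 partitions per file, 8 total.
print(group_row_groups({"part.0.parquet": 10, "part.1.parquet": 10}, 3))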
24 changes: 24 additions & 0 deletions python/dask_cudf/dask_cudf/io/parquet.py
@@ -1,3 +1,5 @@
# Copyright (c) 2019-2020, NVIDIA CORPORATION.

import warnings
from functools import partial

@@ -148,6 +150,7 @@ def read_parquet(
    chunksize=None,
    split_row_groups=True,
    gather_statistics=None,
    row_groups_per_part=None,
    **kwargs,
):
    """ Read parquet files into a Dask DataFrame
@@ -169,6 +172,27 @@ class to support full functionality.
"""
if isinstance(columns, str):
columns = [columns]

if row_groups_per_part:
from .opt_parquet import parquet_reader

warnings.warn(
"Using optimized read_parquet engine. This option does not "
"support partitioned datsets or filtering, and will not "
"result in known divisions. Do not use `row_groups_per_part` "
"if full support is needed."
)
if kwargs.get("filters", None):
raise ValueError(
"Cannot use `filters` with `row_groups_per_part=True`."
)
return parquet_reader(
path,
columns=columns,
row_groups_per_part=row_groups_per_part,
**kwargs,
)

if chunksize and gather_statistics is False:
warnings.warn(
"Setting chunksize parameter with gather_statistics=False. "
43 changes: 43 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -1,3 +1,6 @@
# Copyright (c) 2019-2020, NVIDIA CORPORATION.

import math
import os

import numpy as np
@@ -281,3 +284,43 @@ def test_chunksize(tmpdir, chunksize, metadata):
    remainder = (df_byte_size % parse_bytes(chunksize)) > 0
    expected += int(remainder) * nparts
    assert ddf2.npartitions == max(nparts, expected)


@pytest.mark.parametrize("row_groups", [1, 3, 10, 12])
@pytest.mark.parametrize("index", [False, True])
def test_row_groups_per_part(tmpdir, row_groups, index):
nparts = 2
df_size = 100
row_group_size = 5
file_row_groups = 10 # Known apriori
npartitions_expected = math.ceil(file_row_groups / row_groups) * 2

df = pd.DataFrame(
{
"a": np.random.choice(["apple", "banana", "carrot"], size=df_size),
"b": np.random.random(size=df_size),
"c": np.random.randint(1, 5, size=df_size),
"index": np.arange(0, df_size),
}
)
if index:
df = df.set_index("index")

ddf1 = dd.from_pandas(df, npartitions=nparts)
ddf1.to_parquet(
str(tmpdir),
engine="pyarrow",
row_group_size=row_group_size,
write_metadata_file=True,
write_index=index,
)

ddf2 = dask_cudf.read_parquet(
str(tmpdir),
row_groups_per_part=row_groups,
index="index" if index else False,
)

assert_eq(ddf1, ddf2, check_divisions=False, check_index=index)

assert ddf2.npartitions == npartitions_expected
