Add optimized read_parquet path for remote storage #1119

Merged Sep 22, 2021 · 38 commits · changes shown from all commits

Commits
72d43ca
updating parquet engine to be more maintainable
rjzamora Sep 1, 2021
20fe797
add test coverage for filters
rjzamora Sep 1, 2021
4a41d32
file-aggregation test coverage
rjzamora Sep 1, 2021
1988ac4
fix cpu= bug in tests
rjzamora Sep 1, 2021
6d5df67
comment tweak
rjzamora Sep 1, 2021
8b1f615
more comment tweaks
rjzamora Sep 1, 2021
7d2676b
Merge branch 'main' into update-parquet-engine
rjzamora Sep 1, 2021
49c9f25
protect GPUParquetEngine definition
rjzamora Sep 1, 2021
4193c06
Merge branch 'update-parquet-engine' of https://github.com/rjzamora/N…
rjzamora Sep 1, 2021
639ff09
fix s3fs issue
rjzamora Sep 1, 2021
7dd00ef
Merge branch 'main' into update-parquet-engine
rjzamora Sep 1, 2021
94cd9d1
fix num_rows bug
rjzamora Sep 1, 2021
1e8fc4e
add optimized read_parquet code path for remote storage
rjzamora Sep 16, 2021
a82538d
Merge remote-tracking branch 'upstream/main' into gcs-optimize
rjzamora Sep 16, 2021
08affa3
avoid client read in cudf
rjzamora Sep 17, 2021
0d81fd9
format check
rjzamora Sep 17, 2021
e2a4373
use delayed to avoid reading parquet data on the client process
rjzamora Sep 17, 2021
69cebd7
avoid calling compute unless a global dask client is detected
rjzamora Sep 17, 2021
f886efe
Merge branch 'main' into avoid-client-read
rjzamora Sep 17, 2021
de518b5
fix delayed _reset_df_index usage
rjzamora Sep 17, 2021
cc7180e
improve _reset_df_index readability/usage
rjzamora Sep 17, 2021
5333ebf
Merge remote-tracking branch 'upstream/main' into gcs-optimize
rjzamora Sep 17, 2021
fc66fc7
Merge branch 'avoid-client-read' into gcs-optimize
rjzamora Sep 17, 2021
0afe794
Merge branch 'main' into gcs-optimize
rjzamora Sep 17, 2021
dd5d931
Merge branch 'main' into gcs-optimize
rjzamora Sep 17, 2021
9738aa4
explicit buffer deletion
rjzamora Sep 17, 2021
5738578
Merge branch 'gcs-optimize' of https://github.com/rjzamora/NVTabular …
rjzamora Sep 17, 2021
d1dd81f
use same read for row-group sample and dtypes
rjzamora Sep 18, 2021
099de79
Merge remote-tracking branch 'upstream/main' into gcs-optimize
rjzamora Sep 20, 2021
596300c
clean up sample_dtypes optimization
rjzamora Sep 20, 2021
215cd95
Merge branch 'main' into gcs-optimize
rjzamora Sep 20, 2021
015c9d1
Update nvtabular/io/fsspec_utils.py
rjzamora Sep 20, 2021
75fee35
Merge remote-tracking branch 'upstream/main' into gcs-optimize
rjzamora Sep 21, 2021
90a614a
Merge branch 'main' into gcs-optimize
rjzamora Sep 21, 2021
fe1071e
Merge branch 'main' into gcs-optimize
benfred Sep 21, 2021
0d4e880
fix infer_schema regression
rjzamora Sep 22, 2021
1eeba17
Merge branch 'gcs-optimize' of https://github.com/rjzamora/NVTabular …
rjzamora Sep 22, 2021
413fcc4
remove py import and usage
rjzamora Sep 22, 2021
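
Taken together, the commits above add an optimized read_parquet code path for remote storage and route schema/dtype sampling through a single cached read. A minimal usage sketch of the kind of workflow this targets (the GCS path and part_size value are placeholders, not taken from this PR):

import nvtabular as nvt

# Placeholder remote dataset path; any fsspec-compatible URL (gs://, s3://)
# would exercise the remote-storage read path touched by this PR.
dataset = nvt.Dataset(
    "gs://my-bucket/data/*.parquet",  # hypothetical bucket/path
    engine="parquet",
    part_size="128MB",
)

# infer_schema now delegates to the cached sample_dtypes path
# (see the nvtabular/io/dataset.py diff below).
schema = dataset.infer_schema(n=1)
print(schema)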
58 changes: 26 additions & 32 deletions nvtabular/io/dataset.py
@@ -23,7 +23,6 @@
 
 import dask
 import numpy as np
-import py
 from dask.base import tokenize
 from dask.dataframe.core import new_dd_object
 from dask.highlevelgraph import HighLevelGraph
@@ -230,6 +229,9 @@ def __init__(
         self.client = client
         self.schema = schema
 
+        # Cache for "real" (sampled) metadata
+        self._real_meta = {}
+
         # Check if we are keeping data in cpu memory
         self.cpu = cpu
         if not self.cpu:
@@ -281,11 +283,7 @@ def __init__(
             part_size = int(device_mem_size(kind="total", cpu=self.cpu) * part_mem_fraction)
 
         # Engine-agnostic path handling
-        paths = path_or_source
-        if hasattr(paths, "name"):
-            paths = stringify_path(paths)
-        if isinstance(paths, (py._path.local.LocalPath, Path)):
-            paths = str(paths)
+        paths = stringify_path(path_or_source)
         if isinstance(paths, str):
             paths = [paths]
         paths = sorted(paths, key=natural_sort_key)
@@ -1107,21 +1105,7 @@ def infer_schema(self, n=1):
 
         dtypes = {}
         try:
-            _ddf = self.to_ddf()
-            dtypes = {
-                col_name: {"dtype": dtype, "is_list": False}
-                for col_name, dtype in _ddf.dtypes.items()
-            }
-            for partition_index in range(_ddf.npartitions):
-                _head = _ddf.partitions[partition_index].head(n)
-
-                if len(_head):
-                    for col in _head.columns:
-                        dtypes[col] = {
-                            "dtype": dispatch._list_val_dtype(_head[col]) or _head[col].dtype,
-                            "is_list": dispatch._is_list_dtype(_head[col]),
-                        }
-
+            dtypes = self.sample_dtypes(n=n, annotate_lists=True)
         except RuntimeError:
             warnings.warn(
                 "Unable to sample column dtypes to infer nvt.Dataset schema, schema is empty."
@@ -1136,20 +1120,30 @@
         self.schema = Schema(column_schemas)
         return self.schema
 
-    def sample_dtypes(self, n=1):
+    def sample_dtypes(self, n=1, annotate_lists=False):
         """Return the real dtypes of the Dataset
 
-        Sample the partitions of the underlying Dask collection
-        until a non-empty partition is found. Then, use the first
-        ``n`` rows of that partition to infer dtype info. If no
-        non-empty partitions are found, use the Dask dtypes.
+        Use cached metadata if this operation was
+        already performed. Otherwise, call down to the
+        underlying engine for sampling logic.
         """
-        _ddf = self.to_ddf()
-        for partition_index in range(_ddf.npartitions):
-            _head = _ddf.partitions[partition_index].head(n)
-            if len(_head):
-                return _head.dtypes
-        return _ddf.dtypes
+        if self._real_meta.get(n, None) is None:
+            _real_meta = self.engine.sample_data(n=n)
+            if self.dtypes:
+                _real_meta = _set_dtypes(_real_meta, self.dtypes)
+            self._real_meta[n] = _real_meta
+
+        if annotate_lists:
+            _real_meta = self._real_meta[n]
+            return {
+                col: {
+                    "dtype": dispatch._list_val_dtype(_real_meta[col]) or _real_meta[col].dtype,
+                    "is_list": dispatch._is_list_dtype(_real_meta[col]),
+                }
+                for col in _real_meta.columns
+            }
+
+        return self._real_meta[n].dtypes
 
     @classmethod
     def _bind_dd_method(cls, name):
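
As a usage note, a short sketch of how the reworked sample_dtypes behaves (assuming dataset is an existing nvt.Dataset; the example output is illustrative):

# First call samples real data via the engine and caches it per n;
# later calls with the same n reuse dataset._real_meta[n].
dtypes = dataset.sample_dtypes(n=1)

# infer_schema uses the annotated form, a dict keyed by column name:
annotated = dataset.sample_dtypes(n=1, annotate_lists=True)
# e.g. {"a": {"dtype": dtype("int64"), "is_list": False},
#      "b": {"dtype": dtype("int64"), "is_list": True}}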
15 changes: 15 additions & 0 deletions nvtabular/io/dataset_engine.py
@@ -57,3 +57,18 @@ def validate_dataset(self, **kwargs):
     @classmethod
     def regenerate_dataset(cls, dataset, output_path, columns=None, **kwargs):
         raise NotImplementedError(""" Regenerate a dataset with optimal properties """)
+
+    def sample_data(self, n=1):
+        """Return a sample of real data from the dataset
+
+        Sample the partitions of the underlying Dask collection
+        until a non-empty partition is found. Then, use the first
+        ``n`` rows of that partition to infer dtype info. If no
+        non-empty partitions are found, use the Dask metadata.
+        """
+        _ddf = self.to_ddf()
+        for partition_index in range(_ddf.npartitions):
+            _head = _ddf.partitions[partition_index].head(n)
+            if len(_head):
+                return _head
+        return _ddf._meta
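
The default sample_data implementation above simply scans partitions until one yields rows. A self-contained sketch of that pattern with plain dask.dataframe (the names and toy frame here are illustrative, not part of the engine API):

import dask.dataframe as dd
import pandas as pd


def sample_data(ddf, n=1):
    """Return the first non-empty n-row sample, else the empty _meta frame."""
    for partition_index in range(ddf.npartitions):
        head = ddf.partitions[partition_index].head(n)
        if len(head):
            return head
    # Every partition was empty: fall back to the (empty) metadata frame,
    # which still carries the correct column dtypes.
    return ddf._meta


ddf = dd.from_pandas(
    pd.DataFrame({"x": [1, 2, 3], "y": list("abc")}), npartitions=4
)
print(sample_data(ddf, n=2))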