Skip to content

Commit

Permalink
Add optimized read_parquet path for remote storage (#1119)
Browse files Browse the repository at this point in the history
  • Loading branch information
rjzamora authored Sep 22, 2021
1 parent 9b5393d commit ad939c1
Show file tree
Hide file tree
Showing 6 changed files with 432 additions and 64 deletions.
58 changes: 26 additions & 32 deletions nvtabular/io/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import dask
import numpy as np
import py
from dask.base import tokenize
from dask.dataframe.core import new_dd_object
from dask.highlevelgraph import HighLevelGraph
Expand Down Expand Up @@ -230,6 +229,9 @@ def __init__(
self.client = client
self.schema = schema

# Cache for "real" (sampled) metadata
self._real_meta = {}

# Check if we are keeping data in cpu memory
self.cpu = cpu
if not self.cpu:
Expand Down Expand Up @@ -281,11 +283,7 @@ def __init__(
part_size = int(device_mem_size(kind="total", cpu=self.cpu) * part_mem_fraction)

# Engine-agnostic path handling
paths = path_or_source
if hasattr(paths, "name"):
paths = stringify_path(paths)
if isinstance(paths, (py._path.local.LocalPath, Path)):
paths = str(paths)
paths = stringify_path(path_or_source)
if isinstance(paths, str):
paths = [paths]
paths = sorted(paths, key=natural_sort_key)
Expand Down Expand Up @@ -1107,21 +1105,7 @@ def infer_schema(self, n=1):

dtypes = {}
try:
_ddf = self.to_ddf()
dtypes = {
col_name: {"dtype": dtype, "is_list": False}
for col_name, dtype in _ddf.dtypes.items()
}
for partition_index in range(_ddf.npartitions):
_head = _ddf.partitions[partition_index].head(n)

if len(_head):
for col in _head.columns:
dtypes[col] = {
"dtype": dispatch._list_val_dtype(_head[col]) or _head[col].dtype,
"is_list": dispatch._is_list_dtype(_head[col]),
}

dtypes = self.sample_dtypes(n=n, annotate_lists=True)
except RuntimeError:
warnings.warn(
"Unable to sample column dtypes to infer nvt.Dataset schema, schema is empty."
Expand All @@ -1136,20 +1120,30 @@ def infer_schema(self, n=1):
self.schema = Schema(column_schemas)
return self.schema

def sample_dtypes(self, n=1):
def sample_dtypes(self, n=1, annotate_lists=False):
"""Return the real dtypes of the Dataset
Sample the partitions of the underlying Dask collection
until a non-empty partition is found. Then, use the first
``n`` rows of that partition to infer dtype info. If no
non-empty partitions are found, use the Dask dtypes.
Use cached metadata if this operation was
already performed. Otherwise, call down to the
underlying engine for sampling logic.
"""
_ddf = self.to_ddf()
for partition_index in range(_ddf.npartitions):
_head = _ddf.partitions[partition_index].head(n)
if len(_head):
return _head.dtypes
return _ddf.dtypes
if self._real_meta.get(n, None) is None:
_real_meta = self.engine.sample_data(n=n)
if self.dtypes:
_real_meta = _set_dtypes(_real_meta, self.dtypes)
self._real_meta[n] = _real_meta

if annotate_lists:
_real_meta = self._real_meta[n]
return {
col: {
"dtype": dispatch._list_val_dtype(_real_meta[col]) or _real_meta[col].dtype,
"is_list": dispatch._is_list_dtype(_real_meta[col]),
}
for col in _real_meta.columns
}

return self._real_meta[n].dtypes

@classmethod
def _bind_dd_method(cls, name):
Expand Down
15 changes: 15 additions & 0 deletions nvtabular/io/dataset_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,18 @@ def validate_dataset(self, **kwargs):
@classmethod
def regenerate_dataset(cls, dataset, output_path, columns=None, **kwargs):
raise NotImplementedError(""" Regenerate a dataset with optimal properties """)

def sample_data(self, n=1):
"""Return a sample of real data from the dataset
Sample the partitions of the underlying Dask collection
until a non-empty partition is found. Then, use the first
``n`` rows of that partition to infer dtype info. If no
non-empty partitions are found, use the Dask metadata.
"""
_ddf = self.to_ddf()
for partition_index in range(_ddf.npartitions):
_head = _ddf.partitions[partition_index].head(n)
if len(_head):
return _head
return _ddf._meta
Loading

0 comments on commit ad939c1

Please sign in to comment.