Skip to content

Commit

Permalink
FIX-#1144: Fix read_parquet for working with HDFS
Browse files (browse the repository at this point in the history)
Signed-off-by: Alexey Prutskov <[email protected]>
  • Loading branch information
prutskov committed Sep 23, 2020
1 parent 7c350dd commit 77150c3
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions modin/engines/base/io/column_stores/parquet_reader.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -21,18 +21,29 @@ class ParquetReader(ColumnStoreReader):
@classmethod
def _read(cls, path, engine, columns, **kwargs):
"""Load a parquet object from the file path, returning a Modin DataFrame.
Modin only supports pyarrow engine for now.
Args:
path: The filepath of the parquet file.
We only support local files for now.
engine: Modin only supports pyarrow reader.
This argument doesn't do anything for now.
kwargs: Pass into parquet's read_pandas function.
Modin only supports pyarrow engine for now.
Notes:
ParquetFile API is used. Please refer to the documentation here
https://arrow.apache.org/docs/python/parquet.html
Parameters
----------
path: str
The filepath of the parquet file in local filesystem or hdfs.
engine: 'pyarrow'
Parquet library to use
columns: list or None
If not None, only these columns will be read from the file.
kwargs: dict
Keyword arguments.
Returns
-------
PandasQueryCompiler
A new Query Compiler.
Notes
-----
ParquetFile API is used. Please refer to the documentation here
https://arrow.apache.org/docs/python/parquet.html
"""
from pyarrow.parquet import ParquetFile, ParquetDataset
from modin.pandas.io import PQ_INDEX_REGEX
Expand Down Expand Up @@ -66,6 +77,13 @@ def _read(cls, path, engine, columns, **kwargs):
pd = ParquetDataset(path)
meta = pd.metadata
column_names = pd.schema.names
elif isinstance(path, str) and path.startswith("hdfs://"):
import fsspec.core

fs, path = fsspec.core.url_to_fs(path)
pd = ParquetDataset(path, filesystem=fs)
meta = pd.metadata
column_names = pd.schema.names
else:
meta = ParquetFile(path).metadata
column_names = meta.schema.names
Expand Down

0 comments on commit 77150c3

Please sign in to comment.