FIX-#1144: Fix read_parquet for working with HDFS #2120

Merged: 1 commit, Sep 23, 2020

modin/engines/base/io/column_stores/parquet_reader.py (28 additions, 10 deletions)

@@ -21,18 +21,29 @@ class ParquetReader(ColumnStoreReader):
     @classmethod
     def _read(cls, path, engine, columns, **kwargs):
         """Load a parquet object from the file path, returning a Modin DataFrame.
-           Modin only supports pyarrow engine for now.

-        Args:
-            path: The filepath of the parquet file.
-                  We only support local files for now.
-            engine: Modin only supports pyarrow reader.
-                    This argument doesn't do anything for now.
-            kwargs: Pass into parquet's read_pandas function.
+        Modin only supports pyarrow engine for now.

-        Notes:
-            ParquetFile API is used. Please refer to the documentation here
-            https://arrow.apache.org/docs/python/parquet.html
+        Parameters
+        ----------
+        path: str
+            The filepath of the parquet file in local filesystem or hdfs.
+        engine: 'pyarrow'
+            Parquet library to use
+        columns: list or None
+            If not None, only these columns will be read from the file.
+        kwargs: dict
+            Keyword arguments.
+
+        Returns
+        -------
+        PandasQueryCompiler
+            A new Query Compiler.
+
+        Notes
+        -----
+        ParquetFile API is used. Please refer to the documentation here
+        https://arrow.apache.org/docs/python/parquet.html
         """
         from pyarrow.parquet import ParquetFile, ParquetDataset
         from modin.pandas.io import PQ_INDEX_REGEX
@@ -66,6 +77,13 @@ def _read(cls, path, engine, columns, **kwargs):
             pd = ParquetDataset(path)
             meta = pd.metadata
             column_names = pd.schema.names
+        elif isinstance(path, str) and path.startswith("hdfs://"):
+            import fsspec.core
+
+            fs, path = fsspec.core.url_to_fs(path)
+            pd = ParquetDataset(path, filesystem=fs)
+            meta = pd.metadata
+            column_names = pd.schema.names
         else:
             meta = ParquetFile(path).metadata
             column_names = meta.schema.names
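
For context on the Notes section in the new docstring: the reader builds on pyarrow's ParquetFile metadata API to discover the schema before any data is read. A minimal sketch, assuming a placeholder local file named example.parquet:

# Sketch of the ParquetFile metadata inspection the reader relies on.
# "example.parquet" is a placeholder; any local parquet file works.
from pyarrow.parquet import ParquetFile

meta = ParquetFile("example.parquet").metadata
print(meta.schema.names)    # column names, including any index columns
print(meta.num_row_groups)  # row groups, which a distributed reader can split up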
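
The new hdfs:// branch leans on fsspec.core.url_to_fs, which splits a URL into a filesystem object and a bare path; ParquetDataset then receives the filesystem explicitly. A minimal sketch using a file:// URL instead, since hdfs:// additionally needs a reachable cluster and an HDFS driver installed:

# url_to_fs returns a (filesystem, path) pair; passing the filesystem to
# ParquetDataset is exactly what the patch does for hdfs:// URLs.
import fsspec.core

fs, stripped = fsspec.core.url_to_fs("file:///tmp/example.parquet")
print(type(fs).__name__)  # LocalFileSystem
print(stripped)           # /tmp/example.parquet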
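
From the user's side, the fix means read_parquet accepts an hdfs:// URL directly. A hypothetical call, where the namenode host, port, and file path are placeholders:

# Hypothetical usage after this fix; requires a reachable HDFS cluster.
import modin.pandas as pd

df = pd.read_parquet("hdfs://namenode:8020/datasets/example.parquet")
print(df.shape)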