From 7a51eb9fbcc3c75309f292f572dd22b25a55b278 Mon Sep 17 00:00:00 2001
From: Alexey Prutskov
Date: Thu, 22 Oct 2020 15:40:38 +0300
Subject: [PATCH] FIX-#1765: Fix support of s3 in `read_parquet`

Signed-off-by: Alexey Prutskov
---
 modin/backends/pandas/parsers.py                  | 11 +++++++++++
 modin/engines/base/frame/data.py                  |  8 ++++----
 .../base/io/column_stores/parquet_reader.py       | 19 +++++++++++++++++-
 modin/pandas/test/test_io.py                      | 20 ++++++++++++++++++++
 requirements/env_omnisci.yml                      |  1 +
 5 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/modin/backends/pandas/parsers.py b/modin/backends/pandas/parsers.py
index 159ccd982e8..67d29fe5900 100644
--- a/modin/backends/pandas/parsers.py
+++ b/modin/backends/pandas/parsers.py
@@ -355,6 +355,17 @@ class PandasParquetParser(PandasParser):
     def parse(fname, **kwargs):
         num_splits = kwargs.pop("num_splits", None)
         columns = kwargs.get("columns", None)
+        if fname.startswith("s3://"):
+            from botocore.exceptions import NoCredentialsError
+            import s3fs
+
+            try:
+                fs = s3fs.S3FileSystem()
+                fname = fs.open(fname)
+            except NoCredentialsError:
+                fs = s3fs.S3FileSystem(anon=True)
+                fname = fs.open(fname)
+
         if num_splits is None:
             return pandas.read_parquet(fname, **kwargs)
         kwargs["use_pandas_metadata"] = True
diff --git a/modin/engines/base/frame/data.py b/modin/engines/base/frame/data.py
index 37af1a73912..ae2f0f935ba 100644
--- a/modin/engines/base/frame/data.py
+++ b/modin/engines/base/frame/data.py
@@ -285,14 +285,14 @@ def _filter_empties(self):
                 [
                     self._partitions[i][j]
                     for j in range(len(self._partitions[i]))
-                    if j < len(self._column_widths) and self._column_widths[j] > 0
+                    if j < len(self._column_widths) and self._column_widths[j] != 0
                 ]
                 for i in range(len(self._partitions))
-                if i < len(self._row_lengths) and self._row_lengths[i] > 0
+                if i < len(self._row_lengths) and self._row_lengths[i] != 0
             ]
         )
-        self._column_widths_cache = [w for w in self._column_widths if w > 0]
-        self._row_lengths_cache = [r for r in self._row_lengths if r > 0]
+        self._column_widths_cache = [w for w in self._column_widths if w != 0]
+        self._row_lengths_cache = [r for r in self._row_lengths if r != 0]
 
     def _validate_axis_equality(self, axis: int, force: bool = False):
         """
diff --git a/modin/engines/base/io/column_stores/parquet_reader.py b/modin/engines/base/io/column_stores/parquet_reader.py
index 7c8ae96d565..07011a1920c 100644
--- a/modin/engines/base/io/column_stores/parquet_reader.py
+++ b/modin/engines/base/io/column_stores/parquet_reader.py
@@ -12,6 +12,7 @@
 # governing permissions and limitations under the License.
 
 import os
+import s3fs
 
 from modin.engines.base.io.column_stores.column_store_reader import ColumnStoreReader
 from modin.error_message import ErrorMessage
@@ -48,7 +49,7 @@ def _read(cls, path, engine, columns, **kwargs):
         from pyarrow.parquet import ParquetFile, ParquetDataset
         from modin.pandas.io import PQ_INDEX_REGEX
 
-        if os.path.isdir(path):
+        if isinstance(path, str) and os.path.isdir(path):
             partitioned_columns = set()
             directory = True
             # We do a tree walk of the path directory because partitioned
@@ -84,6 +85,22 @@ def _read(cls, path, engine, columns, **kwargs):
             pd = ParquetDataset(path, filesystem=fs)
             meta = pd.metadata
             column_names = pd.schema.names
+        elif isinstance(path, s3fs.S3File) or (
+            isinstance(path, str) and path.startswith("s3://")
+        ):
+            from botocore.exceptions import NoCredentialsError
+
+            if isinstance(path, s3fs.S3File):
+                bucket_path = path.url().split(".s3.amazonaws.com")
+                path = "s3://" + bucket_path[0].split("://")[1] + bucket_path[1]
+            try:
+                fs = s3fs.S3FileSystem()
+                pd = ParquetDataset(path, filesystem=fs)
+            except NoCredentialsError:
+                fs = s3fs.S3FileSystem(anon=True)
+                pd = ParquetDataset(path, filesystem=fs)
+            meta = pd.metadata
+            column_names = pd.schema.names
         else:
             meta = ParquetFile(path).metadata
             column_names = meta.schema.names
diff --git a/modin/pandas/test/test_io.py b/modin/pandas/test/test_io.py
index 197a03fb6dd..047cf5995c0 100644
--- a/modin/pandas/test/test_io.py
+++ b/modin/pandas/test/test_io.py
@@ -1052,6 +1052,26 @@ def test_from_csv_s3(make_csv_file):
     df_equals(modin_df, pandas_df)
 
 
+@pytest.mark.skipif(
+    Engine.get() == "Python",
+    reason="S3-like paths with anonymous credentials are not supported in pandas. See issue #2301.",
+)
+def test_read_parquet_s3():
+    import s3fs
+
+    # pandas currently supports only the default boto credentials, so we
+    # use S3FileSystem with `anon=True` to make testing possible.
+    dataset_url = "s3://aws-roda-hcls-datalake/chembl_27/chembl_27_public_tissue_dictionary/part-00000-66508102-96fa-4fd9-a0fd-5bc072a74293-c000.snappy.parquet"
+    fs = s3fs.S3FileSystem(anon=True)
+    pandas_df = pandas.read_parquet(fs.open(dataset_url, "rb"))
+    modin_df_s3fs = pd.read_parquet(fs.open(dataset_url, "rb"))
+    df_equals(pandas_df, modin_df_s3fs)
+
+    # Modin supports both default and anonymous credentials and resolves this internally.
+    modin_df_s3 = pd.read_parquet(dataset_url)
+    df_equals(pandas_df, modin_df_s3)
+
+
 def test_from_csv_default(make_csv_file):
     # We haven't implemented read_csv from https, but if it's implemented, then this needs to change
     dataset_url = "https://raw.githubusercontent.com/modin-project/modin/master/modin/pandas/test/data/blah.csv"
diff --git a/requirements/env_omnisci.yml b/requirements/env_omnisci.yml
index cf615ea66de..26e17c64008 100644
--- a/requirements/env_omnisci.yml
+++ b/requirements/env_omnisci.yml
@@ -12,5 +12,6 @@ dependencies:
   - coverage<5.0
   - pygithub==1.53
   - omniscidbe4py
+  - s3fs>=0.4.2
   - pip:
     - ray>=1.0.0
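
For reviewers who want to try the fallback outside of Modin, below is a minimal standalone sketch of the credential handling this patch adds to both PandasParquetParser.parse and ParquetReader._read: try the default boto credential chain first, then retry anonymously when no credentials are configured. The open_s3_parquet helper and the __main__ driver are illustrative only and not part of the patch; the dataset URL is the public file used by the new test_read_parquet_s3 test.

    import pandas
    import s3fs
    from botocore.exceptions import NoCredentialsError


    def open_s3_parquet(path):
        # Mirror of the patch's fallback: s3fs raises NoCredentialsError
        # lazily, on the first request, so the open() call must sit inside
        # the try block together with the S3FileSystem construction.
        try:
            fs = s3fs.S3FileSystem()
            return fs.open(path, "rb")
        except NoCredentialsError:
            # No boto credentials configured: retry anonymously, which is
            # sufficient for public buckets such as the one in the new test.
            fs = s3fs.S3FileSystem(anon=True)
            return fs.open(path, "rb")


    if __name__ == "__main__":
        url = (
            "s3://aws-roda-hcls-datalake/chembl_27/"
            "chembl_27_public_tissue_dictionary/"
            "part-00000-66508102-96fa-4fd9-a0fd-5bc072a74293-c000.snappy.parquet"
        )
        # pandas.read_parquet accepts a file-like object, so the opened
        # S3 file can be passed straight through.
        print(pandas.read_parquet(open_s3_parquet(url)).head())

The same try/except shape appears twice in the patch because the parser workers open the file themselves, independently of the query-compiler-side ParquetDataset metadata read.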