[REVIEW] Enable writing to s3 storage in chunked parquet writer #10769

Merged: 18 commits, merged May 10, 2022
Changes from 2 commits
7 changes: 7 additions & 0 deletions conda/environments/cudf_dev_cuda11.5.yml
@@ -67,6 +67,13 @@ dependencies:
- pydata-sphinx-theme
- librdkafka=1.7.0
- python-confluent-kafka=1.7.0
- moto>=3.1.6
- boto3>=1.21.21
- botocore>=1.24.21
- aiobotocore>=2.2.0
- s3fs>=2022.3.0
- flask
- flask_cors
- pip:
- git+https://github.com/python-streamz/streamz.git@master
- pyorc
1 change: 1 addition & 0 deletions conda/recipes/cudf/meta.yaml
@@ -46,6 +46,7 @@ requirements:
- cupy >=9.5.0,<11.0.0a0
- numba >=0.54
- numpy
- s3fs >=2022.3.0
- {{ pin_compatible('pyarrow', max_pin='x.x.x') }} *cuda
- libcudf {{ version }}
- fastavro >=0.22.0
69 changes: 51 additions & 18 deletions python/cudf/cudf/io/parquet.py
@@ -1,12 +1,15 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.

import shutil
import tempfile
import warnings
from collections import defaultdict
from contextlib import ExitStack
from typing import Dict, List, Tuple
from uuid import uuid4

import numpy as np
import s3fs
from pyarrow import dataset as ds, parquet as pq

import cudf
@@ -206,12 +209,25 @@ def _process_dataset(
filters = pq._filters_to_expression(filters)

# Initialize ds.FilesystemDataset
dataset = ds.dataset(
paths,
filesystem=fs,
format="parquet",
partitioning="hive",
)
if (
isinstance(fs, s3fs.S3FileSystem)
and len(paths) == 1
and fs.isdir(paths[0])
):
# TODO: Remove this workaround after following bug is fixed:
# https://issues.apache.org/jira/browse/ARROW-16438
dataset = ds.dataset(
"s3://" + paths[0],
format="parquet",
partitioning="hive",
)
else:
dataset = ds.dataset(
paths,
filesystem=fs,
format="parquet",
partitioning="hive",
)
file_list = dataset.files
if len(file_list) == 0:
raise FileNotFoundError(f"{paths} could not be resolved to any files")
@@ -724,6 +740,7 @@ def __init__(
index=None,
compression=None,
statistics="ROWGROUP",
**kwargs,
) -> None:
"""
Write a parquet file or dataset incrementally
@@ -776,7 +793,12 @@ def __init__(
<filename>.parquet

"""
self.path = path
if isinstance(path, str) and path.startswith("s3://"):
self.fs_meta = {"is_s3": True, "actual_path": path}
self.path = tempfile.TemporaryDirectory().name
else:
self.fs_meta = {}
self.path = path
self.common_args = {
"index": index,
"compression": compression,
@@ -792,6 +814,7 @@ def __init__(
# in self._chunked_writers for reverse lookup
self.path_cw_map: Dict[str, int] = {}
self.filename = None
self.kwargs = kwargs

@_cudf_nvtx_annotate
def write_table(self, df):
@@ -837,18 +860,19 @@ def write_table(self, df):
]
cw.write_table(grouped_df, this_cw_part_info)

# Create new cw for unhandled paths encountered in this write_table
new_paths, part_info, meta_paths = zip(*new_cw_paths)
self._chunked_writers.append(
(
ParquetWriter(new_paths, **self.common_args),
new_paths,
meta_paths,
if new_cw_paths:
# Create new cw for unhandled paths encountered in this write_table
new_paths, part_info, meta_paths = zip(*new_cw_paths)
self._chunked_writers.append(
(
ParquetWriter(new_paths, **self.common_args),
new_paths,
meta_paths,
)
Member commented on lines -1054 to +1076:

Do you think it would be possible to wrap these ParquetWriter objects in a file-opening shim? That way, for non-local storage, we could open the necessary paths with fsspec before passing the open "file-like" objects down to ParquetWriter. The shim would need to keep track of the open files and close them after the underlying ParquetWriter object is closed.

If I understand correctly (which I may not), we should be able to write directly to s3 without the file-copy workaround.
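
A rough sketch of the file-opening shim idea described in this comment, assuming the wrapped writer accepts already-open file-like objects; the class and parameter names are illustrative, not from this PR, and the reply below explains why this route runs into trouble on S3:

import fsspec


class FileOpeningShim:
    """Open remote paths with fsspec, hand the open file-like objects to the
    wrapped writer, and close them only after the writer itself is closed."""

    def __init__(self, paths, writer_cls, storage_options=None, **writer_kwargs):
        # fsspec.open returns OpenFile objects; .open() gives real file handles
        self._open_files = [
            fsspec.open(path, mode="wb", **(storage_options or {}))
            for path in paths
        ]
        self._handles = [of.open() for of in self._open_files]
        self._writer = writer_cls(self._handles, **writer_kwargs)

    def write_table(self, df):
        self._writer.write_table(df)

    def close(self, **kwargs):
        result = self._writer.close(**kwargs)
        # Close the remote handles only after the writer has flushed its footer
        for handle in self._handles:
            handle.close()
        return result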

Contributor Author replied:

So this was my initial go-to approach (same as df.to_parquet), and after implementing it I ran into issues. The way ParquetWriter is written right now, writing to non-local storage won't fully create and flush the data: even when we manually flush after writing a table, only a 0-byte file is created and no data is written. And if, to form a correct parquet file on the same open file connection, I try to write the parquet metadata footer during the close operation, the libraries end up overwriting the existing file in s3 storage, losing all the past data and writing only the footer metadata. After repeated tries of different combinations of (write table, flush, write metadata, flush, close), none of which succeeded, I concluded that s3 (and other cloud file systems) are designed to act as object stores rather than true file systems that allow appending to the contents of existing files. At least for s3 this is the limitation, while ParquetWriter is basically built around the concept of appending.

Member replied:

Okay - I still have a suspicion that we can make this work, but I do think the approach you are using here is reasonable, so I can experiment and follow up separately. My question here is definitely not a blocker :)

Contributor Author replied:

Sure, will be happy to chat about the findings.

)
)
new_cw_idx = len(self._chunked_writers) - 1
self.path_cw_map.update({k: new_cw_idx for k in new_paths})
self._chunked_writers[-1][0].write_table(grouped_df, part_info)
new_cw_idx = len(self._chunked_writers) - 1
self.path_cw_map.update({k: new_cw_idx for k in new_paths})
self._chunked_writers[-1][0].write_table(grouped_df, part_info)

@_cudf_nvtx_annotate
def close(self, return_metadata=False):
@@ -862,6 +886,15 @@ def close(self, return_metadata=False):
for cw, _, meta_path in self._chunked_writers
]

if self.fs_meta.get("is_s3", False):
local_path = self.path
s3_path = self.fs_meta["actual_path"]
s3_file, _ = ioutils._get_filesystem_and_paths(
s3_path, **self.kwargs
)
s3_file.put(local_path, s3_path, recursive=True)
shutil.rmtree(self.path)

if return_metadata:
return (
merge_parquet_filemetadata(metadata)
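Taken together, the parquet.py changes above stage an s3:// target in a local temporary directory and upload it when the writer is closed. A condensed sketch of that pattern, with an illustrative bucket name and sample data, assuming valid S3 credentials:

import shutil
import tempfile

import fsspec

import cudf
from cudf.io.parquet import ParquetDatasetWriter

local_dir = tempfile.mkdtemp()                 # local staging area
s3_target = "s3://my-bucket/chunked_dataset"   # hypothetical destination

# Write the partitioned dataset locally, chunk by chunk
writer = ParquetDatasetWriter(local_dir, partition_cols=["a"])
writer.write_table(cudf.DataFrame({"a": [1, 2], "b": [10, 20]}))
writer.write_table(cudf.DataFrame({"a": [1, 2], "b": [30, 40]}))
writer.close()

# When ParquetDatasetWriter is given an s3:// path instead, its close()
# does roughly the following:
fs, _ = fsspec.core.url_to_fs(s3_target)       # here, an s3fs.S3FileSystem
fs.put(local_dir, s3_target, recursive=True)   # upload the staged dataset
shutil.rmtree(local_dir)                       # drop the local copy
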
44 changes: 43 additions & 1 deletion python/cudf/cudf/tests/test_s3.py
@@ -18,10 +18,12 @@
import cudf
from cudf.testing._utils import assert_eq

moto = pytest.importorskip("moto", minversion="1.3.14")
moto = pytest.importorskip("moto", minversion="3.1.6")
boto3 = pytest.importorskip("boto3")
requests = pytest.importorskip("requests")
s3fs = pytest.importorskip("s3fs")
flask = pytest.importorskip("flask")
flask_cors = pytest.importorskip("flask_cors")


@contextmanager
@@ -49,6 +51,7 @@ def s3_base(worker_id):
# system aws credentials, https://github.com/spulec/moto/issues/1793
os.environ.setdefault("AWS_ACCESS_KEY_ID", "foobar_key")
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foobar_secret")
os.environ.setdefault("S3FS_LOGGING_LEVEL", "DEBUG")

# Launching moto in server mode, i.e., as a separate process
# with an S3 endpoint on localhost
@@ -457,3 +460,42 @@ def test_write_orc(s3_base, s3so, pdf):
got = pa.orc.ORCFile(f).read().to_pandas()

assert_eq(pdf, got)


def test_write_chunked_parquet(s3_base, s3so):
df1 = cudf.DataFrame({"b": [10, 11, 12], "a": [1, 2, 3]})
df2 = cudf.DataFrame({"b": [20, 30, 50], "a": [3, 2, 1]})
dirname = "chunked_writer_directory"
bname = "parquet"
from cudf.io.parquet import ParquetDatasetWriter

with s3_context(
s3_base=s3_base, bucket=bname, files={dirname: BytesIO()}
) as s3fs:
cw = ParquetDatasetWriter(
f"s3://{bname}/{dirname}",
partition_cols=["a"],
storage_options=s3so,
)
cw.write_table(df1)
cw.write_table(df2)
cw.close()

# TODO: Replace following workaround with:
# expect = cudf.read_parquet(f"s3://{bname}/{dirname}/",
# storage_options=s3so)
# after the following bug is fixed:
# https://issues.apache.org/jira/browse/ARROW-16438

dfs = []
for folder in {"a=1", "a=2", "a=3"}:
assert s3fs.exists(f"s3://{bname}/{dirname}/{folder}")
for file in s3fs.ls(f"s3://{bname}/{dirname}/{folder}"):
df = cudf.read_parquet("s3://" + file, storage_options=s3so)
dfs.append(df)

actual = cudf.concat(dfs).astype("int64")
assert_eq(
actual.sort_values(["b"]).reset_index(drop=True),
cudf.concat([df1, df2]).sort_values(["b"]).reset_index(drop=True),
)
3 changes: 3 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_s3.py
@@ -1,3 +1,5 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

import os
import shlex
import subprocess
@@ -42,6 +44,7 @@ def s3_base(worker_id):
# system aws credentials, https://github.com/spulec/moto/issues/1793
os.environ.setdefault("AWS_ACCESS_KEY_ID", "foobar_key")
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foobar_secret")
os.environ.setdefault("S3FS_LOGGING_LEVEL", "DEBUG")

# Launching moto in server mode, i.e., as a separate process
# with an S3 endpoint on localhost