From aca8db5cb3bc3caf2bfd8a5f157886c8e730ec44 Mon Sep 17 00:00:00 2001 From: Rehan Durrani Date: Mon, 23 May 2022 12:30:59 -0700 Subject: [PATCH] FIX-#4479: Prevent users from using a local filepath when performing a distributed write Signed-off-by: Rehan Durrani --- docs/release_notes/release_notes-0.15.0.rst | 1 + .../implementations/pandas_on_ray/io/io.py | 12 +++++++++ .../execution/ray/implementations/utils.py | 25 +++++++++++++++++++ 3 files changed, 38 insertions(+) create mode 100644 modin/core/execution/ray/implementations/utils.py diff --git a/docs/release_notes/release_notes-0.15.0.rst b/docs/release_notes/release_notes-0.15.0.rst index 67d4718a2d4..606759add12 100644 --- a/docs/release_notes/release_notes-0.15.0.rst +++ b/docs/release_notes/release_notes-0.15.0.rst @@ -35,6 +35,7 @@ Key Features and Updates * * Developer API enhancements * FEAT-#4359: Add __dataframe__ method to the protocol dataframe (#4360) + * FIX-#4479: Prevent users from using a local filepath when performing a distributed write (#4484) * Update testing suite * TEST-#4363: Use Ray from pypi in CI (#4364) * FIX-#4422: get rid of case sensitivity for `warns_that_defaulting_to_pandas` (#4423) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py index bcba828f42b..6286c28d49e 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/io/io.py @@ -42,6 +42,7 @@ from modin.core.execution.ray.common import RayTask, SignalActor from ..dataframe import PandasOnRayDataframe from ..partitioning import PandasOnRayDataframePartition +from modin.core.execution.ray.implementations.utils import is_local_path class PandasOnRayIO(RayIO): @@ -165,6 +166,12 @@ def to_csv(cls, qc, **kwargs): if not cls._to_csv_check_support(kwargs): return RayIO.to_csv(qc, **kwargs) + if len(ray.nodes()) > 1: + path = kwargs["path_or_buf"] + if is_local_path(path): + raise ValueError("`path_or_buf` must point to a networked file or buffer when in cluster mode.") + + signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1) def func(df, **kw): @@ -276,6 +283,11 @@ def to_parquet(cls, qc, **kwargs): """ if not cls._to_parquet_check_support(kwargs): return RayIO.to_parquet(qc, **kwargs) + + if len(ray.nodes()) > 1: + path = kwargs["path_or_buf"] + if is_local_path(path): + raise ValueError("`path_or_buf` must point to a networked file or buffer when in cluster mode.") def func(df, **kw): """ diff --git a/modin/core/execution/ray/implementations/utils.py b/modin/core/execution/ray/implementations/utils.py new file mode 100644 index 00000000000..798db3d92f4 --- /dev/null +++ b/modin/core/execution/ray/implementations/utils.py @@ -0,0 +1,25 @@ +import os +import pathlib +import re + +S3_ADDRESS_REGEX = re.compile("[sS]3://(.*?)/(.*)") + +def is_local_path(path_or_buf) -> bool: + """ + Return True if the specified path_or_buf is a local path, False otherwise. + + Parameters + ---------- + path_or_buf : str, path object or file-like object + The path or buffer to check. + + Returns + ------- + Whether the `path_or_buf` points to a local file. + """ + if isinstance(path_or_buf, str): + if S3_ADDRESS_REGEX.match(path_or_buf) is not None or "://" in path_or_buf: + return False # S3 or network path. + if isinstance(path_or_buf, str) or isinstance(path_or_buf, pathlib.PurePath): + return os.path.exists(path_or_buf) + return False