Skip to content

Commit

Permalink
FIX-modin-project#4479: Prevent users from using a local filepath whe…
Browse files Browse the repository at this point in the history
…n performing a distributed write

Signed-off-by: Rehan Durrani <[email protected]>
  • Loading branch information
RehanSD committed May 23, 2022
1 parent 0f70e82 commit aca8db5
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/release_notes/release_notes-0.15.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Key Features and Updates
*
* Developer API enhancements
* FEAT-#4359: Add __dataframe__ method to the protocol dataframe (#4360)
* FIX-#4479: Prevent users from using a local filepath when performing a distributed write (#4484)
* Update testing suite
* TEST-#4363: Use Ray from pypi in CI (#4364)
* FIX-#4422: get rid of case sensitivity for `warns_that_defaulting_to_pandas` (#4423)
Expand Down
12 changes: 12 additions & 0 deletions modin/core/execution/ray/implementations/pandas_on_ray/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from modin.core.execution.ray.common import RayTask, SignalActor
from ..dataframe import PandasOnRayDataframe
from ..partitioning import PandasOnRayDataframePartition
from modin.core.execution.ray.implementations.utils import is_local_path


class PandasOnRayIO(RayIO):
Expand Down Expand Up @@ -165,6 +166,12 @@ def to_csv(cls, qc, **kwargs):
if not cls._to_csv_check_support(kwargs):
return RayIO.to_csv(qc, **kwargs)

if len(ray.nodes()) > 1:
path = kwargs["path_or_buf"]
if is_local_path(path):
raise ValueError("`path_or_buf` must point to a networked file or buffer when in cluster mode.")


signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1)

def func(df, **kw):
Expand Down Expand Up @@ -276,6 +283,11 @@ def to_parquet(cls, qc, **kwargs):
"""
if not cls._to_parquet_check_support(kwargs):
return RayIO.to_parquet(qc, **kwargs)

if len(ray.nodes()) > 1:
path = kwargs["path_or_buf"]
if is_local_path(path):
raise ValueError("`path_or_buf` must point to a networked file or buffer when in cluster mode.")

def func(df, **kw):
"""
Expand Down
25 changes: 25 additions & 0 deletions modin/core/execution/ray/implementations/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import os
import pathlib
import re

# Pattern for S3-style URLs with an "s3://" or "S3://" prefix.
# Group 1 lazily captures the text up to the first "/" after the scheme
# (presumably the bucket name); group 2 captures everything after that "/"
# (presumably the object key).
S3_ADDRESS_REGEX = re.compile("[sS]3://(.*?)/(.*)")

def is_local_path(path_or_buf) -> bool:
    """
    Return True if the specified path_or_buf is a local path, False otherwise.

    The decision is based on the form of the path, not on whether the target
    already exists: a write usually creates its output file, so an existence
    check would report every new local output file as "not local".

    Parameters
    ----------
    path_or_buf : str, path object or file-like object
        The path or buffer to check.

    Returns
    -------
    bool
        True when `path_or_buf` is a non-empty string or path-like object
        that either carries no URL scheme or uses the ``file://`` scheme.
        False for URLs with any other scheme (``s3://``, ``https://``, ...),
        for empty paths, and for buffers or any other object.

    Notes
    -----
    A scheme-less path on a shared/network mount cannot be told apart from a
    path on a node-local disk by its text alone; such paths are reported as
    local.
    """
    # Buffers, None and other non-path objects never name a local file.
    if not isinstance(path_or_buf, (str, os.PathLike)):
        return False

    # Normalise str / PathLike (including bytes-based ones) to text.
    path = os.fsdecode(path_or_buf)
    if not path:
        return False

    # Anything of the form "<scheme>://..." is a URL. This also covers the
    # "s3://" / "S3://" addresses, so no dedicated S3 check is required.
    scheme, separator, _ = path.partition("://")
    if separator:
        # "file://" URLs still refer to the local filesystem.
        return scheme.lower() == "file"

    # Plain filesystem path, whether or not the file exists yet.
    return True

0 comments on commit aca8db5

Please sign in to comment.