import os
import pathlib
from typing import Union


def get_device_id(path: Union[str, pathlib.PurePath]) -> Union[int, None]:
    """
    Return `os.stat(...).st_dev` for the deepest prefix of `path` that exists locally.

    The path is split into its components and probed from the top down; the
    device id of the last prefix that can be stat'ed is returned.

    Parameters
    ----------
    path : str, path object
        The path to check. It does not have to exist in full.

    Returns
    -------
    int or None
        The `st_dev` field of `os.stat` for the longest leading portion of
        `path` that exists locally. None if `path` has no components or if
        its first meaningful component (the first one below "/" for POSIX
        absolute paths) does not exist locally.
    """
    parts = pathlib.Path(path).parts
    if not parts:
        # An empty path has no component to probe.
        return None

    # The POSIX root always exists, so it says nothing about whether the path
    # is local; start probing at the first component below it.
    start = 2 if parts[0] == "/" else 1
    try:
        device_id = os.stat(os.path.join(*parts[:start])).st_dev
    except (OSError, ValueError):
        # OSError: the component is missing or inaccessible.
        # ValueError: the path cannot be represented (e.g. embedded NUL).
        return None

    # Walk deeper one component at a time. The range is bounded by the number
    # of components, so a path that exists in full terminates normally.
    for end in range(start + 1, len(parts) + 1):
        try:
            device_id = os.stat(os.path.join(*parts[:end])).st_dev
        except (OSError, ValueError):
            break
    return device_id