Skip to content

Commit

Permalink
Address review comments and fix util for non-local paths and non-exis…
Browse files Browse the repository at this point in the history
…tent local paths

Signed-off-by: Rehan Durrani <[email protected]>
  • Loading branch information
RehanSD committed May 26, 2022
1 parent 8c7120b commit dcadcf4
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def to_csv(cls, qc, **kwargs):
if len(ray.nodes()) > 1 and is_local_path(kwargs["path_or_buf"]):
raise ValueError(
"`path_or_buf` must point to a networked file or buffer when in cluster mode."
)
)

signals = SignalActor.remote(len(qc._modin_frame._partitions) + 1)

Expand Down Expand Up @@ -286,7 +286,7 @@ def to_parquet(cls, qc, **kwargs):
if len(ray.nodes()) > 1 and is_local_path(kwargs["path_or_buf"]):
raise ValueError(
"`path_or_buf` must point to a networked file or buffer when in cluster mode."
)
)

def func(df, **kw):
"""
Expand Down
36 changes: 35 additions & 1 deletion modin/core/io/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import os
import pathlib
import re
from typing import Union

# Matches addresses that start with `s3://` or `S3://`. Group 1 lazily captures
# the text up to the first "/" after the scheme and group 2 captures the rest
# (presumably the bucket name and the object key -- confirm against callers).
S3_ADDRESS_REGEX = re.compile("[sS]3://(.*?)/(.*)")

Expand All @@ -37,5 +38,38 @@ def is_local_path(path_or_buf) -> bool:
if S3_ADDRESS_REGEX.match(path_or_buf) is not None or "://" in path_or_buf:
return False # S3 or network path.
if isinstance(path_or_buf, (str, pathlib.PurePath)):
return os.path.exists(path_or_buf)
if os.path.exists(path_or_buf):
return True
local_device_id = os.stat(os.getcwd()).st_dev
path_device_id = get_device_id(path_or_buf)
if path_device_id == local_device_id:
return True
return False


def get_device_id(path: Union[str, pathlib.PurePath]) -> Union[int, None]:
    """
    Return the result of `os.stat(path).st_dev` for the portion of `path` that exists locally.

    The path is split into its components and progressively longer prefixes
    are stat-ed; the device id of the longest prefix that exists is returned.

    Parameters
    ----------
    path : str, path object
        The path to check.

    Returns
    -------
    int or None
        The `st_dev` field of `os.stat` of the portion of the `path` that exists locally, None if no
        part of the path exists locally. None is also returned for a path that has no
        components (e.g. an empty string).
    """
    parts = pathlib.Path(path).parts
    if not parts:
        # `pathlib.Path("")` and `pathlib.Path(".")` have no parts to probe.
        return None
    # For a path anchored at "/", probing starts at the first entry *below* the
    # root: the root itself always exists, so it says nothing about whether the
    # path is local. `min` keeps the bare root path ("/") inside the bounds.
    first_end = min(2 if parts[0] == "/" else 1, len(parts))
    device_id = None
    # Bounding the walk by `len(parts)` guarantees termination even when the
    # whole path exists (slicing past the end would keep yielding the full path).
    for end in range(first_end, len(parts) + 1):
        try:
            device_id = os.stat(os.path.join(*parts[:end])).st_dev
        except (OSError, ValueError):
            # OSError: this prefix does not exist or is not accessible.
            # ValueError: the path cannot be represented at the OS level
            # (e.g. embedded null byte). Either way, no longer prefix can exist.
            break
    return device_id

0 comments on commit dcadcf4

Please sign in to comment.