Commit

done?

Alex Wu committed Sep 18, 2021
1 parent ea521e3 commit 573f4e0

Showing 5 changed files with 68 additions and 32 deletions.
python/ray/workflow/serialization.py (10 changes: 1 addition & 9 deletions)
@@ -10,7 +10,6 @@
 from typing import Any, Dict, Generator, List, Optional, Tuple, TYPE_CHECKING

 from collections import ChainMap
-import io

 if TYPE_CHECKING:
     from ray.actor import ActorHandle
@@ -182,16 +181,9 @@ class ObjectRefPickler(cloudpickle.CloudPickler):
         key = storage.make_key(*paths)

-        # TODO Use open()
-        # with wf_storage.open(key, "w") as f:
-        #     pickler = ObjectRefPickler(f)
-        #     pickler.dump(obj)
-
-        with io.BytesIO() as f:
+        with storage.open(key) as f:
             pickler = ObjectRefPickler(f)
             pickler.dump(obj)
-            f.seek(0)
-            task = storage.put(key, f.read())
             tasks.append(task)

     await asyncio.gather(*tasks)
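The change above streams the pickle directly into a storage-provided file object instead of staging the whole payload in an in-memory BytesIO buffer. A minimal standalone sketch of that idea, using plain cloudpickle and a hypothetical file-backed store (FileStore and store_obj are illustrative names, not part of Ray):

    import pathlib

    import cloudpickle

    class FileStore:
        # Hypothetical stand-in for a Storage backend.
        def __init__(self, root: str):
            self._root = pathlib.Path(root)

        def make_key(self, *names: str) -> str:
            return str(self._root.joinpath(*names))

        def open(self, key: str):
            pathlib.Path(key).parent.mkdir(parents=True, exist_ok=True)
            return open(key, "wb")

    def store_obj(storage: FileStore, obj, *paths) -> None:
        key = storage.make_key(*paths)
        # The pickle is written straight to the storage-provided file,
        # so the serialized bytes never sit in a separate buffer.
        with storage.open(key) as f:
            cloudpickle.dump(obj, f)

    store_obj(FileStore("/tmp/wf"), {"x": 1}, "objects", "obj1")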
python/ray/workflow/storage/base.py (9 changes: 9 additions & 0 deletions)
@@ -1,5 +1,6 @@
 import abc
 from abc import abstractmethod
+import io
 from typing import Any, List

@@ -47,6 +48,14 @@ async def get(self, key: str, is_json: bool = False) -> Any:
             The object from storage.
         """

+    @abstractmethod
+    def open(self, key: str) -> io.BufferedIOBase:
+        """Returns a file like object that conforms to io.BufferedIOBase.
+
+        Args:
+            key: The key of the object.
+        """
+
     @abstractmethod
     async def delete_prefix(self, key_prefix: str) -> None:
         """Delete an object with prefix.
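A minimal sketch of what the new abstract method asks of a backend, assuming an in-memory dict store (InMemoryStorage and _Buffer are illustrative names, not Ray classes): the object returned by open() behaves like an io.BufferedIOBase, and whatever was written to it is persisted under the key once the stream is closed.

    import io

    class InMemoryStorage:
        def __init__(self):
            self._store = {}

        def open(self, key: str) -> io.BufferedIOBase:
            storage = self

            class _Buffer(io.BytesIO):
                # Persist the buffer contents under `key` when closed.
                def close(self) -> None:
                    storage._store[key] = self.getvalue()
                    super().close()

            return _Buffer()

    storage = InMemoryStorage()
    with storage.open("workflows/step_1/output") as f:
        f.write(b"serialized bytes")
    assert storage._store["workflows/step_1/output"] == b"serialized bytes"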
python/ray/workflow/storage/debug.py (6 changes: 5 additions & 1 deletion)
@@ -119,14 +119,18 @@ def make_key(self, *names: str) -> str:
         return self._wrapped_storage.make_key(*names)

     async def get(self, key: str, is_json: bool = False) -> Any:
-        await self._logged_storage.get(key, is_json)
+        if self._log_on:
+            await self._logged_storage.get(key, is_json)
         return await self._wrapped_storage.get(key, is_json)

     async def put(self, key: str, data: Any, is_json: bool = False) -> None:
         if self._log_on:
             await self._logged_storage.put(key, data, is_json)
         await self._wrapped_storage.put(key, data, is_json)

+    def open(self, key: str):
+        return self._wrapped_storage.open(key)
+
     async def delete_prefix(self, prefix: str) -> None:
         if self._log_on:
             await self._logged_storage.delete_prefix(prefix)
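The pattern here is a pass-through wrapper: reads and writes are mirrored to a secondary "logged" storage only while logging is enabled, while open() always goes straight to the wrapped backend, so opened streams are not mirrored. A condensed sketch of that shape, with illustrative names rather than the actual DebugStorage class:

    class RecordingStorage:
        def __init__(self, wrapped, logged):
            self._wrapped_storage = wrapped
            self._logged_storage = logged
            self._log_on = True

        async def get(self, key, is_json=False):
            if self._log_on:
                await self._logged_storage.get(key, is_json)
            return await self._wrapped_storage.get(key, is_json)

        def open(self, key):
            # Streams bypass the logged storage entirely.
            return self._wrapped_storage.open(key)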
python/ray/workflow/storage/filesystem.py (47 changes: 26 additions & 21 deletions)
@@ -1,5 +1,6 @@
 import contextlib
 import itertools
+import io
 import json
 import shutil
 import pathlib
@@ -33,18 +34,14 @@ def _open_atomic(path: pathlib.Path, mode="r"):
         raise ValueError("Atomic open does not support appending.")
     # backup file is hidden by default
     backup_path = path.with_name(f".{path.name}.backup")
-    if "r" in mode:  # read mode
-        if _file_exists(path):
-            f = open(path, mode)
-        else:
-            raise KeyNotFoundError(path)
-        try:
-            yield f
-        finally:
-            f.close()
-    elif "x" in mode:  # create mode
+
+    if "w" in mode:  # overwrite mode
+        # backup existing file
         if path.exists():
-            raise FileExistsError(path)
+            # remove an even older backup file
+            if backup_path.exists():
+                backup_path.unlink()
+            path.rename(backup_path)
         tmp_new_fn = path.with_suffix(f".{path.name}.{uuid.uuid4().hex}")
         if not tmp_new_fn.parent.exists():
             tmp_new_fn.parent.mkdir(parents=True)
@@ -58,18 +55,25 @@ def _open_atomic(path: pathlib.Path, mode="r"):
         finally:
             f.close()
             if write_ok:
                 # "commit" file if writing succeeded
                 tmp_new_fn.rename(path)
+                # cleanup the backup file
+                if backup_path.exists():
+                    backup_path.unlink()
             else:
                 # remove file if writing failed
                 tmp_new_fn.unlink()
-    elif "w" in mode:  # overwrite mode
-        # backup existing file
+    elif "r" in mode:  # read mode
+        if _file_exists(path):
+            f = open(path, mode)
+        else:
+            raise KeyNotFoundError(path)
+        try:
+            yield f
+        finally:
+            f.close()
+    elif "x" in mode:  # create mode
         if path.exists():
-            # remove an even older backup file
-            if backup_path.exists():
-                backup_path.unlink()
-            path.rename(backup_path)
+            raise FileExistsError(path)
         tmp_new_fn = path.with_suffix(f".{path.name}.{uuid.uuid4().hex}")
         if not tmp_new_fn.parent.exists():
             tmp_new_fn.parent.mkdir(parents=True)
@@ -83,10 +87,8 @@ def _open_atomic(path: pathlib.Path, mode="r"):
         finally:
             f.close()
             if write_ok:
+                # "commit" file if writing succeeded
                 tmp_new_fn.rename(path)
-                # cleanup the backup file
-                if backup_path.exists():
-                    backup_path.unlink()
             else:
                 # remove file if writing failed
                 tmp_new_fn.unlink()
@@ -149,6 +151,9 @@ async def get(self, key: str, is_json: bool = False) -> Any:
         with _open_atomic(pathlib.Path(key), "rb") as f:
             return ray.cloudpickle.load(f)

+    def open(self, key: str) -> io.BufferedIOBase:
+        return _open_atomic(pathlib.Path(key), "rwb")
+
     async def delete_prefix(self, key_prefix: str) -> None:
         path = pathlib.Path(key_prefix)
         if path.is_dir():
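_open_atomic commits a write by renaming a fully written temporary file over the destination, so readers never observe a half-written file. A stripped-down sketch of that rename-to-commit idea (a standalone helper, not the Ray function, and omitting the backup-file handling above):

    import pathlib
    import uuid

    def atomic_write_bytes(path: pathlib.Path, data: bytes) -> None:
        # Write to a uniquely named sibling first; the final rename is
        # atomic on POSIX filesystems, so `path` is either fully written
        # or absent, never truncated.
        tmp = path.with_name(f"{path.name}.{uuid.uuid4().hex}")
        tmp.parent.mkdir(parents=True, exist_ok=True)
        try:
            tmp.write_bytes(data)
            tmp.rename(path)  # "commit"
        except Exception:
            tmp.unlink(missing_ok=True)  # discard the partial file
            raise

    atomic_write_bytes(pathlib.Path("/tmp/wf/objects/obj1"), b"payload")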
python/ray/workflow/storage/s3.py (28 changes: 27 additions & 1 deletion)
@@ -1,14 +1,19 @@
 import asyncio
+import nest_asyncio
 import tempfile
 import json
 import urllib.parse as parse
 from botocore.exceptions import ClientError
 import aioboto3
 import itertools
+import io
 import ray
-from typing import Any, List
+from typing import Any, Callable, List
 from ray.workflow.storage.base import Storage, KeyNotFoundError
 import ray.cloudpickle

+nest_asyncio.apply()
+
 MAX_RECEIVED_DATA_MEMORY_SIZE = 25 * 1024 * 1024  # 25MB


@@ -79,6 +84,15 @@ async def get(self, key: str, is_json: bool = False) -> Any:
         else:
             raise

+    def open(self, key: str) -> io.BufferedIOBase:
+        def onclose(bio):
+            coro = self.put(key, bio.getvalue(), is_json=False)
+            # NOTE: There's no easy way to `await` this put.
+            loop = asyncio.get_event_loop()
+            loop.run_until_complete(coro)
+
+        return _BytesIOWithCallback(onclose)
+
     async def delete_prefix(self, key_prefix: str) -> None:
         async with self._session.resource(
                 "s3", endpoint_url=self._endpoint_url,
@@ -134,3 +148,15 @@ def __reduce__(self):
             self._endpoint_url, self._aws_access_key_id,
             self._aws_secret_access_key,
             self._aws_session_token, self._config)
+
+
+class _BytesIOWithCallback(io.BytesIO):
+    def __init__(self, callback: Callable[[io.BytesIO], None], *args,
+                 **kwargs):
+        self._callback = callback
+
+    def close(self):
+        if self._callback:
+            self._callback(self)
+        self._callback = None
+        super().close()
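The S3 backend cannot hand out a real seekable S3 stream, so open() returns a BytesIO subclass that hands its accumulated contents to a callback when closed; in the commit that callback schedules the async put on the event loop. A self-contained sketch of the close-callback pattern, where the callback merely records the bytes (and where, unlike the committed class, the constructor also forwards its arguments to io.BytesIO):

    import io
    from typing import Callable

    class BytesIOWithCallback(io.BytesIO):
        def __init__(self, callback: Callable[[io.BytesIO], None],
                     *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._callback = callback

        def close(self) -> None:
            if self._callback:
                # Hand the full buffer to the callback exactly once,
                # before the underlying memory is released.
                self._callback(self)
            self._callback = None
            super().close()

    uploads = {}
    buf = BytesIOWithCallback(lambda b: uploads.__setitem__("key", b.getvalue()))
    buf.write(b"hello s3")
    buf.close()  # triggers the "upload"
    assert uploads["key"] == b"hello s3"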
