Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up uploading/downloading of files and directories. #958

Merged
merged 2 commits into from
Aug 15, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 65 additions & 26 deletions neuromation/api/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import attr
from yarl import URL

import neuromation

from .abc import (
AbstractFileProgress,
AbstractRecursiveFileProgress,
Expand Down Expand Up @@ -296,7 +298,12 @@ async def upload_file(
raise NotADirectoryError(
errno.ENOTDIR, "Not a directory", str(dst.parent)
)
await self.create(dst, self._iterate_file(path, dst, progress=progress))
await self._upload_file(path, dst, progress=progress)

async def _upload_file(
self, src_path: Path, dst: URL, *, progress: AbstractFileProgress
) -> None:
await self.create(dst, self._iterate_file(src_path, dst, progress=progress))

async def upload_dir(
self,
Expand All @@ -314,31 +321,38 @@ async def upload_dir(
raise FileNotFoundError(errno.ENOENT, "No such file", str(path))
if not path.is_dir():
raise NotADirectoryError(errno.ENOTDIR, "Not a directory", str(path))
await self._upload_dir(src, path, dst, progress=progress)

async def _upload_dir(
self,
src: URL,
src_path: Path,
dst: URL,
*,
progress: AbstractRecursiveFileProgress,
) -> None:
try:
stat = await self.stats(dst)
if not stat.is_dir():
raise NotADirectoryError(errno.ENOTDIR, "Not a directory", str(dst))
except ResourceNotFound:
await self.mkdirs(dst)
await self.mkdirs(dst, exist_ok=True)
except neuromation.api.IllegalArgumentError:
raise NotADirectoryError(errno.ENOTDIR, "Not a directory", str(dst))
progress.enter(StorageProgressEnterDir(src, dst))
folder = sorted(path.iterdir(), key=lambda item: (item.is_dir(), item.name))
folder = sorted(src_path.iterdir(), key=lambda item: (item.is_dir(), item.name))
for child in folder:
name = child.name
if child.is_file():
await self.upload_file(
src / child.name, dst / child.name, progress=progress
)
await self._upload_file(src_path / name, dst / name, progress=progress)
elif child.is_dir():
await self.upload_dir(
src / child.name, dst / child.name, progress=progress
await self._upload_dir(
src / name, src_path / name, dst / name, progress=progress
)
else:
# This case is for uploading non-regular file,
# e.g. blocking device or unix socket
# Coverage temporary skipped, the line is waiting for a champion
progress.fail(
StorageProgressFail(
src / child.name,
dst / child.name,
src / name,
dst / name,
f"Cannot upload {child}, not regular file/directory",
)
) # pragma: no cover
Expand All @@ -352,12 +366,22 @@ async def download_file(
src = normalize_storage_path_uri(src, self._config.auth_token.username)
dst = normalize_local_path_uri(dst)
path = _extract_path(dst)
stat = await self.stats(src)
if not stat.is_file():
raise IsADirectoryError(errno.EISDIR, "Is a directory", str(src))
await self._download_file(src, dst, path, stat.size, progress=progress)

async def _download_file(
self,
src: URL,
dst: URL,
dst_path: Path,
size: int,
*,
progress: AbstractFileProgress,
) -> None:
loop = asyncio.get_event_loop()
with path.open("wb") as stream:
stat = await self.stats(src)
if not stat.is_file():
raise IsADirectoryError(errno.EISDIR, "Is a directory", str(src))
size = stat.size
with dst_path.open("wb") as stream:
progress.start(StorageProgressStart(src, dst, size))
pos = 0
async for chunk in self.open(src):
Expand All @@ -378,23 +402,38 @@ async def download_dir(
src = normalize_storage_path_uri(src, self._config.auth_token.username)
dst = normalize_local_path_uri(dst)
path = _extract_path(dst)
path.mkdir(parents=True, exist_ok=True)
await self._download_dir(src, dst, path, progress=progress)

async def _download_dir(
self,
src: URL,
dst: URL,
dst_path: Path,
*,
progress: AbstractRecursiveFileProgress,
) -> None:
dst_path.mkdir(parents=True, exist_ok=True)
progress.enter(StorageProgressEnterDir(src, dst))
folder = sorted(await self.ls(src), key=lambda item: (item.is_dir(), item.name))
for child in folder:
name = child.name
if child.is_file():
await self.download_file(
src / child.name, dst / child.name, progress=progress
await self._download_file(
src / name,
dst / name,
dst_path / name,
child.size,
progress=progress,
)
elif child.is_dir():
await self.download_dir(
src / child.name, dst / child.name, progress=progress
await self._download_dir(
src / name, dst / name, dst_path / name, progress=progress
)
else:
progress.fail(
StorageProgressFail(
src / child.name,
dst / child.name,
src / name,
dst / name,
f"Cannot download {child}, not regular file/directory",
)
) # pragma: no cover
Expand Down