Skip to content

Commit

Permalink
Add the --update option to "storage cp". (#1007)
Browse files Browse the repository at this point in the history
  • Loading branch information
serhiy-storchaka authored Sep 2, 2019
1 parent cf7192c commit f73aafe
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.D/1007.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added the `--update` option in `neuro storage cp`. It makes the command copying the source file only when it is newer than the destination file or when the destination file is missing.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ Name | Description|
|_\--glob / --no-glob_|Expand glob patterns in SOURCES with explicit scheme \[default: True]|
|_\-t, --target-directory DIRECTORY_|Copy all SOURCES into DIRECTORY|
|_\-T, --no-target-directory_|Treat DESTINATION as a normal file|
|_\-u, --update_|Copy only when the SOURCE file is newer than the destination file or when the destination file is missing|
|_\-p, --progress / -P, --no-progress_|Show progress, on by default|
|_--help_|Show this message and exit.|

Expand Down Expand Up @@ -1680,6 +1681,7 @@ Name | Description|
|_\--glob / --no-glob_|Expand glob patterns in SOURCES with explicit scheme \[default: True]|
|_\-t, --target-directory DIRECTORY_|Copy all SOURCES into DIRECTORY|
|_\-T, --no-target-directory_|Treat DESTINATION as a normal file|
|_\-u, --update_|Copy only when the SOURCE file is newer than the destination file or when the destination file is missing|
|_\-p, --progress / -P, --no-progress_|Show progress, on by default|
|_--help_|Show this message and exit.|

Expand Down
139 changes: 125 additions & 14 deletions neuromation/api/storage.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import asyncio
import datetime
import enum
import errno
import fnmatch
import os
import re
import time
from dataclasses import dataclass
from email.utils import parsedate
from pathlib import Path
from stat import S_ISREG
from typing import (
Any,
AsyncIterator,
Expand All @@ -18,6 +22,7 @@
Tuple,
)

import aiohttp
import attr
from yarl import URL

Expand Down Expand Up @@ -46,6 +51,7 @@

MAX_OPEN_FILES = 100
READ_SIZE = 2 ** 20 # 1 MiB
TIME_THRESHOLD = 1.0

Printer = Callable[[str], None]
ProgressQueueItem = Optional[Tuple[Callable[[Any], None], Any]]
Expand Down Expand Up @@ -80,17 +86,50 @@ def __init__(self, core: _Core, config: _Config) -> None:
self._core = core
self._config = config
self._file_sem = asyncio.BoundedSemaphore(MAX_OPEN_FILES)
self._min_time_diff = 0.0
self._max_time_diff = 0.0

def _uri_to_path(self, uri: URL) -> str:
uri = normalize_storage_path_uri(uri, self._config.auth_token.username)
prefix = uri.host + "/" if uri.host else ""
return prefix + uri.path.lstrip("/")

def _set_time_diff(self, request_time: float, resp: aiohttp.ClientResponse) -> None:
response_time = time.time()
server_timetuple = parsedate(resp.headers.get("Date"))
if not server_timetuple:
return
server_time = datetime.datetime(
*server_timetuple[:6], tzinfo=datetime.timezone.utc
).timestamp()
# Remove 1 because server time has been truncated
# and can be up to 1 second less than the actulal value
self._min_time_diff = request_time - server_time - 1.0
self._max_time_diff = response_time - server_time

def _is_local_modified(self, local: os.stat_result, remote: FileStatus) -> bool:
return (
local.st_size != remote.size
or local.st_mtime - remote.modification_time
> self._min_time_diff - TIME_THRESHOLD
)

def _is_remote_modified(self, local: os.stat_result, remote: FileStatus) -> bool:
# Add 1 because remote.modification_time has been truncated
# and can be up to 1 second less than the actulal value
return (
local.st_size != remote.size
or local.st_mtime - remote.modification_time
< self._max_time_diff + TIME_THRESHOLD + 1.0
)

async def ls(self, uri: URL) -> List[FileStatus]:
url = self._config.cluster_config.storage_url / self._uri_to_path(uri)
url = url.with_query(op="LISTSTATUS")

request_time = time.time()
async with self._core.request("GET", url) as resp:
self._set_time_diff(request_time, resp)
res = await resp.json()
return [
_file_status_from_api(status)
Expand Down Expand Up @@ -200,7 +239,9 @@ async def stat(self, uri: URL) -> FileStatus:
url = self._config.cluster_config.storage_url / self._uri_to_path(uri)
url = url.with_query(op="GETFILESTATUS")

request_time = time.time()
async with self._core.request("GET", url) as resp:
self._set_time_diff(request_time, resp)
res = await resp.json()
return _file_status_from_api(res["FileStatus"])

Expand Down Expand Up @@ -290,7 +331,12 @@ async def _iterate_file(
)

async def upload_file(
self, src: URL, dst: URL, *, progress: Optional[AbstractFileProgress] = None
self,
src: URL,
dst: URL,
*,
update: bool = False,
progress: Optional[AbstractFileProgress] = None,
) -> None:
if progress is None:
progress = _DummyProgress()
Expand All @@ -310,14 +356,14 @@ async def upload_file(
# Ignore stat errors for device files like NUL or CON on Windows.
# See https://bugs.python.org/issue37074
try:
stats = await self.stat(dst)
if stats.is_dir():
dst_stat = await self.stat(dst)
if dst_stat.is_dir():
raise IsADirectoryError(errno.EISDIR, "Is a directory", str(dst))
except ResourceNotFound:
# target doesn't exist, lookup for parent dir
try:
stats = await self.stat(dst.parent)
if not stats.is_dir():
dst_parent_stat = await self.stat(dst.parent)
if not dst_parent_stat.is_dir():
# parent path should be a folder
raise NotADirectoryError(
errno.ENOTDIR, "Not a directory", str(dst.parent)
Expand All @@ -326,6 +372,17 @@ async def upload_file(
raise NotADirectoryError(
errno.ENOTDIR, "Not a directory", str(dst.parent)
)
else:
if update:
try:
src_stat = path.stat()
except OSError:
pass
else:
if S_ISREG(src_stat.st_mode) and not self._is_local_modified(
src_stat, dst_stat
):
return
queue: "asyncio.Queue[ProgressQueueItem]" = asyncio.Queue()
await _run_progress(
queue, self._upload_file(path, dst, progress=progress, queue=queue)
Expand All @@ -348,6 +405,7 @@ async def upload_dir(
src: URL,
dst: URL,
*,
update: bool = False,
progress: Optional[AbstractRecursiveFileProgress] = None,
) -> None:
if progress is None:
Expand All @@ -361,7 +419,10 @@ async def upload_dir(
raise NotADirectoryError(errno.ENOTDIR, "Not a directory", str(path))
queue: "asyncio.Queue[ProgressQueueItem]" = asyncio.Queue()
await _run_progress(
queue, self._upload_dir(src, path, dst, progress=progress, queue=queue)
queue,
self._upload_dir(
src, path, dst, update=update, progress=progress, queue=queue
),
)

async def _upload_dir(
Expand All @@ -370,22 +431,39 @@ async def _upload_dir(
src_path: Path,
dst: URL,
*,
update: bool,
progress: AbstractRecursiveFileProgress,
queue: "asyncio.Queue[ProgressQueueItem]",
) -> None:
tasks = []
try:
await self.mkdir(dst, exist_ok=True)
except neuromation.api.IllegalArgumentError:
exists = False
if update:
try:
dst_files = {
item.name: item for item in await self.ls(dst) if item.is_file()
}
exists = True
except ResourceNotFound:
update = False
if not exists:
await self.mkdir(dst, exist_ok=True)
except (FileExistsError, neuromation.api.IllegalArgumentError):
raise NotADirectoryError(errno.ENOTDIR, "Not a directory", str(dst))
await queue.put((progress.enter, StorageProgressEnterDir(src, dst)))
tasks = []
async with self._file_sem:
folder = sorted(
src_path.iterdir(), key=lambda item: (item.is_dir(), item.name)
)
for child in folder:
name = child.name
if child.is_file():
if (
update
and name in dst_files
and not self._is_local_modified(child.stat(), dst_files[name])
):
continue
tasks.append(
self._upload_file(
src_path / name, dst / name, progress=progress, queue=queue
Expand All @@ -397,6 +475,7 @@ async def _upload_dir(
src / name,
src_path / name,
dst / name,
update=update,
progress=progress,
queue=queue,
)
Expand All @@ -419,21 +498,36 @@ async def _upload_dir(
await queue.put((progress.leave, StorageProgressLeaveDir(src, dst)))

async def download_file(
self, src: URL, dst: URL, *, progress: Optional[AbstractFileProgress] = None
self,
src: URL,
dst: URL,
*,
update: bool = False,
progress: Optional[AbstractFileProgress] = None,
) -> None:
if progress is None:
progress = _DummyProgress()
src = normalize_storage_path_uri(src, self._config.auth_token.username)
dst = normalize_local_path_uri(dst)
path = _extract_path(dst)
stat = await self.stat(src)
if not stat.is_file():
src_stat = await self.stat(src)
if not src_stat.is_file():
raise IsADirectoryError(errno.EISDIR, "Is a directory", str(src))
if update:
try:
dst_stat = path.stat()
except OSError:
pass
else:
if S_ISREG(dst_stat.st_mode) and not self._is_remote_modified(
dst_stat, src_stat
):
return
queue: "asyncio.Queue[ProgressQueueItem]" = asyncio.Queue()
await _run_progress(
queue,
self._download_file(
src, dst, path, stat.size, progress=progress, queue=queue
src, dst, path, src_stat.size, progress=progress, queue=queue
),
)

Expand Down Expand Up @@ -467,6 +561,7 @@ async def download_dir(
src: URL,
dst: URL,
*,
update: bool = False,
progress: Optional[AbstractRecursiveFileProgress] = None,
) -> None:
if progress is None:
Expand All @@ -476,7 +571,10 @@ async def download_dir(
path = _extract_path(dst)
queue: "asyncio.Queue[ProgressQueueItem]" = asyncio.Queue()
await _run_progress(
queue, self._download_dir(src, dst, path, progress=progress, queue=queue)
queue,
self._download_dir(
src, dst, path, update=update, progress=progress, queue=queue
),
)

async def _download_dir(
Expand All @@ -485,16 +583,28 @@ async def _download_dir(
dst: URL,
dst_path: Path,
*,
update: bool,
progress: AbstractRecursiveFileProgress,
queue: "asyncio.Queue[ProgressQueueItem]",
) -> None:
dst_path.mkdir(parents=True, exist_ok=True)
await queue.put((progress.enter, StorageProgressEnterDir(src, dst)))
tasks = []
if update:
async with self._file_sem:
dst_files = {
item.name: item for item in dst_path.iterdir() if item.is_file()
}
folder = sorted(await self.ls(src), key=lambda item: (item.is_dir(), item.name))
for child in folder:
name = child.name
if child.is_file():
if (
update
and name in dst_files
and not self._is_remote_modified(dst_files[name].stat(), child)
):
continue
tasks.append(
self._download_file(
src / name,
Expand All @@ -511,6 +621,7 @@ async def _download_dir(
src / name,
dst / name,
dst_path / name,
update=update,
progress=progress,
queue=queue,
)
Expand Down
24 changes: 20 additions & 4 deletions neuromation/cli/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ async def glob(root: Root, patterns: Sequence[str]) -> None:
is_flag=True,
help="Treat DESTINATION as a normal file",
)
@click.option(
"-u",
"--update",
is_flag=True,
help="Copy only when the SOURCE file is newer than the destination file "
"or when the destination file is missing",
)
@click.option(
"-p/-P",
"--progress/--no-progress",
Expand All @@ -210,6 +217,7 @@ async def cp(
glob: bool,
target_directory: Optional[str],
no_target_directory: bool,
update: bool,
progress: bool,
) -> None:
"""
Expand Down Expand Up @@ -293,14 +301,22 @@ async def cp(

if src.scheme == "file" and dst.scheme == "storage":
if recursive:
await root.client.storage.upload_dir(src, dst, progress=progress_obj)
await root.client.storage.upload_dir(
src, dst, update=update, progress=progress_obj
)
else:
await root.client.storage.upload_file(src, dst, progress=progress_obj)
await root.client.storage.upload_file(
src, dst, update=update, progress=progress_obj
)
elif src.scheme == "storage" and dst.scheme == "file":
if recursive:
await root.client.storage.download_dir(src, dst, progress=progress_obj)
await root.client.storage.download_dir(
src, dst, update=update, progress=progress_obj
)
else:
await root.client.storage.download_file(src, dst, progress=progress_obj)
await root.client.storage.download_file(
src, dst, update=update, progress=progress_obj
)
else:
raise RuntimeError(
f"Copy operation of the file with scheme '{src.scheme}'"
Expand Down
Loading

0 comments on commit f73aafe

Please sign in to comment.