-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Retry storage operations in case of ClientError. #1107
Changes from 2 commits
f43f4ef
b95ce53
17bd55f
10d2d72
1caaf90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Retry storage operations in case of some errors. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,10 @@ | ||
import asyncio | ||
import contextlib | ||
import datetime | ||
import enum | ||
import errno | ||
import fnmatch | ||
import logging | ||
import os | ||
import re | ||
import time | ||
|
@@ -15,8 +17,10 @@ | |
AsyncIterator, | ||
Awaitable, | ||
Callable, | ||
ContextManager, | ||
Dict, | ||
Iterable, | ||
Iterator, | ||
List, | ||
Optional, | ||
Tuple, | ||
|
@@ -52,6 +56,9 @@ | |
MAX_OPEN_FILES = 100 | ||
READ_SIZE = 2 ** 20 # 1 MiB | ||
TIME_THRESHOLD = 1.0 | ||
ATTEMPTS = 10 | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
Printer = Callable[[str], None] | ||
ProgressQueueItem = Optional[Tuple[Callable[[Any], None], Any]] | ||
|
@@ -384,9 +391,13 @@ async def _upload_file( | |
progress: AbstractFileProgress, | ||
queue: "asyncio.Queue[ProgressQueueItem]", | ||
) -> None: | ||
await self.create( | ||
dst, self._iterate_file(src_path, dst, progress=progress, queue=queue) | ||
) | ||
for retry in retries(f"Fail to upload {dst}"): | ||
with retry: | ||
await self.create( | ||
dst, | ||
self._iterate_file(src_path, dst, progress=progress, queue=queue), | ||
) | ||
break | ||
|
||
async def upload_dir( | ||
self, | ||
|
@@ -428,14 +439,22 @@ async def _upload_dir( | |
exists = False | ||
if update: | ||
try: | ||
dst_files = { | ||
item.name: item for item in await self.ls(dst) if item.is_file() | ||
} | ||
for retry in retries(f"Fail to list {dst}"): | ||
with retry: | ||
dst_files = { | ||
item.name: item | ||
for item in await self.ls(dst) | ||
if item.is_file() | ||
} | ||
break | ||
exists = True | ||
except ResourceNotFound: | ||
update = False | ||
if not exists: | ||
await self.mkdir(dst, exist_ok=True) | ||
for retry in retries(f"Fail to create {dst}"): | ||
with retry: | ||
await self.mkdir(dst, exist_ok=True) | ||
break | ||
except (FileExistsError, neuromation.api.IllegalArgumentError): | ||
raise NotADirectoryError(errno.ENOTDIR, "Not a directory", str(dst)) | ||
await queue.put((progress.enter, StorageProgressEnterDir(src, dst))) | ||
|
@@ -532,13 +551,19 @@ async def _download_file( | |
async with self._file_sem: | ||
with dst_path.open("wb") as stream: | ||
await queue.put((progress.start, StorageProgressStart(src, dst, size))) | ||
pos = 0 | ||
async for chunk in self.open(src): | ||
pos += len(chunk) | ||
await queue.put( | ||
(progress.step, StorageProgressStep(src, dst, pos, size)) | ||
) | ||
await loop.run_in_executor(None, stream.write, chunk) | ||
for retry in retries(f"Fail to download {src}"): | ||
with retry: | ||
pos = 0 | ||
async for chunk in self.open(src): | ||
pos += len(chunk) | ||
await queue.put( | ||
( | ||
progress.step, | ||
StorageProgressStep(src, dst, pos, size), | ||
) | ||
) | ||
await loop.run_in_executor(None, stream.write, chunk) | ||
break | ||
await queue.put( | ||
(progress.complete, StorageProgressComplete(src, dst, size)) | ||
) | ||
|
@@ -586,7 +611,12 @@ async def _download_dir( | |
item.name: item for item in dst_path.iterdir() if item.is_file() | ||
}, | ||
) | ||
folder = await self.ls(src) | ||
|
||
for retry in retries(f"Fail to list {src}"): | ||
with retry: | ||
folder = await self.ls(src) | ||
break | ||
|
||
for child in folder: | ||
name = child.name | ||
if child.is_file(): | ||
|
@@ -713,3 +743,22 @@ def leave(self, data: StorageProgressLeaveDir) -> None: | |
|
||
def fail(self, data: StorageProgressFail) -> None: | ||
pass | ||
|
||
|
||
def retries(msg: str) -> Iterator[ContextManager[None]]: | ||
sleeptime = 0.0 | ||
for r in range(ATTEMPTS)[::-1]: | ||
if r: | ||
|
||
@contextlib.contextmanager | ||
def retry() -> Iterator[None]: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Shouldn't the function be async? It would be nice to have a test for […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You are right. There is […] |
||
try: | ||
yield | ||
except aiohttp.ClientError as err: | ||
log.debug(f"{msg}: {err}. Retry...") | ||
asyncio.sleep(sleeptime) | ||
|
||
sleeptime += 0.1 | ||
yield retry() | ||
else: | ||
yield contextlib.ExitStack() # type: ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the iteration reversed here?
I believe this could be implemented more simply:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea!