Use partial read when retrying to download blob (#1865)
serhiy-storchaka authored Nov 25, 2020
1 parent 3ff7b47 commit b68e108
Showing 5 changed files with 162 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.D/1865.feature
@@ -0,0 +1 @@
`neuro blob cp` now uses partial reads when retrying to download a blob.
10 changes: 8 additions & 2 deletions docs/blob_storage_reference.rst
@@ -99,13 +99,15 @@ Blob Storage
:raises: :exc:`FileNotFound` if key does not exist *or* you don't have access
to it.

.. comethod:: get_blob(bucket_name: str, key: str) -> Blob
.. comethod:: get_blob(bucket_name: str, key: str, offset: int = 0, size: Optional[int] = None) -> Blob
:async-with:

Look up the blob and return its metadata with body content.

:param str bucket_name: Name of the bucket.
:param str key: Key of the blob.
:param int offset: Position in the blob from which to read.
:param int size: Maximum size of the read data. If ``None``, read to the end of the blob.

:return: :class:`Blob` object. Please note that ``body_stream``'s lifetime is
bound to the asynchronous context manager. Accessing the attribute outside
@@ -114,7 +116,7 @@ Blob Storage
:raises: :exc:`FileNotFound` if key does not exist *or* you don't have access
to it.

.. comethod:: fetch_blob(bucket_name: str, key: str) -> AsyncIterator[bytes]
.. comethod:: fetch_blob(bucket_name: str, key: str, offset: int = 0, size: Optional[int] = None) -> AsyncIterator[bytes]
:async-for:

Look up the blob and return its body content only. The content will be streamed
@@ -125,6 +127,10 @@ Blob Storage

:param str bucket_name: Name of the bucket.
:param str key: Key of the blob.
:param int offset: Position in the blob from which to read.
:param int size: Maximum size of the read data. If ``None``, read to the end of the blob.

:return: asynchronous iterator used for retrieving the blob content.

:raises: :exc:`FileNotFound` if key does not exist *or* you don't have access
to it.
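
A minimal usage sketch of the new parameters (the bucket and key names are hypothetical and the `api.get()` client factory is assumed; this example is not part of the commit):

    import asyncio

    from neuromation import api


    async def main() -> None:
        async with api.get() as client:
            buf = bytearray()
            # Read 4 bytes starting at offset 5: sent as "Range: bytes=5-8".
            async for chunk in client.blob_storage.fetch_blob(
                "my-bucket", "data.bin", offset=5, size=4
            ):
                buf.extend(chunk)
            # Stream from offset 5 to the end of the blob.
            async with client.blob_storage.get_blob(
                "my-bucket", "data.bin", offset=5
            ) as blob:
                async for chunk in blob.body_stream.iter_any():
                    buf.extend(chunk)


    asyncio.run(main())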
3 changes: 1 addition & 2 deletions docs/storage_reference.rst
@@ -165,8 +165,7 @@ Storage

:param int size: Maximum size of the read data. If ``None``, read to the end of the file.

:param ~typing.AsyncIterator[bytes] data: asynchronous iterator used for
retrieving the file content.
:return: asynchronous iterator used for retrieving the file content.
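
For example, a sketch of a ranged read from platform storage (it assumes an open client session and that ``open`` accepts the offset and size parameters documented above; the URI is hypothetical):

    from yarl import URL


    async def read_slice(client, uri: URL, offset: int, size: int) -> bytes:
        # Stream a byte slice of a file on platform storage.
        buf = bytearray()
        async for chunk in client.storage.open(uri, offset=offset, size=size):
            buf.extend(chunk)
        return bytes(buf)

    # e.g. await read_slice(client, URL("storage:data.bin"), 5, 4)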

.. rubric:: Copy operations

59 changes: 49 additions & 10 deletions neuromation/api/blob_storage.py
@@ -46,7 +46,14 @@
from .core import _Core
from .errors import ResourceNotFound
from .file_filter import FileFilter, translate
from .storage import _always, _has_magic, _magic_check, run_concurrently, run_progress
from .storage import (
_always,
_has_magic,
_magic_check,
_parse_content_range,
run_concurrently,
run_progress,
)
from .url_utils import _extract_path, normalize_blob_path_uri, normalize_local_path_uri
from .users import Action
from .utils import NoPublicConstructor, queue_calls, retries
@@ -337,19 +344,47 @@ async def head_blob(self, bucket_name: str, key: str) -> BlobListing:
return _blob_status_from_response(bucket_name, key, resp)

@asynccontextmanager
async def get_blob(self, bucket_name: str, key: str) -> AsyncIterator[Blob]:
async def get_blob(
self, bucket_name: str, key: str, offset: int = 0, size: Optional[int] = None
) -> AsyncIterator[Blob]:
"""Return blob status and body stream of the blob"""
url = self._config.blob_storage_url / "o" / bucket_name / key
auth = await self._config._api_auth()

timeout = attr.evolve(self._core.timeout, sock_read=None)
async with self._core.request("GET", url, timeout=timeout, auth=auth) as resp:
if offset < 0:
raise ValueError("offset should be >= 0")
if size is None:
if offset:
partial = True
headers = {"Range": f"bytes={offset}-"}
else:
partial = False
headers = {}
elif size > 0:
partial = True
headers = {"Range": f"bytes={offset}-{offset + size - 1}"}
else:
raise ValueError("size should be > 0")

async with self._core.request(
"GET", url, timeout=timeout, auth=auth, headers=headers
) as resp:
if partial:
if resp.status != aiohttp.web.HTTPPartialContent.status_code:
raise RuntimeError(f"Unexpected status code {resp.status}")
rng = _parse_content_range(resp.headers.get("Content-Range"))
if rng.start != offset:
raise RuntimeError("Invalid header Content-Range")

stats = _blob_status_from_response(bucket_name, key, resp)
yield Blob(resp, stats)
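
For concreteness, the header construction above maps zero-based offsets to inclusive HTTP byte ranges (a quick illustration, not part of the commit):

    # offset=5, size=None -> "bytes=5-"   (read to the end)
    # offset=5, size=4    -> "bytes=5-8"  (last byte index is offset + size - 1)
    def range_header(offset: int, size: int) -> str:
        return f"bytes={offset}-{offset + size - 1}"

    assert range_header(5, 4) == "bytes=5-8"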

async def fetch_blob(self, bucket_name: str, key: str) -> AsyncIterator[bytes]:
async def fetch_blob(
self, bucket_name: str, key: str, offset: int = 0, size: Optional[int] = None
) -> AsyncIterator[bytes]:
"""Return only bytes data of the blob"""
async with self.get_blob(bucket_name, key) as blob:
async with self.get_blob(bucket_name, key, offset=offset, size=size) as blob:
async for data in blob.body_stream.iter_any():
yield data
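
`_parse_content_range` is reused from the storage module. A sketch of the behavior `get_blob` relies on — validating a `Content-Range` header such as `bytes 5-12/13` and returning a slice-like object whose `start` is compared with the requested offset (an illustration; the real helper may differ):

    import re
    from typing import Optional


    def parse_content_range(rng_str: Optional[str]) -> slice:
        # Sketch of what the imported _parse_content_range helper is
        # expected to do; the actual implementation may differ.
        if rng_str is None:
            raise RuntimeError("Missing Content-Range header")
        m = re.fullmatch(r"bytes (\d+)-(\d+)/(?:\d+|\*)", rng_str)
        if m is None:
            raise RuntimeError("Invalid header Content-Range")
        start, last = int(m.group(1)), int(m.group(2))
        # HTTP ranges are inclusive; slice.stop is exclusive.
        return slice(start, last + 1)


    assert parse_content_range("bytes 5-12/13") == slice(5, 13)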

@@ -673,19 +708,23 @@ async def _download_file(
loop = asyncio.get_event_loop()
async with self._file_sem:
await progress.start(StorageProgressStart(src, dst, size))
for retry in retries(f"Fail to download {src}"):
async with retry:
with dst_path.open("wb") as stream:
pos = 0
with dst_path.open("wb") as stream:
for retry in retries(f"Fail to download {src}"):
pos = stream.tell()
if pos >= size:
break
async with retry:
bucket_name, key = self._extract_bucket_and_key(src)
async for chunk in self.fetch_blob(
bucket_name=bucket_name, key=key
bucket_name=bucket_name, key=key, offset=pos
):
pos += len(chunk)
await progress.step(
StorageProgressStep(src, dst, pos, size)
)
await loop.run_in_executor(None, stream.write, chunk)
if chunk:
retry.reset()
await progress.complete(StorageProgressComplete(src, dst, size))

async def download_dir(
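
The reworked loop keeps the destination file open across retries and resumes each attempt from `stream.tell()`, so bytes already written are never downloaded again; `retry.reset()` (assumed here to clear the helper's attempt counter whenever fresh data arrives) keeps a long but steadily progressing transfer from exhausting the retry budget. A standalone sketch of the same resume-on-failure pattern, independent of the SDK's `retries` helper:

    import asyncio
    from pathlib import Path
    from typing import AsyncIterator, Callable

    MAX_ATTEMPTS = 5  # hypothetical retry budget


    async def download_resumable(
        fetch: Callable[[int], AsyncIterator[bytes]],  # fetch(offset) -> chunks
        dst: Path,
        size: int,
    ) -> None:
        attempts = MAX_ATTEMPTS
        with dst.open("wb") as stream:
            while stream.tell() < size:
                try:
                    # Resume from the current file position.
                    async for chunk in fetch(stream.tell()):
                        stream.write(chunk)
                        if chunk:
                            attempts = MAX_ATTEMPTS  # progress resets the budget
                except OSError:
                    attempts -= 1
                    if not attempts:
                        raise
                    await asyncio.sleep(1.0)  # simple fixed backoff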
105 changes: 103 additions & 2 deletions tests/api/test_blob_storage.py
@@ -190,15 +190,29 @@ async def get_blob(request: web.Request) -> web.StreamResponse:
raise web.HTTPNotFound()
blob = CONTENTS[key]

rng = request.http_range
content = blob["body"]
resp = web.StreamResponse(status=200)
etag = hashlib.md5(blob["body"]).hexdigest()
resp.headers.update({"ETag": repr(etag)})
resp.last_modified = blob["last_modified"]
resp.content_length = len(blob["body"])
start, stop, _ = rng.indices(len(content))
if not (rng.start is rng.stop is None):
if start >= stop:
raise RuntimeError
resp.set_status(web.HTTPPartialContent.status_code)
resp.headers["Content-Range"] = f"bytes {start}-{stop-1}/{len(content)}"
resp.content_length = stop - start
resp.content_type = "application/octet-stream"
await resp.prepare(request)
if request.method != "HEAD":
await resp.write(blob["body"])
chunk_size = max(len(content) * 2 // 5, 200)
if stop - start > chunk_size:
await resp.write(content[start : start + chunk_size])
raise RuntimeError
else:
await resp.write(content[start:stop])
await resp.write_eof()
return resp

async def put_blob(request: web.Request) -> web.Response:
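
Note how the handler exercises the retry path: for larger ranges it writes only part of the requested bytes and then raises, aborting the response mid-stream like a dropped connection, so the client has to resume with a new `Range` request. aiohttp exposes the `Range` header as a slice-like object that `indices()` normalizes against the body length, following standard Python slice semantics:

    # How request.http_range maps Range headers for a 13-byte body:
    assert slice(5, 9).indices(13)[:2] == (5, 9)         # "Range: bytes=5-8"
    assert slice(5, None).indices(13)[:2] == (5, 13)     # "Range: bytes=5-"
    assert slice(None, None).indices(13)[:2] == (0, 13)  # no Range header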
@@ -584,6 +598,93 @@ async def handler(request: web.Request) -> web.StreamResponse:
assert buf == body


async def test_blob_storage_fetch_blob_partial_read(
aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
bucket_name = "foo"
key = "text.txt"
mtime1 = datetime.now()
body = b"ababahalamaha"

async def handler(request: web.Request) -> web.StreamResponse:
assert "b3" in request.headers
assert request.path == f"/blob/o/{bucket_name}/{key}"
assert request.match_info == {"bucket": bucket_name, "path": key}
rng = request.http_range
start, stop, _ = rng.indices(len(body))
resp = web.StreamResponse(status=web.HTTPPartialContent.status_code)
resp.headers["ETag"] = '"12312908asd"'
resp.headers["Content-Range"] = f"bytes {start}-{stop-1}/{len(body)}"
resp.last_modified = mtime1
resp.content_length = stop - start
resp.content_type = "plain/text"
await resp.prepare(request)
await resp.write(body[start:stop])
return resp

app = web.Application()
app.router.add_get(BlobUrlRotes.GET_OBJECT, handler)

srv = await aiohttp_server(app)

async with make_client(srv.make_url("/")) as client:
buf = bytearray()
async for chunk in client.blob_storage.fetch_blob(bucket_name, key, 5):
buf.extend(chunk)
assert buf == b"halamaha"

buf = bytearray()
async for chunk in client.blob_storage.fetch_blob(bucket_name, key, 5, 4):
buf.extend(chunk)
assert buf == b"hala"

buf = bytearray()
async for chunk in client.blob_storage.fetch_blob(bucket_name, key, 5, 20):
buf.extend(chunk)
assert buf == b"halamaha"

with pytest.raises(ValueError):
async for chunk in client.blob_storage.fetch_blob(bucket_name, key, 5, 0):
pass


async def test_blob_storage_fetch_blob_unsupported_partial_read(
aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
bucket_name = "foo"
key = "text.txt"
mtime1 = datetime.now()
body = b"ababahalamaha"

async def handler(request: web.Request) -> web.StreamResponse:
assert "b3" in request.headers
assert request.path == f"/blob/o/{bucket_name}/{key}"
assert request.match_info == {"bucket": bucket_name, "path": key}
resp = web.StreamResponse(status=200)
resp.headers.update({"ETag": '"12312908asd"'})
resp.last_modified = mtime1
resp.content_length = len(body)
resp.content_type = "plain/text"
await resp.prepare(request)
await resp.write(body)
return resp

app = web.Application()
app.router.add_get(BlobUrlRotes.GET_OBJECT, handler)

srv = await aiohttp_server(app)

async with make_client(srv.make_url("/")) as client:
buf = b""
async for data in client.blob_storage.fetch_blob(bucket_name, key):
buf += data
assert buf == body

with pytest.raises(RuntimeError):
async for data in client.blob_storage.fetch_blob(bucket_name, key, 5):
pass
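
When the backend ignores the `Range` header and answers `200 OK` to a partial request, `get_blob` raises `RuntimeError` instead of silently yielding bytes from the wrong position. A hypothetical caller-side fallback (names are illustrative, not part of the SDK):

    async def fetch_from(client, bucket: str, key: str, offset: int) -> bytes:
        # Resume from `offset` when the server supports ranges;
        # otherwise re-read the whole blob and slice locally.
        buf = bytearray()
        try:
            async for chunk in client.blob_storage.fetch_blob(
                bucket, key, offset=offset
            ):
                buf.extend(chunk)
            return bytes(buf)
        except RuntimeError:
            buf.clear()
            async for chunk in client.blob_storage.fetch_blob(bucket, key):
                buf.extend(chunk)
            return bytes(buf[offset:])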


async def test_blob_storage_put_blob(
aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
