Add read method to StorageStreamDownloader #24275

Merged Aug 9, 2022 (16 commits)
Changes from 7 commits
2 changes: 2 additions & 0 deletions sdk/storage/azure-storage-blob/CHANGELOG.md
@@ -3,6 +3,8 @@
## 12.14.0b1 (Unreleased)

### Features Added
- Added standard `read` method to `StorageStreamDownloader`.
- Added support for async streams (classes with an async `read` method) to async `upload_blob`.

### Bugs Fixed
- Adjusted type hints for `upload_blob` and `StorageStreamDownloader.readall`.
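
A minimal usage sketch of the new synchronous `read` (the connection string and names are placeholders; client setup is assumed):

from azure.storage.blob import BlobClient

blob = BlobClient.from_connection_string(
    "<connection-string>", container_name="my-container", blob_name="my-blob")

downloader = blob.download_blob()   # returns a StorageStreamDownloader
header = downloader.read(512)       # read the first 512 bytes
remainder = downloader.read()       # size=-1 (default) reads all remaining bytes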
66 changes: 65 additions & 1 deletion sdk/storage/azure-storage-blob/azure/storage/blob/_download.py
@@ -9,7 +9,7 @@
import time
import warnings
from io import BytesIO
-from typing import Generic, Iterator, TypeVar
+from typing import Generic, Iterator, Optional, TypeVar

import requests
from azure.core.exceptions import HttpResponseError, ServiceResponseError
@@ -335,6 +335,7 @@ def __init__(
        self._non_empty_ranges = None
        self._response = None
        self._encryption_data = None
        self._offset = 0

        # The cls is passed in via download_cls to avoid conflicting arg name with Generic.__new__
        # but needs to be changed to cls in the request options.
Expand Down Expand Up @@ -552,6 +553,69 @@ def chunks(self):
            downloader=iter_downloader,
            chunk_size=self._config.max_chunk_get_size)

    def read(self, size: Optional[int] = -1) -> T:
        """
        Read up to size bytes from the stream and return them. If size
        is unspecified or is -1, all remaining bytes will be read.
        """
        if size == -1:
            return self.readall()
        if size == 0 or self.size == 0:
            data = b''
            if self._encoding:
                return data.decode(self._encoding)
            return data

        stream = BytesIO()
        remaining_size = size

        # Start by reading from current_content if there is data left
        if self._offset < len(self._current_content):
            start = self._offset
            length = min(remaining_size, len(self._current_content) - self._offset)
            read = stream.write(self._current_content[start:start + length])

            remaining_size -= read
            self._offset += read

        if remaining_size > 0:
            end_range = min(self._offset + remaining_size, self.size)
            parallel = self._max_concurrency > 1
            downloader = _ChunkDownloader(
                client=self._clients.blob,
                non_empty_ranges=self._non_empty_ranges,
                total_size=remaining_size,
                chunk_size=self._config.max_chunk_get_size,
                current_progress=self._offset,
                start_range=self._offset,
                end_range=end_range,
                stream=stream,
                parallel=parallel,
                validate_content=self._validate_content,
                encryption_options=self._encryption_options,
                encryption_data=self._encryption_data,
                use_location=self._location_mode,
                **self._request_options
            )

            if parallel:
                # Lazy import: only needed for parallel downloads
                import concurrent.futures
                with concurrent.futures.ThreadPoolExecutor(self._max_concurrency) as executor:
                    list(executor.map(
                        with_current_context(downloader.process_chunk),
                        downloader.get_chunk_offsets()
                    ))
            else:
                for chunk in downloader.get_chunk_offsets():
                    downloader.process_chunk(chunk)

            # Advance the offset by the number of bytes actually downloaded
            self._offset = end_range

        data = stream.getvalue()
        if self._encoding:
            return data.decode(self._encoding)
        return data

    def readall(self):
        # type: () -> T
        """Download the contents of this blob.
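
Each call to `read` advances the downloader's internal `_offset`, so a blob can be consumed incrementally. A usage sketch (hypothetical `blob_client`; assumes `read` returns an empty value once the stream is exhausted):

# Stream a blob to a local file in 1 MiB reads.
downloader = blob_client.download_blob()
with open("output.bin", "wb") as f:
    while True:
        chunk = downloader.read(1024 * 1024)
        if not chunk:
            break
        f.write(chunk)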
@@ -19,6 +19,26 @@
from .uploads import SubStream, IterStreamer # pylint: disable=unused-import


async def _async_parallel_uploads(uploader, pending, running):
    range_ids = []
    while True:
        # Wait for some upload to finish before adding a new one
        done, running = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
        range_ids.extend([chunk.result() for chunk in done])
        try:
            for _ in range(0, len(done)):
                next_chunk = await pending.__anext__()
                running.add(asyncio.ensure_future(uploader(next_chunk)))
        except StopAsyncIteration:
            break

    # Wait for the remaining uploads to finish
    if running:
        done, _running = await asyncio.wait(running)
        range_ids.extend([chunk.result() for chunk in done])
    return range_ids
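
The helper keeps at most `max_concurrency` uploads in flight, scheduling one new task for each task that completes. The same bounded-concurrency pattern in isolation (a sketch with a hypothetical worker, not SDK code):

import asyncio
from itertools import islice

async def run_bounded(worker, items, max_concurrency=4):
    # Start up to max_concurrency tasks, then replace each task as it
    # finishes, mirroring the asyncio.wait(FIRST_COMPLETED) loop above.
    results = []
    it = iter(items)
    running = {asyncio.ensure_future(worker(x)) for x in islice(it, max_concurrency)}
    while running:
        done, running = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
        results.extend(task.result() for task in done)
        for x in islice(it, len(done)):
            running.add(asyncio.ensure_future(worker(x)))
    return results

# e.g. asyncio.run(run_bounded(fetch_one, urls, 8)) with an async fetch_one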


async def _parallel_uploads(uploader, pending, running):
    range_ids = []
    while True:
@@ -65,14 +85,18 @@ async def upload_data_chunks(

    if parallel:
        upload_tasks = uploader.get_chunk_streams()
-        running_futures = [
-            asyncio.ensure_future(uploader.process_chunk(u))
-            for u in islice(upload_tasks, 0, max_concurrency)
-        ]
-        range_ids = await _parallel_uploads(uploader.process_chunk, upload_tasks, running_futures)
+        running_futures = []
+        for _ in range(max_concurrency):
+            try:
+                chunk = await upload_tasks.__anext__()
+                running_futures.append(asyncio.ensure_future(uploader.process_chunk(chunk)))
+            except StopAsyncIteration:
+                break
+
+        range_ids = await _async_parallel_uploads(uploader.process_chunk, upload_tasks, running_futures)
    else:
        range_ids = []
-        for chunk in uploader.get_chunk_streams():
+        async for chunk in uploader.get_chunk_streams():
            range_ids.append(await uploader.process_chunk(chunk))

    if any(range_ids):
@@ -152,7 +176,7 @@ def __init__(
        self.last_modified = None
        self.request_options = kwargs

-    def get_chunk_streams(self):
+    async def get_chunk_streams(self):
        index = 0
        while True:
            data = b''
@@ -162,7 +186,10 @@ def get_chunk_streams():
            while True:
                if self.total_size:
                    read_size = min(self.chunk_size - len(data), self.total_size - (index + len(data)))
-                temp = self.stream.read(read_size)
+                if asyncio.iscoroutinefunction(self.stream.read):
+                    temp = await self.stream.read(read_size)
+                else:
+                    temp = self.stream.read(read_size)
                if not isinstance(temp, six.binary_type):
                    raise TypeError('Blob data should be of type bytes.')
                data += temp or b""
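
Because chunk reads now await `stream.read` when it is a coroutine function, any object exposing an async `read(size)` can be passed to the async `upload_blob`. A minimal sketch of such a stream (hypothetical class, not part of the SDK):

import asyncio

class AsyncByteStream:
    """Toy async stream: a bytes buffer behind an async read()."""
    def __init__(self, data: bytes):
        self._buffer = data
        self._pos = 0

    async def read(self, size: int = -1) -> bytes:
        await asyncio.sleep(0)  # stand-in for real async I/O
        if size is None or size < 0:
            size = len(self._buffer) - self._pos
        chunk = self._buffer[self._pos:self._pos + size]
        self._pos += len(chunk)
        return chunk

With something like this in place, `await blob_client.upload_blob(AsyncByteStream(payload), length=len(payload))` would exercise the new code path.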
@@ -9,7 +9,7 @@
import warnings
from io import BytesIO
from itertools import islice
-from typing import AsyncIterator, Generic, TypeVar
+from typing import AsyncIterator, Generic, Optional, TypeVar

import asyncio
from aiohttp import ClientPayloadError
@@ -243,6 +243,7 @@ def __init__(
        self._non_empty_ranges = None
        self._response = None
        self._encryption_data = None
        self._offset = 0

        self._initial_range = None
        self._initial_offset = None
@@ -456,6 +457,88 @@ def chunks(self):
            downloader=iter_downloader,
            chunk_size=self._config.max_chunk_get_size)

    async def read(self, size: Optional[int] = -1) -> T:
        """
        Read up to size bytes from the stream and return them. If size
        is unspecified or is -1, all remaining bytes will be read.
        """
        if size == -1:
            return await self.readall()
        if size == 0 or self.size == 0:
            data = b''
            if self._encoding:
                return data.decode(self._encoding)
            return data

        stream = BytesIO()
        remaining_size = size

        # Start by reading from current_content if there is data left
        if self._offset < len(self._current_content):
            start = self._offset
            length = min(remaining_size, len(self._current_content) - self._offset)
            read = stream.write(self._current_content[start:start + length])

            remaining_size -= read
            self._offset += read

        if remaining_size > 0:
            end_range = min(self._offset + remaining_size, self.size)
            parallel = self._max_concurrency > 1
            downloader = _AsyncChunkDownloader(
                client=self._clients.blob,
                non_empty_ranges=self._non_empty_ranges,
                total_size=remaining_size,
                chunk_size=self._config.max_chunk_get_size,
                current_progress=self._offset,
                start_range=self._offset,
                end_range=end_range,
                stream=stream,
                parallel=parallel,
                validate_content=self._validate_content,
                encryption_options=self._encryption_options,
                encryption_data=self._encryption_data,
                use_location=self._location_mode,
                **self._request_options
            )

            dl_tasks = downloader.get_chunk_offsets()
            running_futures = [
                asyncio.ensure_future(downloader.process_chunk(d))
                for d in islice(dl_tasks, 0, self._max_concurrency)
            ]
            while running_futures:
                # Wait for some download to finish before adding a new one
                done, running_futures = await asyncio.wait(
                    running_futures, return_when=asyncio.FIRST_COMPLETED)
                try:
                    for task in done:
                        task.result()
                except HttpResponseError as error:
                    process_storage_error(error)
                try:
                    next_chunk = next(dl_tasks)
                except StopIteration:
                    break
                else:
                    running_futures.add(asyncio.ensure_future(downloader.process_chunk(next_chunk)))

            if running_futures:
                # Wait for the remaining downloads to finish
                done, _running_futures = await asyncio.wait(running_futures)
                try:
                    for task in done:
                        task.result()
                except HttpResponseError as error:
                    process_storage_error(error)

            # Advance the offset by the number of bytes actually downloaded
            self._offset = end_range

        data = stream.getvalue()
        if self._encoding:
            return data.decode(self._encoding)
        return data

    async def readall(self):
        # type: () -> T
        """Download the contents of this blob.
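
An async counterpart of the earlier usage sketch (placeholder names; on the aio client, `download_blob` itself must be awaited):

import asyncio
from azure.storage.blob.aio import BlobClient

async def main():
    blob = BlobClient.from_connection_string(
        "<connection-string>", container_name="my-container", blob_name="my-blob")
    async with blob:
        downloader = await blob.download_blob(max_concurrency=4)
        first = await downloader.read(4 * 1024 * 1024)  # first 4 MiB
        remainder = await downloader.read()             # all remaining bytes

asyncio.run(main())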
@@ -5,6 +5,7 @@
# --------------------------------------------------------------------------
# pylint: disable=no-self-use

import asyncio
from io import SEEK_SET, UnsupportedOperation
from typing import TypeVar, TYPE_CHECKING

@@ -70,7 +71,10 @@ async def upload_block_blob(  # pylint: disable=too-many-locals, too-many-statements
    # Do single put if the size is smaller than config.max_single_put_size
    if adjusted_count is not None and (adjusted_count <= blob_settings.max_single_put_size):
        try:
-            data = data.read(length)
+            if asyncio.iscoroutinefunction(data.read):
+                data = await data.read(length)
+            else:
+                data = data.read(length)
            if not isinstance(data, six.binary_type):
                raise TypeError('Blob data should be of type bytes.')
        except AttributeError:
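
The `asyncio.iscoroutinefunction(data.read)` check is what lets synchronous and asynchronous streams share a single upload path. The same dispatch in isolation (hypothetical helper):

import asyncio
import io

async def read_any(stream, n: int) -> bytes:
    # Await read() when it is a coroutine function; call it directly otherwise.
    if asyncio.iscoroutinefunction(stream.read):
        return await stream.read(n)
    return stream.read(n)

# e.g. asyncio.run(read_any(io.BytesIO(b"abc"), 2)) == b"ab"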