Skip to content

Commit

Permalink
[Storage] Add progress_hook to file-share upload/download (#24997)
Browse files Browse the repository at this point in the history
  • Loading branch information
jalauzon-msft authored Jun 29, 2022
1 parent eb4afb3 commit 155eb8b
Show file tree
Hide file tree
Showing 19 changed files with 2,868 additions and 29 deletions.
1 change: 1 addition & 0 deletions sdk/storage/azure-storage-file-share/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 12.9.0 (Unreleased)

### Features Added
- Added support for progress tracking to `upload_file()` and `download_file()` via a new optional callback, `progress_hook`.

### Bugs Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,11 @@ def upload_file(
file.
:keyword int max_concurrency:
Maximum number of parallel connections to use.
:keyword progress_hook:
A callback to track the progress of a long running upload. The signature is
function(current: int, total: Optional[int]) where current is the number of bytes transferred
so far, and total is the size of the file or None if the size is unknown.
:paramtype progress_hook: Callable[[int, Optional[int]], None]
:keyword int timeout:
The timeout parameter is expressed in seconds.
:keyword str encoding:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(
stream=None,
parallel=None,
validate_content=None,
progress_hook=None,
etag=None,
**kwargs
):
Expand All @@ -53,6 +54,7 @@ def __init__(
self.stream = stream
self.stream_lock = threading.Lock() if parallel else None
self.progress_lock = threading.Lock() if parallel else None
self.progress_hook = progress_hook

# For a parallel download, the stream is always seekable, so we note down the current position
# in order to seek to the right place when out-of-order chunks come in
Expand Down Expand Up @@ -97,6 +99,9 @@ def _update_progress(self, length):
else:
self.progress_total += length

if self.progress_hook:
self.progress_hook(self.progress_total, self.total_size)

def _write_to_stream(self, chunk_data, chunk_start):
if self.stream_lock:
with self.stream_lock: # pylint: disable=not-context-manager
Expand Down Expand Up @@ -227,6 +232,7 @@ def __init__(
self._max_concurrency = max_concurrency
self._encoding = encoding
self._validate_content = validate_content
self._progress_hook = kwargs.pop('progress_hook', None)
self._request_options = kwargs
self._location_mode = None
self._download_complete = False
Expand Down Expand Up @@ -446,6 +452,9 @@ def readinto(self, stream):

# Write the content to the user stream
stream.write(self._current_content)
if self._progress_hook:
self._progress_hook(len(self._current_content), self.size)

if self._download_complete:
return self.size

Expand All @@ -465,6 +474,7 @@ def readinto(self, stream):
parallel=parallel,
validate_content=self._validate_content,
use_location=self._location_mode,
progress_hook=self._progress_hook,
etag=self._etag,
**self._request_options
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def _upload_file_helper(
file_last_write_time="now",
file_permission=None,
file_permission_key=None,
progress_hook=None,
**kwargs):
try:
if size is None or size < 0:
Expand Down Expand Up @@ -93,6 +94,7 @@ def _upload_file_helper(
stream=stream,
max_concurrency=max_concurrency,
validate_content=validate_content,
progress_hook=progress_hook,
timeout=timeout,
**kwargs
)
Expand Down Expand Up @@ -489,6 +491,11 @@ def upload_file(
.. versionadded:: 12.1.0
:paramtype lease: ~azure.storage.fileshare.ShareLeaseClient or str
:keyword progress_hook:
A callback to track the progress of a long running upload. The signature is
function(current: int, total: Optional[int]) where current is the number of bytes transferred
so far, and total is the size of the file or None if the size is unknown.
:paramtype progress_hook: Callable[[int, Optional[int]], None]
:keyword int timeout:
The timeout parameter is expressed in seconds.
:keyword str encoding:
Expand All @@ -509,6 +516,7 @@ def upload_file(
content_settings = kwargs.pop('content_settings', None)
max_concurrency = kwargs.pop('max_concurrency', 1)
validate_content = kwargs.pop('validate_content', False)
progress_hook = kwargs.pop('progress_hook', None)
timeout = kwargs.pop('timeout', None)
encoding = kwargs.pop('encoding', 'UTF-8')

Expand Down Expand Up @@ -542,6 +550,7 @@ def upload_file(
file_last_write_time=file_last_write_time,
file_permission=file_permission,
file_permission_key=permission_key,
progress_hook=progress_hook,
**kwargs)

@distributed_trace
Expand Down Expand Up @@ -740,6 +749,11 @@ def download_file(
.. versionadded:: 12.1.0
:paramtype lease: ~azure.storage.fileshare.ShareLeaseClient or str
:keyword progress_hook:
A callback to track the progress of a long running download. The signature is
function(current: int, total: int) where current is the number of bytes transferred
so far, and total is the total size of the download.
:paramtype progress_hook: Callable[[int, int], None]
:keyword int timeout:
The timeout parameter is expressed in seconds.
:returns: A streaming object (StorageStreamDownloader)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,11 @@ async def upload_file(
file.
:keyword int max_concurrency:
Maximum number of parallel connections to use.
:keyword progress_hook:
An async callback to track the progress of a long running upload. The signature is
function(current: int, total: Optional[int]) where current is the number of bytes transferred
so far, and total is the size of the file or None if the size is unknown.
:paramtype progress_hook: Callable[[int, Optional[int]], Awaitable[None]]
:keyword int timeout:
The timeout parameter is expressed in seconds.
:keyword str encoding:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ async def _update_progress(self, length):
else:
self.progress_total += length

if self.progress_hook:
await self.progress_hook(self.progress_total, self.total_size)

async def _write_to_stream(self, chunk_data, chunk_start):
if self.stream_lock:
async with self.stream_lock: # pylint: disable=not-async-context-manager
Expand Down Expand Up @@ -184,6 +187,7 @@ def __init__(
self._max_concurrency = max_concurrency
self._encoding = encoding
self._validate_content = validate_content
self._progress_hook = kwargs.pop('progress_hook', None)
self._request_options = kwargs
self._location_mode = None
self._download_complete = False
Expand Down Expand Up @@ -400,6 +404,9 @@ async def readinto(self, stream):

# Write the content to the user stream
stream.write(self._current_content)
if self._progress_hook:
await self._progress_hook(len(self._current_content), self.size)

if self._download_complete:
return self.size

Expand All @@ -419,6 +426,7 @@ async def readinto(self, stream):
parallel=parallel,
validate_content=self._validate_content,
use_location=self._location_mode,
progress_hook=self._progress_hook,
etag=self._etag,
**self._request_options)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async def _upload_file_helper(
file_last_write_time="now",
file_permission=None,
file_permission_key=None,
progress_hook=None,
**kwargs
):
try:
Expand All @@ -85,6 +86,7 @@ async def _upload_file_helper(
stream=stream,
max_concurrency=max_concurrency,
validate_content=validate_content,
progress_hook=progress_hook,
timeout=timeout,
**kwargs
)
Expand Down Expand Up @@ -360,6 +362,11 @@ async def upload_file(
.. versionadded:: 12.1.0
:paramtype lease: ~azure.storage.fileshare.aio.ShareLeaseClient or str
:keyword progress_hook:
An async callback to track the progress of a long running upload. The signature is
function(current: int, total: Optional[int]) where current is the number of bytes transferred
so far, and total is the size of the file or None if the size is unknown.
:paramtype progress_hook: Callable[[int, Optional[int]], Awaitable[None]]
:keyword int timeout:
The timeout parameter is expressed in seconds.
:returns: File-updated property dict (Etag and last modified).
Expand All @@ -378,6 +385,7 @@ async def upload_file(
content_settings = kwargs.pop('content_settings', None)
max_concurrency = kwargs.pop('max_concurrency', 1)
validate_content = kwargs.pop('validate_content', False)
progress_hook = kwargs.pop('progress_hook', None)
timeout = kwargs.pop('timeout', None)
encoding = kwargs.pop('encoding', 'UTF-8')

Expand Down Expand Up @@ -411,6 +419,7 @@ async def upload_file(
file_last_write_time=file_last_write_time,
file_permission=file_permission,
file_permission_key=permission_key,
progress_hook=progress_hook,
**kwargs
)

Expand Down Expand Up @@ -613,6 +622,11 @@ async def download_file(
.. versionadded:: 12.1.0
:paramtype lease: ~azure.storage.fileshare.aio.ShareLeaseClient or str
:keyword progress_hook:
An async callback to track the progress of a long running download. The signature is
function(current: int, total: int) where current is the number of bytes transferred
so far, and total is the total size of the download.
:paramtype progress_hook: Callable[[int, int], Awaitable[None]]
:keyword int timeout:
The timeout parameter is expressed in seconds.
:returns: A streaming object (StorageStreamDownloader)
Expand Down
Loading

0 comments on commit 155eb8b

Please sign in to comment.