Skip to content

Commit

Permalink
[storage] Fix for Files upload return type (#6772)
Browse files Browse the repository at this point in the history
* Fix missing import

* Fix for Files inconsistent return types

* Blob async trace decorators
  • Loading branch information
annatisch authored Aug 20, 2019
1 parent e29cb91 commit 69f6de5
Show file tree
Hide file tree
Showing 16 changed files with 147 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -340,15 +340,15 @@ class FileChunkUploader(_ChunkUploader): # pylint: disable=abstract-method

def _upload_chunk(self, chunk_offset, chunk_data):
chunk_end = chunk_offset + len(chunk_data) - 1
self.service.upload_range(
response = self.service.upload_range(
chunk_data,
chunk_offset,
chunk_end,
data_stream_total=self.total_size,
upload_stream_current=self.progress_total,
**self.request_options
)
return 'bytes={0}-{1}'.format(chunk_offset, chunk_end)
return 'bytes={0}-{1}'.format(chunk_offset, chunk_end), response


class SubStream(IOBase):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ class FileChunkUploader(_ChunkUploader): # pylint: disable=abstract-method

async def _upload_chunk(self, chunk_offset, chunk_data):
chunk_end = chunk_offset + len(chunk_data) - 1
await self.service.upload_range(
response = await self.service.upload_range(
chunk_data,
chunk_offset,
chunk_end,
Expand All @@ -347,4 +347,4 @@ async def _upload_chunk(self, chunk_offset, chunk_data):
**self.request_options
)
range_id = 'bytes={0}-{1}'.format(chunk_offset, chunk_end)
return range_id
return range_id, response
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
TYPE_CHECKING
)

from azure.core.tracing.decorator_async import distributed_trace_async

from .._shared.base_client_async import AsyncStorageAccountHostsMixin
from .._shared.policies_async import ExponentialRetry
from .._shared.downloads_async import StorageStreamDownloader
Expand Down Expand Up @@ -114,6 +116,7 @@ def __init__(
self._client = AzureBlobStorage(url=self.url, pipeline=self._pipeline, loop=loop)
self._loop = loop

@distributed_trace_async
async def get_account_information(self, **kwargs): # type: ignore
# type: (Optional[int]) -> Dict[str, str]
"""Gets information related to the storage account in which the blob resides.
Expand All @@ -129,6 +132,7 @@ async def get_account_information(self, **kwargs): # type: ignore
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def upload_blob(
self, data, # type: Union[Iterable[AnyStr], IO[AnyStr]]
blob_type=BlobType.BlockBlob, # type: Union[str, BlobType]
Expand Down Expand Up @@ -240,6 +244,7 @@ async def upload_blob(
return await upload_page_blob(**options)
return await upload_append_blob(**options)

@distributed_trace_async
async def download_blob(self, offset=None, length=None, validate_content=False, **kwargs):
# type: (Optional[int], Optional[int], bool, Any) -> Iterable[bytes]
"""Downloads a blob to a stream with automatic chunking.
Expand Down Expand Up @@ -312,6 +317,7 @@ async def download_blob(self, offset=None, length=None, validate_content=False,
await downloader.setup(extra_properties=extra_properties)
return downloader

@distributed_trace_async
async def delete_blob(self, delete_snapshots=False, **kwargs):
# type: (bool, Any) -> None
"""Marks the specified blob for deletion.
Expand Down Expand Up @@ -374,6 +380,7 @@ async def delete_blob(self, delete_snapshots=False, **kwargs):
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def undelete_blob(self, **kwargs):
# type: (Any) -> None
"""Restores soft-deleted blobs or snapshots.
Expand All @@ -398,6 +405,7 @@ async def undelete_blob(self, **kwargs):
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def get_blob_properties(self, **kwargs):
# type: (Any) -> BlobProperties
"""Returns all user-defined metadata, standard HTTP properties, and
Expand Down Expand Up @@ -461,6 +469,7 @@ async def get_blob_properties(self, **kwargs):
blob_props.container = self.container_name
return blob_props # type: ignore

@distributed_trace_async
async def set_http_headers(self, content_settings=None, **kwargs):
# type: (Optional[ContentSettings], Any) -> None
"""Sets system properties on the blob.
Expand Down Expand Up @@ -505,6 +514,7 @@ async def set_http_headers(self, content_settings=None, **kwargs):
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def set_blob_metadata(self, metadata=None, **kwargs):
# type: (Optional[Dict[str, str]], Any) -> Dict[str, Union[str, datetime]]
"""Sets user-defined metadata for the blob as one or more name-value pairs.
Expand Down Expand Up @@ -549,6 +559,7 @@ async def set_blob_metadata(self, metadata=None, **kwargs):
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def create_page_blob( # type: ignore
self, size, # type: int
content_settings=None, # type: Optional[ContentSettings]
Expand Down Expand Up @@ -619,6 +630,7 @@ async def create_page_blob( # type: ignore
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def create_append_blob(self, content_settings=None, metadata=None, **kwargs):
# type: (Optional[ContentSettings], Optional[Dict[str, str]], Any) -> Dict[str, Union[str, datetime]]
"""Creates a new Append Blob.
Expand Down Expand Up @@ -667,6 +679,7 @@ async def create_append_blob(self, content_settings=None, metadata=None, **kwarg
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def create_snapshot(self, metadata=None, **kwargs):
# type: (Optional[Dict[str, str]], Any) -> Dict[str, Union[str, datetime]]
"""Creates a snapshot of the blob.
Expand Down Expand Up @@ -726,6 +739,7 @@ async def create_snapshot(self, metadata=None, **kwargs):
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def start_copy_from_url(self, source_url, metadata=None, incremental_copy=False, **kwargs):
# type: (str, Optional[Dict[str, str]], bool, Any) -> Any
"""Copies a blob asynchronously.
Expand Down Expand Up @@ -877,6 +891,7 @@ async def start_copy_from_url(self, source_url, metadata=None, incremental_copy=
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def abort_copy(self, copy_id, **kwargs):
# type: (Union[str, BlobProperties], Any) -> None
"""Abort an ongoing copy operation.
Expand Down Expand Up @@ -904,6 +919,7 @@ async def abort_copy(self, copy_id, **kwargs):
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def acquire_lease(self, lease_duration=-1, lease_id=None, **kwargs):
# type: (int, Optional[str], Optional[int], Any) -> LeaseClient
"""Requests a new lease.
Expand Down Expand Up @@ -958,6 +974,7 @@ async def acquire_lease(self, lease_duration=-1, lease_id=None, **kwargs):
await lease.acquire(lease_duration=lease_duration, **kwargs)
return lease

@distributed_trace_async
async def set_standard_blob_tier(self, standard_blob_tier, **kwargs):
# type: (Union[str, StandardBlobTier], Any) -> None
"""This operation sets the tier on a block blob.
Expand Down Expand Up @@ -993,6 +1010,7 @@ async def set_standard_blob_tier(self, standard_blob_tier, **kwargs):
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def stage_block(
self, block_id, # type: str
data, # type: Union[Iterable[AnyStr], IO[AnyStr]]
Expand Down Expand Up @@ -1039,6 +1057,7 @@ async def stage_block(
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def stage_block_from_url(
self, block_id, # type: str
source_url, # type: str
Expand Down Expand Up @@ -1083,6 +1102,7 @@ async def stage_block_from_url(
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def get_block_list(self, block_list_type="committed", **kwargs):
# type: (Optional[str], Any) -> Tuple[List[BlobBlock], List[BlobBlock]]
"""The Get Block List operation retrieves the list of blocks that have
Expand Down Expand Up @@ -1113,6 +1133,7 @@ async def get_block_list(self, block_list_type="committed", **kwargs):
process_storage_error(error)
return self._get_block_list_result(blocks)

@distributed_trace_async
async def commit_block_list( # type: ignore
self, block_list, # type: List[BlobBlock]
content_settings=None, # type: Optional[ContentSettings]
Expand Down Expand Up @@ -1179,6 +1200,7 @@ async def commit_block_list( # type: ignore
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def set_premium_page_blob_tier(self, premium_page_blob_tier, **kwargs):
# type: (Union[str, PremiumPageBlobTier], Optional[int], Optional[Union[LeaseClient, str]], **Any) -> None
"""Sets the page blob tiers on the blob. This API is only supported for page blobs on premium accounts.
Expand Down Expand Up @@ -1210,6 +1232,7 @@ async def set_premium_page_blob_tier(self, premium_page_blob_tier, **kwargs):
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def get_page_ranges( # type: ignore
self, start_range=None, # type: Optional[int]
end_range=None, # type: Optional[int]
Expand Down Expand Up @@ -1284,6 +1307,7 @@ async def get_page_ranges( # type: ignore
process_storage_error(error)
return self._get_page_ranges_result(ranges)

@distributed_trace_async
async def set_sequence_number( # type: ignore
self, sequence_number_action, # type: Union[str, SequenceNumberAction]
sequence_number=None, # type: Optional[str]
Expand Down Expand Up @@ -1336,6 +1360,7 @@ async def set_sequence_number( # type: ignore
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def resize_blob(self, size, **kwargs):
# type: (int, Any) -> Dict[str, Union[str, datetime]]
"""Resizes a page blob to the specified size.
Expand Down Expand Up @@ -1381,6 +1406,7 @@ async def resize_blob(self, size, **kwargs):
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def upload_page( # type: ignore
self, page, # type: bytes
start_range, # type: int
Expand Down Expand Up @@ -1466,6 +1492,7 @@ async def upload_page( # type: ignore
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def clear_page(self, start_range, end_range, **kwargs):
# type: (int, int) -> Dict[str, Union[str, datetime]]
"""Clears a range of pages.
Expand Down Expand Up @@ -1525,6 +1552,7 @@ async def clear_page(self, start_range, end_range, **kwargs):
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def append_block( # type: ignore
self, data, # type: Union[Iterable[AnyStr], IO[AnyStr]]
length=None, # type: Optional[int]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
TYPE_CHECKING
)

from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.async_paging import AsyncItemPaged

from .._shared.models import LocationMode
from .._shared.policies_async import ExponentialRetry
from .._shared.base_client_async import AsyncStorageAccountHostsMixin
Expand Down Expand Up @@ -107,6 +110,7 @@ def __init__(
self._client = AzureBlobStorage(url=self.url, pipeline=self._pipeline, loop=loop)
self._loop = loop

@distributed_trace_async
async def get_account_information(self, **kwargs): # type: ignore
# type: (Optional[int]) -> Dict[str, str]
"""Gets information related to the storage account.
Expand All @@ -130,6 +134,7 @@ async def get_account_information(self, **kwargs): # type: ignore
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def get_service_stats(self, timeout=None, **kwargs): # type: ignore
# type: (Optional[int], **Any) -> Dict[str, Any]
"""Retrieves statistics related to replication for the Blob service.
Expand Down Expand Up @@ -169,6 +174,7 @@ async def get_service_stats(self, timeout=None, **kwargs): # type: ignore
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def get_service_properties(self, timeout=None, **kwargs):
# type(Optional[int]) -> Dict[str, Any]
"""Gets the properties of a storage account's Blob service, including
Expand All @@ -191,6 +197,7 @@ async def get_service_properties(self, timeout=None, **kwargs):
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace_async
async def set_service_properties(
self, logging=None, # type: Optional[Logging]
hour_metrics=None, # type: Optional[Metrics]
Expand Down Expand Up @@ -267,6 +274,7 @@ async def set_service_properties(
except StorageErrorException as error:
process_storage_error(error)

@distributed_trace
def list_containers(
self, name_starts_with=None, # type: Optional[str]
include_metadata=False, # type: Optional[bool]
Expand Down Expand Up @@ -316,6 +324,7 @@ def list_containers(
page_iterator_class=ContainerPropertiesPaged
)

@distributed_trace_async
async def create_container(
self, name, # type: str
metadata=None, # type: Optional[Dict[str, str]]
Expand Down Expand Up @@ -355,6 +364,7 @@ async def create_container(
metadata=metadata, public_access=public_access, timeout=timeout, **kwargs)
return container

@distributed_trace_async
async def delete_container(
self, container, # type: Union[ContainerProperties, str]
lease=None, # type: Optional[Union[LeaseClient, str]]
Expand Down
Loading

0 comments on commit 69f6de5

Please sign in to comment.