Skip to content

Commit

Permalink
Storage batch API (#7387)
Browse files Browse the repository at this point in the history
* delete_blobs POC

* Storage POC WIP

* Storage POC async

* Storage WIP

* Storage clean-up

* Storage options

* Autorest remark

* Use config option

* SetBlobTier WIP

* Working and tested delete_blobs + options

* Set standard tier with tests

* Adapt tests for playback mode

* Move batch_send to common mixin

* Python 2.7 compat

* Fix snapshot bug

* Re-record

* Pass kwargs to batch

* Clean x-ms-client-request-id from body if multipart response

* remove duplicated comp=tier

* Disable batch tests on Python 2.7

* pylint
  • Loading branch information
lmazuel authored Oct 3, 2019
1 parent f6d32d0 commit a80734c
Show file tree
Hide file tree
Showing 18 changed files with 5,168 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import six

from azure.core import Configuration
from azure.core.exceptions import HttpResponseError
from azure.core.pipeline import Pipeline
from azure.core.pipeline.transport import RequestsTransport
from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy
Expand All @@ -46,6 +47,8 @@
QueueMessagePolicy,
ExponentialRetry,
)
from .._generated.models import StorageErrorException
from .response_handlers import process_storage_error


_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -147,11 +150,11 @@ def _format_query_string(self, sas_token, credential, snapshot=None, share_snaps

def _create_pipeline(self, credential, **kwargs):
# type: (Any, **Any) -> Tuple[Configuration, Pipeline]
credential_policy = None
self._credential_policy = None
if hasattr(credential, "get_token"):
credential_policy = BearerTokenCredentialPolicy(credential, STORAGE_OAUTH_SCOPE)
self._credential_policy = BearerTokenCredentialPolicy(credential, STORAGE_OAUTH_SCOPE)
elif isinstance(credential, SharedKeyCredentialPolicy):
credential_policy = credential
self._credential_policy = credential
elif credential is not None:
raise TypeError("Unsupported credential: {}".format(credential))

Expand All @@ -169,7 +172,7 @@ def _create_pipeline(self, credential, **kwargs):
config.user_agent_policy,
StorageContentValidation(),
StorageRequestHook(**kwargs),
credential_policy,
self._credential_policy,
ContentDecodePolicy(),
RedirectPolicy(**kwargs),
StorageHosts(hosts=self._hosts, **kwargs),
Expand All @@ -180,6 +183,39 @@ def _create_pipeline(self, credential, **kwargs):
]
return config, Pipeline(config.transport, policies=policies)

def _batch_send(
        self, *reqs,  # type: HttpRequest
        **kwargs
):
    """Combine the given sub-requests into a single Storage batch call.

    Serializes each request as one part of a multipart/mixed POST against
    the account-level ``?comp=batch`` endpoint and returns the parsed
    per-part responses.
    """
    # Batch requests always target the account root, not a container/blob path.
    batch_request = self._client._client.post(  # pylint: disable=protected-access
        url='https://{}/?comp=batch'.format(self.primary_hostname),
        headers={
            'x-ms-version': self._client._config.version  # pylint: disable=protected-access
        }
    )
    # Each sub-request must carry its own headers and signature, so the
    # header and credential policies run per part inside the multipart body.
    batch_request.set_multipart_mixed(
        *reqs,
        policies=[StorageHeadersPolicy(), self._credential_policy]
    )

    batch_response = self._pipeline.run(batch_request, **kwargs).http_response

    try:
        # A well-formed batch is acknowledged with 202 Accepted; anything
        # else is a request-level (not per-part) failure.
        if batch_response.status_code != 202:
            raise HttpResponseError(response=batch_response)
        return batch_response.parts()
    except StorageErrorException as error:
        process_storage_error(error)


def format_shared_key_credential(account, credential):
if isinstance(credential, six.string_types):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import logging

from azure.core.pipeline import AsyncPipeline

from azure.core.exceptions import HttpResponseError
from azure.core.pipeline.policies.distributed_tracing import DistributedTracingPolicy
from azure.core.pipeline.policies import (
ContentDecodePolicy,
Expand All @@ -25,11 +25,16 @@
StorageContentValidation,
StorageRequestHook,
StorageHosts,
StorageHeadersPolicy,
QueueMessagePolicy)
from .policies_async import AsyncStorageResponseHook

from .._generated.models import StorageErrorException
from .response_handlers import process_storage_error

if TYPE_CHECKING:
from azure.core.pipeline import Pipeline
from azure.core.pipeline.transport import HttpRequest
from azure.core import Configuration
_LOGGER = logging.getLogger(__name__)

Expand All @@ -51,11 +56,11 @@ async def __aexit__(self, *args):

def _create_pipeline(self, credential, **kwargs):
# type: (Any, **Any) -> Tuple[Configuration, Pipeline]
credential_policy = None
self._credential_policy = None
if hasattr(credential, 'get_token'):
credential_policy = AsyncBearerTokenCredentialPolicy(credential, STORAGE_OAUTH_SCOPE)
self._credential_policy = AsyncBearerTokenCredentialPolicy(credential, STORAGE_OAUTH_SCOPE)
elif isinstance(credential, SharedKeyCredentialPolicy):
credential_policy = credential
self._credential_policy = credential
elif credential is not None:
raise TypeError("Unsupported credential: {}".format(credential))
config = kwargs.get('_configuration') or create_configuration(**kwargs)
Expand All @@ -76,7 +81,7 @@ def _create_pipeline(self, credential, **kwargs):
config.user_agent_policy,
StorageContentValidation(),
StorageRequestHook(**kwargs),
credential_policy,
self._credential_policy,
ContentDecodePolicy(),
AsyncRedirectPolicy(**kwargs),
StorageHosts(hosts=self._hosts, **kwargs), # type: ignore
Expand All @@ -86,3 +91,36 @@ def _create_pipeline(self, credential, **kwargs):
DistributedTracingPolicy(),
]
return config, AsyncPipeline(config.transport, policies=policies)

async def _batch_send(
    self, *reqs: 'HttpRequest',
    **kwargs
):
    """Combine the given sub-requests into a single Storage batch call.

    Serializes each request as one part of a multipart/mixed POST against
    the account-level ``?comp=batch`` endpoint and returns the parsed
    per-part responses as an async iterator.
    """
    # Batch requests always target the account root, not a container/blob path.
    batch_request = self._client._client.post(  # pylint: disable=protected-access
        url='https://{}/?comp=batch'.format(self.primary_hostname),
        headers={
            'x-ms-version': self._client._config.version  # pylint: disable=protected-access
        }
    )
    # Each sub-request must carry its own headers and signature, so the
    # header and credential policies run per part inside the multipart body.
    batch_request.set_multipart_mixed(
        *reqs,
        policies=[StorageHeadersPolicy(), self._credential_policy]
    )

    pipeline_result = await self._pipeline.run(batch_request, **kwargs)
    batch_response = pipeline_result.http_response

    try:
        # A well-formed batch is acknowledged with 202 Accepted; anything
        # else is a request-level (not per-part) failure.
        if batch_response.status_code != 202:
            raise HttpResponseError(response=batch_response)
        return batch_response.parts()  # Return an AsyncIterator
    except StorageErrorException as error:
        process_storage_error(error)
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# pylint: disable=too-many-lines
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
Expand All @@ -6,13 +7,14 @@

import functools
from typing import ( # pylint: disable=unused-import
Union, Optional, Any, Iterable, AnyStr, Dict, List, Tuple, IO,
Union, Optional, Any, Iterable, AnyStr, Dict, List, Tuple, IO, AsyncIterator,
TYPE_CHECKING
)

from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.decorator_async import distributed_trace_async
from azure.core.async_paging import AsyncItemPaged
from azure.core.pipeline.transport import HttpRequest, AsyncHttpResponse

from .._shared.base_client_async import AsyncStorageAccountHostsMixin
from .._shared.policies_async import ExponentialRetry
Expand Down Expand Up @@ -42,6 +44,7 @@
from ..models import ( # pylint: disable=unused-import
AccessPolicy,
ContentSettings,
StandardBlobTier,
PremiumPageBlobTier)


Expand Down Expand Up @@ -775,6 +778,200 @@ async def delete_blob(
timeout=timeout,
**kwargs)

@distributed_trace_async
async def delete_blobs(  # pylint: disable=arguments-differ
    self, *blobs,  # type: Union[str, BlobProperties]
    delete_snapshots=None,  # type: Optional[str]
    lease=None,  # type: Optional[Union[str, LeaseClient]]
    timeout=None,  # type: Optional[int]
    **kwargs
):
    # type: (...) -> None
    """Marks the specified blobs or snapshots for deletion.

    The blobs are later deleted during garbage collection.
    Note that in order to delete a blob, you must delete all of its
    snapshots. You can delete both at the same time with the Delete
    Blob operation.

    If a delete retention policy is enabled for the service, then this
    operation soft deletes the blobs or snapshots and retains them for the
    specified number of days. After that, the blobs' data is removed from
    the service during garbage collection. Soft-deleted blobs or snapshots
    are accessible through the List Blobs API specifying
    `include="deleted"` option, and can be restored using the Undelete API.

    :param blobs: The blobs to mark for deletion, given as blob names or
        BlobProperties instances.
    :type blobs: str or ~azure.storage.blob.models.BlobProperties
    :param str delete_snapshots:
        Required if a blob has associated snapshots. Values include:
         - "only": Deletes only the blobs snapshots.
         - "include": Deletes the blob along with all snapshots.
    :param lease:
        Required if a blob has an active lease. Value can be a Lease object
        or the lease ID as a string.
    :type lease: ~azure.storage.blob.lease.LeaseClient or str
    :param datetime if_modified_since:
        A DateTime value. Azure expects the date value passed in to be UTC.
        If timezone is included, any non-UTC datetimes will be converted to UTC.
        If a date is passed in without timezone info, it is assumed to be UTC.
        Specify this header to perform the operation only
        if the resource has been modified since the specified time.
    :param datetime if_unmodified_since:
        A DateTime value. Azure expects the date value passed in to be UTC.
        If timezone is included, any non-UTC datetimes will be converted to UTC.
        If a date is passed in without timezone info, it is assumed to be UTC.
        Specify this header to perform the operation only if
        the resource has not been modified since the specified date/time.
    :param str if_match:
        An ETag value, or the wildcard character (*). Specify this header to perform
        the operation only if the resource's ETag matches the value specified.
    :param str if_none_match:
        An ETag value, or the wildcard character (*). Specify this header
        to perform the operation only if the resource's ETag does not match
        the value specified. Specify the wildcard character (*) to perform
        the operation only if the resource does not exist, and fail the
        operation if it does exist.
    :param int timeout:
        The timeout parameter is expressed in seconds.
    :returns: An async iterator of the per-blob responses from the batch call.
    """
    options = BlobClient._generic_delete_blob_options(  # pylint: disable=protected-access
        delete_snapshots=delete_snapshots,
        lease=lease,
        timeout=timeout,
        **kwargs
    )
    query_parameters, header_parameters = self._generate_delete_blobs_options(**options)
    # Anything consumed by the Autorest-generated signature must be stripped
    # before forwarding to "_batch_send", otherwise the transport is upset.
    for consumed in ('timeout', 'delete_snapshots', 'lease_access_conditions', 'modified_access_conditions'):
        options.pop(consumed, None)

    def _as_sub_request(blob):
        # Build one DELETE sub-request per blob; shared query/header params
        # were computed once above from the common options.
        sub_request = HttpRequest(
            "DELETE",
            "/{}/{}".format(self.container_name, blob),
            headers=header_parameters
        )
        sub_request.format_parameters(query_parameters)
        return sub_request

    return await self._batch_send(*[_as_sub_request(blob) for blob in blobs], **options)

@distributed_trace_async
async def set_standard_blob_tier_blobs(
    self,
    standard_blob_tier: Union[str, 'StandardBlobTier'],
    *blobs: Union[str, BlobProperties],
    **kwargs
) -> AsyncIterator[AsyncHttpResponse]:
    """This operation sets the tier on block blobs.

    A block blob's tier determines Hot/Cool/Archive storage type.
    This operation does not update the blob's ETag.

    :param standard_blob_tier:
        Indicates the tier to be set on the blobs. Options include 'Hot', 'Cool',
        'Archive'. The hot tier is optimized for storing data that is accessed
        frequently. The cool storage tier is optimized for storing data that
        is infrequently accessed and stored for at least a month. The archive
        tier is optimized for storing data that is rarely accessed and stored
        for at least six months with flexible latency requirements.
    :type standard_blob_tier: str or ~azure.storage.blob.models.StandardBlobTier
    :param blobs: The blobs with which to interact, given as blob names or
        BlobProperties instances.
    :type blobs: str or ~azure.storage.blob.models.BlobProperties
    :param int timeout:
        The timeout parameter is expressed in seconds.
    :param lease:
        Required if the blob has an active lease. Value can be a LeaseClient object
        or the lease ID as a string.
    :type lease: ~azure.storage.blob.lease.LeaseClient or str
    :returns: An async iterator of the per-blob responses from the batch call.
    :rtype: AsyncIterator[~azure.core.pipeline.transport.AsyncHttpResponse]
    :raises ValueError: If no tier is specified.
    """
    # Validate before doing any work or consuming kwargs.
    if standard_blob_tier is None:
        raise ValueError("A StandardBlobTier must be specified")
    access_conditions = get_access_conditions(kwargs.pop('lease', None))

    query_parameters, header_parameters = self._generate_set_tier_options(
        tier=standard_blob_tier,
        lease_access_conditions=access_conditions,
        **kwargs
    )
    # Anything consumed by the Autorest-generated signature must be stripped
    # before forwarding to "_batch_send", otherwise the transport is upset.
    for possible_param in ['timeout', 'lease']:
        kwargs.pop(possible_param, None)

    reqs = []
    for blob in blobs:
        # One PUT sub-request per blob; the shared tier query/header params
        # were computed once above.
        req = HttpRequest(
            "PUT",
            "/{}/{}".format(self.container_name, blob),
            headers=header_parameters
        )
        req.format_parameters(query_parameters)
        reqs.append(req)

    return await self._batch_send(*reqs, **kwargs)

@distributed_trace_async
async def set_premium_page_blob_tier_blobs(
    self,
    premium_page_blob_tier: Union[str, 'PremiumPageBlobTier'],
    *blobs: Union[str, BlobProperties],
    **kwargs
) -> AsyncIterator[AsyncHttpResponse]:
    """Sets the page blob tiers on the blobs. This API is only supported for page blobs on premium accounts.

    :param premium_page_blob_tier:
        A page blob tier value to set the blobs to. The tier correlates to the size of the
        blob and number of allowed IOPS. This is only applicable to page blobs on
        premium storage accounts.
    :type premium_page_blob_tier: ~azure.storage.blob.models.PremiumPageBlobTier
    :param blobs: The blobs with which to interact, given as blob names or
        BlobProperties instances.
    :type blobs: str or ~azure.storage.blob.models.BlobProperties
    :param int timeout:
        The timeout parameter is expressed in seconds. This method may make
        multiple calls to the Azure service and the timeout will apply to
        each call individually.
    :param lease:
        Required if the blob has an active lease. Value can be a LeaseClient object
        or the lease ID as a string.
    :type lease: ~azure.storage.blob.lease.LeaseClient or str
    :returns: An async iterator of the per-blob responses from the batch call.
    :rtype: AsyncIterator[~azure.core.pipeline.transport.AsyncHttpResponse]
    :raises ValueError: If no tier is specified.
    """
    # Validate before doing any work or consuming kwargs.
    if premium_page_blob_tier is None:
        raise ValueError("A PremiumPageBlobTier must be specified")
    access_conditions = get_access_conditions(kwargs.pop('lease', None))

    query_parameters, header_parameters = self._generate_set_tier_options(
        tier=premium_page_blob_tier,
        lease_access_conditions=access_conditions,
        **kwargs
    )
    # Anything consumed by the Autorest-generated signature must be stripped
    # before forwarding to "_batch_send", otherwise the transport is upset.
    for possible_param in ['timeout', 'lease']:
        kwargs.pop(possible_param, None)

    reqs = []
    for blob in blobs:
        # One PUT sub-request per blob; the shared tier query/header params
        # were computed once above.
        req = HttpRequest(
            "PUT",
            "/{}/{}".format(self.container_name, blob),
            headers=header_parameters
        )
        req.format_parameters(query_parameters)
        reqs.append(req)

    return await self._batch_send(*reqs, **kwargs)

def get_blob_client(
self, blob, # type: Union[str, BlobProperties]
snapshot=None # type: str
Expand Down
Loading

0 comments on commit a80734c

Please sign in to comment.