Skip to content

Commit

Permalink
Batch operations (#13713)
Browse files Browse the repository at this point in the history
* basic starters for batch operations

* adding methods to make TableBatchOperations a context manager

* more batch changes

* adding more steps, stuck on the _base_client _batch_send method around the print statements

* removing some print statements

* updating a couple things

* fridays updates

* single insert on a batch returns a 201, but the response is not deserialized

* batching works for insert entity

* some tests are working, have an issue when there's a single item in the batch and it's not create

* first iteration on batching, includes create, update, and delete

* linting stuff and excluding batching for versions >3

* added upsert to batching

* starting work on async code

* batching support for async now

* added test for different partition keys

* changes to make the request not ask for a response from the service on inserts, no need to return the inserted entity

* changed commit_batch to send_batch

* Changed naming of PartialBatchErrorException to BatchErrorException

* started work on BatchTransactionResponse

* aligning more with .NET, working on deserializing the requests to build the entities again

* fixed up a test and fixed pylint issues

* added all operations into async batch, need to fix up testing, having a preparer error

* got async batching working now, need to finish up addressing anna's comments and finish some small implementation details of the batch result object

* addresses Annas comments, need to add a bit more to the transaction result and samples

* adds a sample for batching, and documentation for the sphinx generation

* finished up all comments on the batching for sync/async, will need a final review but is good to go on my end for release

* updatest to batching, mostly for documentation

* removing an unused import

* removed batch.send_batch() method, closed client sessions on async tests

* addressing Anna's comments, added another test to verify behavior, added length to the verify transaction result test method

* test pipeline failed because cosmos requests were too close to each other, bumping up the delay and re trying

Co-authored-by: annatisch <[email protected]>
  • Loading branch information
seankane-msft and annatisch authored Oct 26, 2020
1 parent 2302d4a commit e58b5ab
Show file tree
Hide file tree
Showing 65 changed files with 13,859 additions and 496 deletions.
8 changes: 7 additions & 1 deletion sdk/tables/azure-data-tables/azure/data/tables/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
LocationMode,
ResourceTypes,
AccountSasPermissions,
BatchTransactionResult,
BatchErrorException
)
from ._policies import ExponentialRetry, LinearRetry
from ._version import VERSION
from ._deserialize import TableErrorCode
from ._table_batch import TableBatchOperations

__version__ = VERSION

Expand All @@ -53,5 +56,8 @@
'EdmType',
'RetentionPolicy',
'generate_table_sas',
'SASProtocol'
'SASProtocol',
'BatchTransactionResult',
'TableBatchOperations',
'BatchErrorException'
]
54 changes: 30 additions & 24 deletions sdk/tables/azure-data-tables/azure/data/tables/_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
TYPE_CHECKING,
)
import logging
from uuid import uuid4



Expand All @@ -29,7 +30,11 @@
from azure.core.configuration import Configuration
from azure.core.exceptions import HttpResponseError
from azure.core.pipeline import Pipeline
from azure.core.pipeline.transport import RequestsTransport, HttpTransport
from azure.core.pipeline.transport import (
RequestsTransport,
HttpTransport,
HttpRequest,
)
from azure.core.pipeline.policies import (
RedirectPolicy,
ContentDecodePolicy,
Expand All @@ -42,7 +47,7 @@

from ._shared_access_signature import QueryStringConstants
from ._constants import STORAGE_OAUTH_SCOPE, SERVICE_HOST_BASE, CONNECTION_TIMEOUT, READ_TIMEOUT
from ._models import LocationMode
from ._models import LocationMode, BatchTransactionResult
from ._authentication import SharedKeyCredentialPolicy
from ._policies import (
StorageHeadersPolicy,
Expand All @@ -54,7 +59,7 @@
TablesRetryPolicy,
)
from ._error import _process_table_error
from ._models import PartialBatchErrorException
from ._models import BatchErrorException
from ._sdk_moniker import SDK_MONIKER


Expand Down Expand Up @@ -251,54 +256,55 @@ def _create_pipeline(self, credential, **kwargs):
return config, Pipeline(config.transport, policies=policies)

def _batch_send(
self, *reqs, # type: HttpRequest
self, entities, # type: List[TableEntity]
*reqs, # type: List[HttpRequest]
**kwargs
):
# (...) -> List[HttpResponse]
"""Given a series of request, do a Storage batch call.
"""
# Pop it here, so requests doesn't feel bad about additional kwarg
raise_on_any_failure = kwargs.pop("raise_on_any_failure", True)
policies = [StorageHeadersPolicy()]

changeset = HttpRequest('POST', None)
changeset.set_multipart_mixed(
*reqs,
policies=policies,
boundary="changeset_{}".format(uuid4())
)
request = self._client._client.post( # pylint: disable=protected-access
url='{}://{}/?comp=batch{}{}'.format(
self.scheme,
self._primary_hostname,
kwargs.pop('sas', None),
kwargs.pop('timeout', None)
),
url='https://{}/$batch'.format(self._primary_hostname),
headers={
'x-ms-version': self.api_version
'x-ms-version': self.api_version,
'DataServiceVersion': '3.0',
'MaxDataServiceVersion': '3.0;NetFx',
}
)

policies = [StorageHeadersPolicy()]
if self._credential_policy:
policies.append(self._credential_policy)

request.set_multipart_mixed(
*reqs,
changeset,
policies=policies,
enforce_https=False
enforce_https=False,
boundary="batch_{}".format(uuid4())
)

pipeline_response = self._pipeline.run(
request, **kwargs
)
response = pipeline_response.http_response

try:
if response.status_code not in [202]:
raise HttpResponseError(response=response)
parts = response.parts()
transaction_result = BatchTransactionResult(reqs, parts, entities)
if raise_on_any_failure:
parts = list(response.parts())
if any(p for p in parts if not 200 <= p.status_code < 300):
error = PartialBatchErrorException(
message="There is a partial failure in the batch operation.",
error = BatchErrorException(
message="There is a failure in the batch operation.",
response=response, parts=parts
)
raise error
return iter(parts)
return parts
return transaction_result
except HttpResponseError as error:
_process_table_error(error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ def _convert_to_entity(entry_element):
# Timestamp is a known property
timestamp = properties.pop('Timestamp', None)
if timestamp:
# TODO: verify change here
# entity['Timestamp'] = _from_entity_datetime(timestamp)
entity['Timestamp'] = timestamp

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def query(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
_top = None
_select = None
Expand Down Expand Up @@ -165,7 +165,7 @@ def create(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
if query_options is not None:
_format = query_options.format
Expand Down Expand Up @@ -331,7 +331,7 @@ def query_entities(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
_top = None
_select = None
Expand Down Expand Up @@ -437,7 +437,7 @@ def query_entities_with_partition_and_row_key(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
_select = None
_filter = None
Expand Down Expand Up @@ -547,7 +547,7 @@ def update_entity(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
if query_options is not None:
_format = query_options.format
Expand Down Expand Up @@ -655,7 +655,7 @@ def merge_entity(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
if query_options is not None:
_format = query_options.format
Expand Down Expand Up @@ -759,7 +759,7 @@ def delete_entity(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
if query_options is not None:
_format = query_options.format
Expand Down Expand Up @@ -849,7 +849,7 @@ def insert_entity(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
if query_options is not None:
_format = query_options.format
Expand Down
44 changes: 44 additions & 0 deletions sdk/tables/azure-data-tables/azure/data/tables/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,50 @@ def __init__(self, message, response, parts):
super(PartialBatchErrorException, self).__init__(message=message, response=response)


class BatchErrorException(HttpResponseError):
"""There is a failure in batch operations.
:param str message: The message of the exception.
:param response: Server response to be deserialized.
:param list parts: A list of the parts in multipart response.
"""

def __init__(self, message, response, parts):
self.parts = parts
super(BatchErrorException, self).__init__(message=message, response=response)


class BatchTransactionResult(object):
"""The result of a successful batch operation, can be used by a user to
recreate a request in the case of BatchErrorException
:param List[HttpRequest] requests: The requests of the batch
:param List[HttpResponse] results: The HTTP response of each request
"""

def __init__(self, requests, results, entities):
self.requests = requests
self.results = results
self.entities = entities

def get_entity(self, row_key):
for entity in self.entities:
if entity['RowKey'] == row_key:
return entity
return None

def get_request(self, row_key):
for i, entity in enumerate(self.entities):
if entity['RowKey'] == row_key:
return self.requests[i]
return None

def get_result(self, row_key):
for i, entity in enumerate(self.entities):
if entity['RowKey'] == row_key:
return self.results[i]
return None


class LocationMode(object):
"""
Expand Down
6 changes: 1 addition & 5 deletions sdk/tables/azure-data-tables/azure/data/tables/_serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,7 @@ def _to_entity_guid(value):


def _to_entity_int32(value):
# TODO: What the heck? below
if sys.version_info < (3,):
value = int(value)
else:
value = int(value)
value = int(value)
if value >= 2 ** 31 or value < -(2 ** 31):
raise TypeError(_ERROR_VALUE_TOO_LARGE.format(str(value), EdmType.INT32))
return None, value
Expand Down
Loading

0 comments on commit e58b5ab

Please sign in to comment.