Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch operations #13713

Merged
merged 37 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
30f473a
basic starters for batch operations
seankane-msft Aug 10, 2020
6843a81
adding methods to make TableBatchOperations a context manager
seankane-msft Aug 12, 2020
4606f1c
more batch changes
seankane-msft Aug 14, 2020
e500a5e
adding more steps, stuck on the _base_client _batch_send method aroun…
seankane-msft Aug 17, 2020
3111597
removing some print statements
seankane-msft Aug 19, 2020
0735f8d
updating a couple things
seankane-msft Aug 19, 2020
2c92d72
fridays updates
seankane-msft Aug 24, 2020
b9d46c7
merge conflicts
seankane-msft Sep 9, 2020
549714a
single insert on a batch returns a 201, but the response is not deser…
seankane-msft Sep 9, 2020
5de6079
batching works for insert entity
seankane-msft Sep 9, 2020
3d56ce6
some tests are working, have an issue when there's a single item in t…
seankane-msft Sep 10, 2020
f4cb600
first iteration on batching, includes create, update, and delete
seankane-msft Sep 10, 2020
e1efffa
accidentally changed the version file
seankane-msft Sep 10, 2020
4a2b720
linting stuff and excluding batching for versions >3
seankane-msft Sep 10, 2020
d35ce7f
added upsert to batching
seankane-msft Sep 16, 2020
e081480
starting work on async code
seankane-msft Sep 16, 2020
3f9a4ae
batching support for async now
seankane-msft Sep 16, 2020
307b359
added test for different partition keys
seankane-msft Sep 21, 2020
f6737eb
changes to make the request not ask for a response from the service o…
seankane-msft Sep 22, 2020
5041247
changed commit_batch to send_batch
seankane-msft Sep 23, 2020
06c9714
Changed naming of PartialBatchErrorException to BatchErrorException
seankane-msft Sep 23, 2020
2b32581
started work on BatchTransactionResponse
seankane-msft Sep 23, 2020
11ed025
aligning more with .NET, working on deserializing the requests to bui…
seankane-msft Sep 25, 2020
f390517
updating merge conflicts on batching, removed old recordings
seankane-msft Sep 29, 2020
5f229f5
fixed up a test and fixed pylint issues
seankane-msft Sep 29, 2020
1de3064
added all operations into async batch, need to fix up testing, having…
seankane-msft Sep 29, 2020
ea9dca0
merge conflicts only from the testing branch added into the batching …
seankane-msft Oct 1, 2020
6046d0c
got async batching working now, need to finish up addressing anna's c…
seankane-msft Oct 2, 2020
45fc107
addresses Annas comments, need to add a bit more to the transaction r…
seankane-msft Oct 2, 2020
f7f66fb
adds a sample for batching, and documentation for the sphinx generation
seankane-msft Oct 2, 2020
4fb73e3
finished up all comments on the batching for sync/async, will need a …
seankane-msft Oct 2, 2020
850ed8e
updatest to batching, mostly for documentation
seankane-msft Oct 19, 2020
87c2008
removing an unused import
seankane-msft Oct 19, 2020
ce851ba
removed batch.send_batch() method, closed client sessions on async tests
seankane-msft Oct 22, 2020
066f5f1
addressing Anna's comments, added another test to verify behavior, ad…
seankane-msft Oct 22, 2020
ff5a833
test pipeline failed because cosmos requests were too close to each o…
seankane-msft Oct 22, 2020
c4d50d7
Merge branch 'master' into batch-operations
annatisch Oct 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion sdk/tables/azure-data-tables/azure/data/tables/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
LocationMode,
ResourceTypes,
AccountSasPermissions,
BatchTransactionResult,
BatchErrorException
)
from ._policies import ExponentialRetry, LinearRetry
from ._version import VERSION
from ._deserialize import TableErrorCode
from ._table_batch import TableBatchOperations

__version__ = VERSION

Expand All @@ -53,5 +56,8 @@
'EdmType',
'RetentionPolicy',
'generate_table_sas',
'SASProtocol'
'SASProtocol',
'BatchTransactionResult',
'TableBatchOperations',
'BatchErrorException'
]
54 changes: 30 additions & 24 deletions sdk/tables/azure-data-tables/azure/data/tables/_base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
TYPE_CHECKING,
)
import logging
from uuid import uuid4



Expand All @@ -29,7 +30,11 @@
from azure.core.configuration import Configuration
from azure.core.exceptions import HttpResponseError
from azure.core.pipeline import Pipeline
from azure.core.pipeline.transport import RequestsTransport, HttpTransport
from azure.core.pipeline.transport import (
RequestsTransport,
HttpTransport,
HttpRequest,
)
from azure.core.pipeline.policies import (
RedirectPolicy,
ContentDecodePolicy,
Expand All @@ -42,7 +47,7 @@

from ._shared_access_signature import QueryStringConstants
from ._constants import STORAGE_OAUTH_SCOPE, SERVICE_HOST_BASE, CONNECTION_TIMEOUT, READ_TIMEOUT
from ._models import LocationMode
from ._models import LocationMode, BatchTransactionResult
from ._authentication import SharedKeyCredentialPolicy
from ._policies import (
StorageHeadersPolicy,
Expand All @@ -54,7 +59,7 @@
TablesRetryPolicy,
)
from ._error import _process_table_error
from ._models import PartialBatchErrorException
from ._models import BatchErrorException
from ._sdk_moniker import SDK_MONIKER


Expand Down Expand Up @@ -251,54 +256,55 @@ def _create_pipeline(self, credential, **kwargs):
return config, Pipeline(config.transport, policies=policies)

def _batch_send(
self, *reqs, # type: HttpRequest
self, entities, # type: List[TableEntity]
*reqs, # type: List[HttpRequest]
**kwargs
):
# (...) -> List[HttpResponse]
"""Given a series of request, do a Storage batch call.
"""
# Pop it here, so requests doesn't feel bad about additional kwarg
raise_on_any_failure = kwargs.pop("raise_on_any_failure", True)
policies = [StorageHeadersPolicy()]

changeset = HttpRequest('POST', None)
changeset.set_multipart_mixed(
*reqs,
policies=policies,
boundary="changeset_{}".format(uuid4())
)
request = self._client._client.post( # pylint: disable=protected-access
url='{}://{}/?comp=batch{}{}'.format(
self.scheme,
self._primary_hostname,
kwargs.pop('sas', None),
kwargs.pop('timeout', None)
),
url='https://{}/$batch'.format(self._primary_hostname),
headers={
'x-ms-version': self.api_version
'x-ms-version': self.api_version,
'DataServiceVersion': '3.0',
'MaxDataServiceVersion': '3.0;NetFx',
}
)

policies = [StorageHeadersPolicy()]
if self._credential_policy:
policies.append(self._credential_policy)

request.set_multipart_mixed(
*reqs,
changeset,
policies=policies,
enforce_https=False
enforce_https=False,
boundary="batch_{}".format(uuid4())
)

pipeline_response = self._pipeline.run(
request, **kwargs
)
response = pipeline_response.http_response

try:
if response.status_code not in [202]:
raise HttpResponseError(response=response)
parts = response.parts()
transaction_result = BatchTransactionResult(reqs, parts, entities)
if raise_on_any_failure:
parts = list(response.parts())
if any(p for p in parts if not 200 <= p.status_code < 300):
error = PartialBatchErrorException(
message="There is a partial failure in the batch operation.",
error = BatchErrorException(
message="There is a failure in the batch operation.",
response=response, parts=parts
)
raise error
return iter(parts)
return parts
return transaction_result
except HttpResponseError as error:
_process_table_error(error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ def _convert_to_entity(entry_element):
# Timestamp is a known property
timestamp = properties.pop('Timestamp', None)
if timestamp:
# TODO: verify change here
# entity['Timestamp'] = _from_entity_datetime(timestamp)
entity['Timestamp'] = timestamp

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def query(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
_top = None
_select = None
Expand Down Expand Up @@ -165,7 +165,7 @@ def create(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
if query_options is not None:
_format = query_options.format
Expand Down Expand Up @@ -331,7 +331,7 @@ def query_entities(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
_top = None
_select = None
Expand Down Expand Up @@ -437,7 +437,7 @@ def query_entities_with_partition_and_row_key(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
_select = None
_filter = None
Expand Down Expand Up @@ -547,7 +547,7 @@ def update_entity(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
if query_options is not None:
_format = query_options.format
Expand Down Expand Up @@ -655,7 +655,7 @@ def merge_entity(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
if query_options is not None:
_format = query_options.format
Expand Down Expand Up @@ -759,7 +759,7 @@ def delete_entity(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
if query_options is not None:
_format = query_options.format
Expand Down Expand Up @@ -849,7 +849,7 @@ def insert_entity(
401: ClientAuthenticationError, 404: ResourceNotFoundError, 409: ResourceExistsError
}
error_map.update(kwargs.pop('error_map', {}))

_format = None
if query_options is not None:
_format = query_options.format
Expand Down
44 changes: 44 additions & 0 deletions sdk/tables/azure-data-tables/azure/data/tables/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,50 @@ def __init__(self, message, response, parts):
super(PartialBatchErrorException, self).__init__(message=message, response=response)


class BatchErrorException(HttpResponseError):
"""There is a failure in batch operations.

:param str message: The message of the exception.
:param response: Server response to be deserialized.
:param list parts: A list of the parts in multipart response.
"""

def __init__(self, message, response, parts):
self.parts = parts
super(BatchErrorException, self).__init__(message=message, response=response)


class BatchTransactionResult(object):
"""The result of a successful batch operation, can be used by a user to
recreate a request in the case of BatchErrorException

:param List[HttpRequest] requests: The requests of the batch
:param List[HttpResponse] results: The HTTP response of each request
"""

def __init__(self, requests, results, entities):
self.requests = requests
self.results = results
self.entities = entities

def get_entity(self, row_key):
for entity in self.entities:
if entity['RowKey'] == row_key:
return entity
return None

def get_request(self, row_key):
for i, entity in enumerate(self.entities):
if entity['RowKey'] == row_key:
return self.requests[i]
return None

def get_result(self, row_key):
for i, entity in enumerate(self.entities):
if entity['RowKey'] == row_key:
return self.results[i]
return None


class LocationMode(object):
"""
Expand Down
6 changes: 1 addition & 5 deletions sdk/tables/azure-data-tables/azure/data/tables/_serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,7 @@ def _to_entity_guid(value):


def _to_entity_int32(value):
# TODO: What the heck? below
if sys.version_info < (3,):
value = int(value)
else:
value = int(value)
value = int(value)
if value >= 2 ** 31 or value < -(2 ** 31):
raise TypeError(_ERROR_VALUE_TOO_LARGE.format(str(value), EdmType.INT32))
return None, value
Expand Down
Loading