diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index ba95dc07a2db..10a8015983f9 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -2,10 +2,14 @@ ### 4.3.0b3 (Unreleased) +#### Features Added +- Added support for split-proof queries for the async client + ### Bugs fixed - Default consistency level for the sync and async clients is no longer "Session" and will instead be set to the consistency level of the user's cosmos account setting on initialization if not passed during client initialization. - This change will impact client application in terms of RUs and latency. Users relying on default `Session` consistency will need to pass it explicitly if their account consistency is different than `Session`. Please see [Consistency Levels in Azure Cosmos DB](https://docs.microsoft.com/azure/cosmos-db/consistency-levels) for more details. + This change will impact client application in terms of RUs and latency. Users relying on default `Session` consistency will need to pass it explicitly if their account consistency is different than `Session`. + Please see [Consistency Levels in Azure Cosmos DB](https://docs.microsoft.com/azure/cosmos-db/consistency-levels) for more details. ### 4.3.0b2 (2022-01-25) @@ -19,7 +23,8 @@ We will also be removing support for Python 3.6 and will only support Python 3.7 - Added async user agent for async client ### 4.3.0b1 (2021-12-14) -**New features** + +#### Features Added - Added language native async i/o client ### 4.2.0 (2020-10-08) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/_queue_async_helper.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/_queue_async_helper.py new file mode 100644 index 000000000000..ae81deb5a7cb --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/_queue_async_helper.py @@ -0,0 +1,74 @@ +# The MIT License (MIT) +# Copyright (c) 2022 Microsoft Corporation + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +async def heap_push(heap, item, document_producer_comparator): + """Push item onto heap, maintaining the heap invariant.""" + heap.append(item) + await _sift_down(heap, document_producer_comparator, 0, len(heap) - 1) + + +async def heap_pop(heap, document_producer_comparator): + """Pop the smallest item off the heap, maintaining the heap invariant.""" + last_elt = heap.pop() # raises appropriate IndexError if heap is empty + if heap: + return_item = heap[0] + heap[0] = last_elt + await _sift_up(heap, document_producer_comparator, 0) + return return_item + return last_elt + + +async def _sift_down(heap, document_producer_comparator, start_pos, pos): + new_item = heap[pos] + # Follow the path to the root, moving parents down until finding a place + # new_item fits. + while pos > start_pos: + parent_pos = (pos - 1) >> 1 + parent = heap[parent_pos] + if await document_producer_comparator.compare(new_item, parent) < 0: + # if new_item < parent: + heap[pos] = parent + pos = parent_pos + continue + break + heap[pos] = new_item + + +async def _sift_up(heap, document_producer_comparator, pos): + end_pos = len(heap) + start_pos = pos + new_item = heap[pos] + # Bubble up the smaller child until hitting a leaf. + child_pos = 2 * pos + 1 # leftmost child position + while child_pos < end_pos: + # Set child_pos to index of smaller child. + right_pos = child_pos + 1 + # if right_pos < end_pos and not heap[child_pos] < heap[right_pos]: + if right_pos < end_pos and not await document_producer_comparator.compare(heap[child_pos], heap[right_pos]) < 0: + child_pos = right_pos + # Move the smaller child up. + heap[pos] = heap[child_pos] + pos = child_pos + child_pos = 2 * pos + 1 + # The leaf at pos is empty now. Put new_item there, and bubble it up + # to its final resting place (by sifting its parents down). + heap[pos] = new_item + await _sift_down(heap, document_producer_comparator, start_pos, pos) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py index 07a4422814a2..075a7839350f 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py @@ -121,6 +121,7 @@ async def _fetch_items_helper_no_retries(self, fetch_function): self._has_started = True new_options = copy.deepcopy(self._options) new_options["continuation"] = self._continuation + (fetched_items, response_headers) = await fetch_function(new_options) continuation_key = http_constants.HttpHeaders.Continuation # Use Etag as continuation token for change feed queries. diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/document_producer.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/document_producer.py index 695afddc7a26..c95ee7ea9f1a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/document_producer.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/document_producer.py @@ -62,9 +62,6 @@ async def fetch_fn(options): self._ex_context = _DefaultQueryExecutionContext(client, self._options, fetch_fn) - def __lt__(self, other): - return self._doc_producer_comp.compare(self, other) < 0 - async def __aiter__(self): return self @@ -104,14 +101,13 @@ async def peek(self): return self._cur_item - def _compare_helper(a, b): if a is None and b is None: return 0 return (a > b) - (a < b) -class _PartitionKeyRangeDocumentProduerComparator(object): +class _PartitionKeyRangeDocumentProducerComparator(object): """ Provides a Comparator for document producers using the min value of the corresponding target partition. @@ -120,7 +116,7 @@ class _PartitionKeyRangeDocumentProduerComparator(object): def __init__(self): pass - def compare(self, doc_producer1, doc_producer2): # pylint: disable=no-self-use + async def compare(self, doc_producer1, doc_producer2): # pylint: disable=no-self-use return _compare_helper( doc_producer1.get_target_range()["minInclusive"], doc_producer2.get_target_range()["minInclusive"] ) @@ -179,7 +175,7 @@ def getTypeStr(orderby_item): raise TypeError("unknown type" + str(val)) @staticmethod - def compare(orderby_item1, orderby_item2): + async def compare(orderby_item1, orderby_item2): """Compare two orderby item pairs. :param dict orderby_item1: @@ -213,7 +209,7 @@ def _peek_order_by_items(peek_result): return peek_result["orderByItems"] -class _OrderByDocumentProducerComparator(_PartitionKeyRangeDocumentProduerComparator): +class _OrderByDocumentProducerComparator(_PartitionKeyRangeDocumentProducerComparator): """Provide a Comparator for document producers which respects orderby sort order. """ @@ -229,7 +225,7 @@ def __init__(self, sort_order): # pylint: disable=super-init-not-called """ self._sort_order = sort_order - def compare(self, doc_producer1, doc_producer2): + async def compare(self, doc_producer1, doc_producer2): """Compares the given two instances of DocumentProducers. Based on the orderby query items and whether the sort order is Ascending @@ -238,29 +234,29 @@ def compare(self, doc_producer1, doc_producer2): If the peek results are equal based on the sort order, this comparator compares the target partition key range of the two DocumentProducers. - :param _DocumentProducer doc_producers1: first instance - :param _DocumentProducer doc_producers2: first instance + :param _DocumentProducer doc_producer1: first instance + :param _DocumentProducer doc_producer2: first instance :return: Integer value of compare result. - positive integer if doc_producers1 > doc_producers2 - negative integer if doc_producers1 < doc_producers2 + positive integer if doc_producer1 > doc_producer2 + negative integer if doc_producer1 < doc_producer2 :rtype: int """ - res1 = _peek_order_by_items(doc_producer1.peek()) - res2 = _peek_order_by_items(doc_producer2.peek()) + res1 = _peek_order_by_items(await doc_producer1.peek()) + res2 = _peek_order_by_items(await doc_producer2.peek()) self._validate_orderby_items(res1, res2) for i, (elt1, elt2) in enumerate(zip(res1, res2)): - res = _OrderByHelper.compare(elt1, elt2) + res = await _OrderByHelper.compare(elt1, elt2) if res != 0: if self._sort_order[i] == "Ascending": return res if self._sort_order[i] == "Descending": return -res - return _PartitionKeyRangeDocumentProduerComparator.compare(self, doc_producer1, doc_producer2) + return await _PartitionKeyRangeDocumentProducerComparator.compare(self, doc_producer1, doc_producer2) def _validate_orderby_items(self, res1, res2): if len(res1) != len(res2): diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/execution_dispatcher.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/execution_dispatcher.py index 0a1bd0fa97b9..a51f3452835b 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/execution_dispatcher.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/execution_dispatcher.py @@ -70,7 +70,7 @@ async def __anext__(self): query_to_use = self._query if self._query is not None else "Select * from root r" query_execution_info = _PartitionedQueryExecutionInfo(await self._client._GetQueryPlanThroughGateway (query_to_use, self._resource_link)) - self._execution_context = self._create_pipelined_execution_context(query_execution_info) + self._execution_context = await self._create_pipelined_execution_context(query_execution_info) else: raise e diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/multi_execution_aggregator.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/multi_execution_aggregator.py index bf3547b8abd3..6cb08a542ce0 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/multi_execution_aggregator.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/multi_execution_aggregator.py @@ -22,10 +22,10 @@ """Internal class for multi execution context aggregator implementation in the Azure Cosmos database service. """ -import heapq from azure.cosmos._execution_context.aio.base_execution_context import _QueryExecutionContextBase -from azure.cosmos._execution_context.aio import document_producer +from azure.cosmos._execution_context.aio import document_producer, _queue_async_helper from azure.cosmos._routing import routing_range +from azure.cosmos import exceptions # pylint: disable=protected-access @@ -50,11 +50,11 @@ class PriorityQueue: def __init__(self): self._heap = [] - def pop(self): - return heapq.heappop(self._heap) + async def pop_async(self, document_producer_comparator): + return await _queue_async_helper.heap_pop(self._heap, document_producer_comparator) - def push(self, item): - heapq.heappush(self._heap, item) + async def push_async(self, item, document_producer_comparator): + await _queue_async_helper.heap_push(self._heap, item, document_producer_comparator) def peek(self): return self._heap[0] @@ -76,7 +76,7 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i if self._sort_orders: self._document_producer_comparator = document_producer._OrderByDocumentProducerComparator(self._sort_orders) else: - self._document_producer_comparator = document_producer._PartitionKeyRangeDocumentProduerComparator() + self._document_producer_comparator = document_producer._PartitionKeyRangeDocumentProducerComparator() self._orderByPQ = _MultiExecutionContextAggregator.PriorityQueue() @@ -89,13 +89,13 @@ async def __anext__(self): """ if self._orderByPQ.size() > 0: - targetRangeExContext = self._orderByPQ.pop() + targetRangeExContext = await self._orderByPQ.pop_async(self._document_producer_comparator) res = await targetRangeExContext.__anext__() try: # TODO: we can also use more_itertools.peekable to be more python friendly await targetRangeExContext.peek() - self._orderByPQ.push(targetRangeExContext) + await self._orderByPQ.push_async(targetRangeExContext, self._document_producer_comparator) except StopAsyncIteration: pass @@ -107,6 +107,33 @@ async def fetch_next_block(self): raise NotImplementedError("You should use pipeline's fetch_next_block.") + async def _repair_document_producer(self): + """Repairs the document producer context by using the re-initialized routing map provider in the client, + which loads in a refreshed partition key range cache to re-create the partition key ranges. + After loading this new cache, the document producers get re-created with the new valid ranges. + """ + # refresh the routing provider to get the newly initialized one post-refresh + self._routing_provider = self._client._routing_map_provider + # will be a list of (partition_min, partition_max) tuples + targetPartitionRanges = await self._get_target_partition_key_range() + + targetPartitionQueryExecutionContextList = [] + for partitionTargetRange in targetPartitionRanges: + # create and add the child execution context for the target range + targetPartitionQueryExecutionContextList.append( + self._createTargetPartitionQueryExecutionContext(partitionTargetRange) + ) + + for targetQueryExContext in targetPartitionQueryExecutionContextList: + try: + # TODO: we can also use more_itertools.peekable to be more python friendly + await targetQueryExContext.peek() + # if there are matching results in the target ex range add it to the priority queue + await self._orderByPQ.push_async(targetQueryExContext, self._document_producer_comparator) + + except StopAsyncIteration: + continue + def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range): rewritten_query = self._partitioned_query_ex_info.get_rewritten_query() @@ -148,13 +175,19 @@ async def _configure_partition_ranges(self): ) for targetQueryExContext in targetPartitionQueryExecutionContextList: - try: # TODO: we can also use more_itertools.peekable to be more python friendly await targetQueryExContext.peek() # if there are matching results in the target ex range add it to the priority queue - self._orderByPQ.push(targetQueryExContext) + await self._orderByPQ.push_async(targetQueryExContext, self._document_producer_comparator) + + except exceptions.CosmosHttpResponseError as e: + if exceptions._partition_range_is_gone(e): + # repairing document producer context on partition split + await self._repair_document_producer() + else: + raise except StopAsyncIteration: continue diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/multi_execution_aggregator.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/multi_execution_aggregator.py index 7942afca92c0..d0ed0b78bee6 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/multi_execution_aggregator.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/multi_execution_aggregator.py @@ -100,7 +100,7 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i self._orderByPQ.push(targetQueryExContext) except exceptions.CosmosHttpResponseError as e: - if exceptions.partition_range_is_gone(e): + if exceptions._partition_range_is_gone(e): # repairing document producer context on partition split self._repair_document_producer() else: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py index b9d25a2fb113..601bdeb72ce6 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py @@ -101,7 +101,7 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): and e.sub_status == SubStatusCodes.READ_SESSION_NOTAVAILABLE ): retry_policy = sessionRetry_policy - elif exceptions.partition_range_is_gone(e): + elif exceptions._partition_range_is_gone(e): retry_policy = partition_key_range_gone_retry_policy else: retry_policy = defaultRetry_policy diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py index 817531275040..ebf1ee82b005 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py @@ -54,7 +54,7 @@ async def get_overlapping_ranges(self, collection_link, partition_key_ranges): overlapping partition key ranges. :param str collection_link: The name of the collection. - :param list partition_key_range: List of partition key range. + :param list partition_key_ranges: List of partition key range. :return: List of overlapping partition key ranges. :rtype: list """ @@ -127,7 +127,7 @@ def _subtract_range(r, partition_key_range): class SmartRoutingMapProvider(PartitionKeyRangeCache): """ - Efficiently uses PartitionKeyRangeCach and minimizes the unnecessary + Efficiently uses PartitionKeyRangeCache and minimizes the unnecessary invocation of CollectionRoutingMap.get_overlapping_ranges() """ diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py index 0e6e26ea30ee..75bb097d0c94 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py @@ -2477,6 +2477,10 @@ def _retrieve_partition_key(self, partition_key_parts, document, is_system_key): return partitionKey + def refresh_routing_map_provider(self): + # re-initializes the routing map provider, effectively refreshing the current partition key range cache + self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self) + async def _GetQueryPlanThroughGateway(self, query, resource_link, **kwargs): supported_query_features = (documents._QueryFeature.Aggregate + "," + documents._QueryFeature.CompositeAggregate + "," + diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py index c2fa3b5300ab..50692c596f0e 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py @@ -35,6 +35,7 @@ from .. import _resource_throttle_retry_policy from .. import _default_retry_policy from .. import _session_retry_policy +from .. import _gone_retry_policy # pylint: disable=protected-access @@ -68,6 +69,8 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg sessionRetry_policy = _session_retry_policy._SessionRetryPolicy( client.connection_policy.EnableEndpointDiscovery, global_endpoint_manager, *args ) + partition_key_range_gone_retry_policy = _gone_retry_policy.PartitionKeyRangeGoneRetryPolicy(client, *args) + while True: try: client_timeout = kwargs.get('timeout') @@ -100,6 +103,8 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg and e.sub_status == SubStatusCodes.READ_SESSION_NOTAVAILABLE ): retry_policy = sessionRetry_policy + elif exceptions._partition_range_is_gone(e): + retry_policy = partition_key_range_gone_retry_policy else: retry_policy = defaultRetry_policy diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/scripts.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/scripts.py index 61e339410910..35aba7065e55 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/scripts.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/scripts.py @@ -304,7 +304,7 @@ async def create_trigger(self, body, **kwargs): @distributed_trace_async async def replace_trigger(self, trigger, body, **kwargs): # type: (Union[str, Dict[str, Any]], Dict[str, Any], Any) -> Dict[str, Any] - """Replace a specified tigger in the container. + """Replace a specified trigger in the container. If the trigger does not already exist in the container, an exception is raised. diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/exceptions.py b/sdk/cosmos/azure-cosmos/azure/cosmos/exceptions.py index ff23c8b1e11f..a4089175c967 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/exceptions.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/exceptions.py @@ -75,7 +75,7 @@ def __init__(self, **kwargs): super(CosmosClientTimeoutError, self).__init__(message, **kwargs) -def partition_range_is_gone(e): +def _partition_range_is_gone(e): if (e.status_code == http_constants.StatusCodes.GONE and e.sub_status == http_constants.SubStatusCodes.PARTITION_KEY_RANGE_GONE): return True