From 502631622d90089358f5a4ae073245592d3d1c34 Mon Sep 17 00:00:00 2001 From: Simon Moreno <30335873+simorenoh@users.noreply.github.com> Date: Tue, 25 Jan 2022 13:21:45 -0500 Subject: [PATCH] [Cosmos] split proof queries sync client (#22237) * initial commit * Client Constructor (#20310) * Removed some stuff * Looking at constructors * Updated request * Added client close * working client creation Co-authored-by: simorenoh * read database database read works, but ignored exception is returned: Fatal error on SSL transport NoneType has no attribute 'send' (_loop._proactor.send) RuntimeError: Event loop is closed Unclosed connector/ connection * Update simon_testfile.py * with coroutine Added methods needed to use async with when initializing client, but logs output "Exception ignored... Runtime Error: Event loop is closed" * Update simon_testfile.py * small changes * async with returns no exceptions * async read container * async item read * cleaning up * create item/ database methods * item delete working * docs replace functionality missing upsert and other resources * upsert functionality missing read_all_items and both query methods for container class * missing query methods * CRUD for udf, sproc, triggers * initial query logic + container methods * missing some execution logic and tests * oops * fully working queries * small fix to query_items() also fixed README and added examples_async * Update _cosmos_client_connection_async.py * Update _cosmos_client_connection.py * documentation update * updated MIT dates and get_user_client() description * Update CHANGELOG.md * Delete simon_testfile.py * leftover retry utility * Update README.md * docs and removed six package * changes based on comments still missing discussion resolution on SSL verification and tests for async functionality under test module (apart from samples which are basically end to end tests) * small change in type hints * updated readme * fixes based on conversations * added missing type comments 
* update changelog for ci pipeline * added typehints, moved params into keywords, added decorators, made _connection_policy private * changes based on sync with central sdk * remove is_system_key from scripts (only used in execute_sproc) is_system_key verifies that an empty partition key is properly dealt with if ['partitionKey']['systemKey'] exists in the container options - however, we do not allow containers to be created with empty partition key values in the python sdk, so the functionality is needless * Revert "remove is_system_key from scripts (only used in execute_sproc)" Reverting last commit, will find way to init is_system_key for now * async script proxy using composition * pylint * capitalized constants * Apply suggestions from code review Clarifying comments for README Co-authored-by: Gahl Levy <75269480+gahl-levy@users.noreply.github.com> * closing python code snippet * last doc updates * Update sdk/cosmos/azure-cosmos/CHANGELOG.md Co-authored-by: Simon Moreno <30335873+simorenoh@users.noreply.github.com> * version update * cosmos updates for release * current state gone_retry_policy might end up being unnecessary, based on what we feel is best from an arch standpoint * working split proof, need to remove prints * improving comments and removing print statements * removed last prints and used constants * Update CHANGELOG.md * small fixes based on comments * addressed more comments * added test, made slight changes * rename test and small changes * pylint * pylintpylintpylint * moved partition_range_gone check to exceptions since makes more sense * reuse code Co-authored-by: annatisch Co-authored-by: Gahl Levy <75269480+gahl-levy@users.noreply.github.com> Co-authored-by: Travis Prescott --- sdk/cosmos/azure-cosmos/CHANGELOG.md | 3 + .../azure/cosmos/_cosmos_client_connection.py | 5 + .../base_execution_context.py | 4 +- .../multi_execution_aggregator.py | 45 +++++++- .../azure/cosmos/_gone_retry_policy.py | 53 +++++++++ 
.../azure/cosmos/_retry_utility.py | 6 + .../cosmos/_routing/routing_map_provider.py | 2 +- .../azure-cosmos/azure/cosmos/exceptions.py | 7 ++ sdk/cosmos/azure-cosmos/test/test_config.py | 21 ++++ .../test/test_partition_split_query.py | 107 ++++++++++++++++++ 10 files changed, 246 insertions(+), 7 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/azure/cosmos/_gone_retry_policy.py create mode 100644 sdk/cosmos/azure-cosmos/test/test_partition_split_query.py diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 54dc0c4585b8..21fe050c9d1d 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -5,6 +5,9 @@ This version and all future versions will require Python 3.6+. Python 2.7 is no longer supported. We will also be removing support for Python 3.6 and will only support Python 3.7+ starting December 2022. +#### Features Added +- Added support for split-proof queries for the sync client + #### Other Changes - Added async user agent for async client diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index d7c73560bdb6..d5d4ddbc8cb0 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -2556,6 +2556,11 @@ def _retrieve_partition_key(self, partition_key_parts, document, is_system_key): return partitionKey + def refresh_routing_map_provider(self): + # re-initializes the routing map provider, effectively refreshing the current partition key range cache + self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self) + + def _UpdateSessionIfRequired(self, request_headers, response_result, response_headers): """ Updates session if necessary. 
diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py index 3897e5a8a6b1..134bad9c876a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py @@ -25,8 +25,7 @@ from collections import deque import copy -from .. import _retry_utility -from .. import http_constants +from .. import _retry_utility, http_constants # pylint: disable=protected-access @@ -120,6 +119,7 @@ def _fetch_items_helper_no_retries(self, fetch_function): self._has_started = True new_options = copy.deepcopy(self._options) new_options["continuation"] = self._continuation + (fetched_items, response_headers) = fetch_function(new_options) continuation_key = http_constants.HttpHeaders.Continuation # Use Etag as continuation token for change feed queries. diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/multi_execution_aggregator.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/multi_execution_aggregator.py index 7b0af3e89f98..7942afca92c0 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/multi_execution_aggregator.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/multi_execution_aggregator.py @@ -26,6 +26,7 @@ from azure.cosmos._execution_context.base_execution_context import _QueryExecutionContextBase from azure.cosmos._execution_context import document_producer from azure.cosmos._routing import routing_range +from azure.cosmos import exceptions # pylint: disable=protected-access @@ -78,8 +79,8 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i else: self._document_producer_comparator = document_producer._PartitionKeyRangeDocumentProduerComparator() - # will be a list of (parition_min, partition_max) tuples - targetPartitionRanges = 
self._get_target_parition_key_range() + # will be a list of (partition_min, partition_max) tuples + targetPartitionRanges = self._get_target_partition_key_range() targetPartitionQueryExecutionContextList = [] for partitionTargetRange in targetPartitionRanges: @@ -91,7 +92,6 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i self._orderByPQ = _MultiExecutionContextAggregator.PriorityQueue() for targetQueryExContext in targetPartitionQueryExecutionContextList: - try: # TODO: we can also use more_itertools.peekable to be more python friendly targetQueryExContext.peek() @@ -99,6 +99,13 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i self._orderByPQ.push(targetQueryExContext) + except exceptions.CosmosHttpResponseError as e: + if exceptions.partition_range_is_gone(e): + # repairing document producer context on partition split + self._repair_document_producer() + else: + raise + except StopIteration: continue @@ -129,6 +136,36 @@ def fetch_next_block(self): raise NotImplementedError("You should use pipeline's fetch_next_block.") + def _repair_document_producer(self): + """Repairs the document producer context by using the re-initialized routing map provider in the client, + which loads in a refreshed partition key range cache to re-create the partition key ranges. + After loading this new cache, the document producers get re-created with the new valid ranges. 
+ """ + # refresh the routing provider to get the newly initialized one post-refresh + self._routing_provider = self._client._routing_map_provider + # will be a list of (partition_min, partition_max) tuples + targetPartitionRanges = self._get_target_partition_key_range() + + targetPartitionQueryExecutionContextList = [] + for partitionTargetRange in targetPartitionRanges: + # create and add the child execution context for the target range + targetPartitionQueryExecutionContextList.append( + self._createTargetPartitionQueryExecutionContext(partitionTargetRange) + ) + + self._orderByPQ = _MultiExecutionContextAggregator.PriorityQueue() + + for targetQueryExContext in targetPartitionQueryExecutionContextList: + try: + # TODO: we can also use more_itertools.peekable to be more python friendly + targetQueryExContext.peek() + # if there are matching results in the target ex range add it to the priority queue + + self._orderByPQ.push(targetQueryExContext) + + except StopIteration: + continue + def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range): rewritten_query = self._partitioned_query_ex_info.get_rewritten_query() @@ -151,7 +188,7 @@ def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range self._options, ) - def _get_target_parition_key_range(self): + def _get_target_partition_key_range(self): query_ranges = self._partitioned_query_ex_info.get_query_ranges() return self._routing_provider.get_overlapping_ranges( diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_gone_retry_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_gone_retry_policy.py new file mode 100644 index 000000000000..78ebaab65bc8 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_gone_retry_policy.py @@ -0,0 +1,53 @@ +# The MIT License (MIT) +# Copyright (c) 2021 Microsoft Corporation + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal 
+# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Internal class for connection reset retry policy implementation in the Azure +Cosmos database service. +""" + + +# pylint: disable=protected-access + + +class PartitionKeyRangeGoneRetryPolicy(object): + + def __init__(self, client, *args): + self.retry_after_in_milliseconds = 1000 + self.refresh_partition_key_range_cache = True + self.args = args + self.client = client + self.exception = None + + def ShouldRetry(self, exception): + """Returns true if should retry based on the passed-in exception. 
+ + :param (exceptions.CosmosHttpResponseError instance) exception: + :rtype: boolean + + """ + self.exception = exception # needed for pylint + if self.refresh_partition_key_range_cache: + # refresh routing_map_provider to refresh partition key range cache + # make refresh_partition_key_range_cache False to skip this check on subsequent Gone exceptions + self.client.refresh_routing_map_provider() + self.refresh_partition_key_range_cache = False + # return False to raise error to multi_execution_aggregator and repair document producer context + return False diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py index 62747c83d294..b9d25a2fb113 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py @@ -32,8 +32,10 @@ from . import _resource_throttle_retry_policy from . import _default_retry_policy from . import _session_retry_policy +from . import _gone_retry_policy from .http_constants import HttpHeaders, StatusCodes, SubStatusCodes + # pylint: disable=protected-access @@ -65,6 +67,8 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): sessionRetry_policy = _session_retry_policy._SessionRetryPolicy( client.connection_policy.EnableEndpointDiscovery, global_endpoint_manager, *args ) + partition_key_range_gone_retry_policy = _gone_retry_policy.PartitionKeyRangeGoneRetryPolicy(client, *args) + while True: try: client_timeout = kwargs.get('timeout') @@ -97,6 +101,8 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): and e.sub_status == SubStatusCodes.READ_SESSION_NOTAVAILABLE ): retry_policy = sessionRetry_policy + elif exceptions.partition_range_is_gone(e): + retry_policy = partition_key_range_gone_retry_policy else: retry_policy = defaultRetry_policy diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py 
b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py index 97de0efb7053..a668cc03cab3 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_routing/routing_map_provider.py @@ -55,7 +55,7 @@ def get_overlapping_ranges(self, collection_link, partition_key_ranges): overlapping partition key ranges. :param str collection_link: The name of the collection. - :param list partition_key_range: List of partition key range. + :param list partition_key_ranges: List of partition key range. :return: List of overlapping partition key ranges. :rtype: list """ diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/exceptions.py b/sdk/cosmos/azure-cosmos/azure/cosmos/exceptions.py index da610b65de14..ff23c8b1e11f 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/exceptions.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/exceptions.py @@ -73,3 +73,10 @@ def __init__(self, **kwargs): self.response = None self.history = None super(CosmosClientTimeoutError, self).__init__(message, **kwargs) + + +def partition_range_is_gone(e): + if (e.status_code == http_constants.StatusCodes.GONE + and e.sub_status == http_constants.SubStatusCodes.PARTITION_KEY_RANGE_GONE): + return True + return False diff --git a/sdk/cosmos/azure-cosmos/test/test_config.py b/sdk/cosmos/azure-cosmos/test/test_config.py index 32be96cc5d42..cdd829e5414b 100644 --- a/sdk/cosmos/azure-cosmos/test/test_config.py +++ b/sdk/cosmos/azure-cosmos/test/test_config.py @@ -82,6 +82,16 @@ def create_database_if_not_exist(cls, client): cls.IS_MULTIMASTER_ENABLED = client.get_database_account()._EnableMultipleWritableLocations return cls.TEST_DATABASE + @classmethod + def create_database_if_not_exist_with_throughput(cls, client, throughput): + # type: (CosmosClient) -> Database + if cls.TEST_DATABASE is not None: + return cls.TEST_DATABASE + cls.try_delete_database(client) + cls.TEST_DATABASE = client.create_database(id=cls.TEST_DATABASE_ID, 
offer_throughput=throughput) + cls.IS_MULTIMASTER_ENABLED = client.get_database_account()._EnableMultipleWritableLocations + return cls.TEST_DATABASE + @classmethod def try_delete_database(cls, client): # type: (CosmosClient) -> None @@ -119,6 +129,17 @@ def create_multi_partition_collection_with_custom_pk_if_not_exist(cls, client): cls.remove_all_documents(cls.TEST_COLLECTION_MULTI_PARTITION_WITH_CUSTOM_PK, True) return cls.TEST_COLLECTION_MULTI_PARTITION_WITH_CUSTOM_PK + @classmethod + def create_collection_no_custom_throughput(cls, client): + # type: (CosmosClient) -> Container + database = cls.create_database_if_not_exist(client) + collection_id = cls.TEST_COLLECTION_SINGLE_PARTITION_ID + + document_collection = database.create_container( + id=collection_id, + partition_key=PartitionKey(path="/id")) + return document_collection + @classmethod def create_collection_with_required_throughput(cls, client, throughput, use_custom_partition_key): # type: (CosmosClient, int, boolean) -> Container diff --git a/sdk/cosmos/azure-cosmos/test/test_partition_split_query.py b/sdk/cosmos/azure-cosmos/test/test_partition_split_query.py new file mode 100644 index 000000000000..ad284af91eee --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/test_partition_split_query.py @@ -0,0 +1,107 @@ +# The MIT License (MIT) +# Copyright (c) 2021 Microsoft Corporation + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. 
+ +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import unittest + +import azure.cosmos.cosmos_client as cosmos_client +import pytest +import time +import random +import uuid +import test_config + +# This test class serves to test partition splits within the query context + +pytestmark = pytest.mark.cosmosEmulator + + +@pytest.mark.usefixtures("teardown") +class TestPartitionSplitQuery(unittest.TestCase): + configs = test_config._test_config + host = configs.host + masterKey = configs.masterKey + throughput = 400 + + @classmethod + def setUpClass(cls): + cls.client = cosmos_client.CosmosClient(cls.host, cls.masterKey) + cls.database = test_config._test_config.create_database_if_not_exist_with_throughput(cls.client, cls.throughput) + cls.container = test_config._test_config.create_collection_no_custom_throughput(cls.client) + + def test_partition_split_query(self): + for i in range(1000): + body = self.get_test_item() + self.container.create_item(body=body) + + print("created items, changing offer to 11k and starting queries") + self.database.replace_throughput(11000) + offer_time = time.time() + print("changed offer to 11k") + print("--------------------------------") + print("now starting queries") + + self.run_queries(self.container, 1000) # initial check for queries before partition split + print("initial check succeeded, now reading offer until replacing is done") + offer = self.database.read_offer() + while True: + if offer.properties['content'].get('isOfferReplacePending', False): + time.sleep(10) + offer = 
self.database.read_offer() + else: + print("offer replaced successfully, took around {} seconds".format(time.time() - offer_time)) + self.run_queries(self.container, 1000) # check queries work post partition split + print("test over") + self.assertTrue(offer.offer_throughput > self.throughput) + self.client.delete_database(self.configs.TEST_DATABASE_ID) + return + + def run_queries(self, container, iterations): + ret_list = list() + for i in range(iterations): + curr = str(random.randint(0, 10)) + query = 'SELECT * FROM c WHERE c.attr1=' + curr + ' order by c.attr1' + qlist = list(container.query_items(query=query, enable_cross_partition_query=True)) + ret_list.append((curr, qlist)) + for ret in ret_list: + curr = ret[0] + if len(ret[1]) != 0: + for results in ret[1]: + attr_number = results['attr1'] + assert str(attr_number) == curr # verify that all results match their randomly generated attributes + print("validation succeeded for all query results") + + def get_test_item(self): + async_item = { + 'id': 'Async_' + str(uuid.uuid4()), + 'address': { + 'state': 'WA', + 'city': 'Redmond', + 'street': '1 Microsoft Way' + }, + 'test_object': True, + 'lastName': 'Smith', + 'attr1': random.randint(0, 10) + } + return async_item + + +if __name__ == "__main__": + unittest.main()