Skip to content

Commit

Permalink
[Cosmos] split proof queries sync client (Azure#22237)
Browse files Browse the repository at this point in the history
* initial commit

* Client Constructor (Azure#20310)

* Removed some stuff

* Looking at constructors

* Updated request

* Added client close

* working client creation

Co-authored-by: simorenoh <[email protected]>

* read database

database read works, but ignored exception is returned:
Fatal error on SSL transport
NoneType has no attribute 'send' (_loop._proactor.send)
RuntimeError: Event loop is closed
Unclosed connector/ connection

* Update simon_testfile.py

* with coroutine

Added methods needed to use async with when initializing client, but logs output "Exception ignored... Runtime Error: Event loop is closed"

* Update simon_testfile.py

* small changes

* async with returns no exceptions

* async read container

* async item read

* cleaning up

* create item/ database methods

* item delete working

* docs replace functionality

missing upsert and other resources

* upsert functionality

missing read_all_items and both query methods for container class

* missing query methods

* CRUD for udf, sproc, triggers

* initial query logic + container methods

* missing some execution logic and tests

* oops

* fully working queries

* small fix to query_items()

also fixed README and added examples_async

* Update _cosmos_client_connection_async.py

* Update _cosmos_client_connection.py

* documentation update

* updated MIT dates and get_user_client() description

* Update CHANGELOG.md

* Delete simon_testfile.py

* leftover retry utility

* Update README.md

* docs and removed six package

* changes based on comments

still missing discussion resolution on SSL verification and tests for async functionality under test module (apart from samples which are basically end to end tests)

* small change in type hints

* updated readme

* fixes based on conversations

* added missing type comments

* update changelog for ci pipeline

* added typehints, moved params into keywords, added decorators, made _connection_policy private

* changes based on sync with central sdk

* remove is_system_key from scripts (only used in execute_sproc)

is_system_key verifies that an empty partition key is properly dealt with if ['partitionKey']['systemKey'] exists in the container options - however, we do not allow containers to be created with empty partition key values in the python sdk, so the functionality is needless

* Revert "remove is_system_key from scripts (only used in execute_sproc)"

Reverting last commit, will find way to init is_system_key for now

* async script proxy using composition

* pylint

* capitalized constants

* Apply suggestions from code review

Clarifying comments for README

Co-authored-by: Gahl Levy <[email protected]>

* closing python code snippet

* last doc updates

* Update sdk/cosmos/azure-cosmos/CHANGELOG.md

Co-authored-by: Simon Moreno <[email protected]>

* version update

* cosmos updates for release

* current state

gone_retry_policy might end up being unneccesary, based on what we feel is best from an arch standpoint

* working split proof, need to remove prints

* improving comments and removing print statements

* removed last prints and used constants

* Update CHANGELOG.md

* small fixes based on comments

* addressed more comments

* added test, made slight changes

* rename test and small changes

* pylint

* pylintpylintpylint

* moved partition_range_gone check to exceptions since makes more sense

* re use code

Co-authored-by: annatisch <[email protected]>
Co-authored-by: Gahl Levy <[email protected]>
Co-authored-by: Travis Prescott <[email protected]>
  • Loading branch information
4 people authored and rakshith91 committed Apr 10, 2022
1 parent d67e7b4 commit 5026316
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 7 deletions.
3 changes: 3 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
This version and all future versions will require Python 3.6+. Python 2.7 is no longer supported.
We will also be removing support for Python 3.6 and will only support Python 3.7+ starting December 2022.

#### Features Added
- Added support for split-proof queries for the sync client

#### Other Changes
- Added async user agent for async client

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2556,6 +2556,11 @@ def _retrieve_partition_key(self, partition_key_parts, document, is_system_key):

return partitionKey

def refresh_routing_map_provider(self):
# re-initializes the routing map provider, effectively refreshing the current partition key range cache
self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)


def _UpdateSessionIfRequired(self, request_headers, response_result, response_headers):
"""
Updates session if necessary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@

from collections import deque
import copy
from .. import _retry_utility
from .. import http_constants
from .. import _retry_utility, http_constants

# pylint: disable=protected-access

Expand Down Expand Up @@ -120,6 +119,7 @@ def _fetch_items_helper_no_retries(self, fetch_function):
self._has_started = True
new_options = copy.deepcopy(self._options)
new_options["continuation"] = self._continuation

(fetched_items, response_headers) = fetch_function(new_options)
continuation_key = http_constants.HttpHeaders.Continuation
# Use Etag as continuation token for change feed queries.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from azure.cosmos._execution_context.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context import document_producer
from azure.cosmos._routing import routing_range
from azure.cosmos import exceptions

# pylint: disable=protected-access

Expand Down Expand Up @@ -78,8 +79,8 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i
else:
self._document_producer_comparator = document_producer._PartitionKeyRangeDocumentProduerComparator()

# will be a list of (parition_min, partition_max) tuples
targetPartitionRanges = self._get_target_parition_key_range()
# will be a list of (partition_min, partition_max) tuples
targetPartitionRanges = self._get_target_partition_key_range()

targetPartitionQueryExecutionContextList = []
for partitionTargetRange in targetPartitionRanges:
Expand All @@ -91,14 +92,20 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i
self._orderByPQ = _MultiExecutionContextAggregator.PriorityQueue()

for targetQueryExContext in targetPartitionQueryExecutionContextList:

try:
# TODO: we can also use more_itertools.peekable to be more python friendly
targetQueryExContext.peek()
# if there are matching results in the target ex range add it to the priority queue

self._orderByPQ.push(targetQueryExContext)

except exceptions.CosmosHttpResponseError as e:
if exceptions.partition_range_is_gone(e):
# repairing document producer context on partition split
self._repair_document_producer()
else:
raise

except StopIteration:
continue

Expand Down Expand Up @@ -129,6 +136,36 @@ def fetch_next_block(self):

raise NotImplementedError("You should use pipeline's fetch_next_block.")

def _repair_document_producer(self):
"""Repairs the document producer context by using the re-initialized routing map provider in the client,
which loads in a refreshed partition key range cache to re-create the partition key ranges.
After loading this new cache, the document producers get re-created with the new valid ranges.
"""
# refresh the routing provider to get the newly initialized one post-refresh
self._routing_provider = self._client._routing_map_provider
# will be a list of (partition_min, partition_max) tuples
targetPartitionRanges = self._get_target_partition_key_range()

targetPartitionQueryExecutionContextList = []
for partitionTargetRange in targetPartitionRanges:
# create and add the child execution context for the target range
targetPartitionQueryExecutionContextList.append(
self._createTargetPartitionQueryExecutionContext(partitionTargetRange)
)

self._orderByPQ = _MultiExecutionContextAggregator.PriorityQueue()

for targetQueryExContext in targetPartitionQueryExecutionContextList:
try:
# TODO: we can also use more_itertools.peekable to be more python friendly
targetQueryExContext.peek()
# if there are matching results in the target ex range add it to the priority queue

self._orderByPQ.push(targetQueryExContext)

except StopIteration:
continue

def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range):

rewritten_query = self._partitioned_query_ex_info.get_rewritten_query()
Expand All @@ -151,7 +188,7 @@ def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range
self._options,
)

def _get_target_parition_key_range(self):
def _get_target_partition_key_range(self):

query_ranges = self._partitioned_query_ex_info.get_query_ranges()
return self._routing_provider.get_overlapping_ranges(
Expand Down
53 changes: 53 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_gone_retry_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# The MIT License (MIT)
# Copyright (c) 2021 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Internal class for connection reset retry policy implementation in the Azure
Cosmos database service.
"""


# pylint: disable=protected-access


class PartitionKeyRangeGoneRetryPolicy(object):

def __init__(self, client, *args):
self.retry_after_in_milliseconds = 1000
self.refresh_partition_key_range_cache = True
self.args = args
self.client = client
self.exception = None

def ShouldRetry(self, exception):
"""Returns true if should retry based on the passed-in exception.
:param (exceptions.CosmosHttpResponseError instance) exception:
:rtype: boolean
"""
self.exception = exception # needed for pylint
if self.refresh_partition_key_range_cache:
# refresh routing_map_provider to refresh partition key range cache
# make refresh_partition_key_range_cache False to skip this check on subsequent Gone exceptions
self.client.refresh_routing_map_provider()
self.refresh_partition_key_range_cache = False
# return False to raise error to multi_execution_aggregator and repair document producer context
return False
6 changes: 6 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
from . import _resource_throttle_retry_policy
from . import _default_retry_policy
from . import _session_retry_policy
from . import _gone_retry_policy
from .http_constants import HttpHeaders, StatusCodes, SubStatusCodes


# pylint: disable=protected-access


Expand Down Expand Up @@ -65,6 +67,8 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):
sessionRetry_policy = _session_retry_policy._SessionRetryPolicy(
client.connection_policy.EnableEndpointDiscovery, global_endpoint_manager, *args
)
partition_key_range_gone_retry_policy = _gone_retry_policy.PartitionKeyRangeGoneRetryPolicy(client, *args)

while True:
try:
client_timeout = kwargs.get('timeout')
Expand Down Expand Up @@ -97,6 +101,8 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):
and e.sub_status == SubStatusCodes.READ_SESSION_NOTAVAILABLE
):
retry_policy = sessionRetry_policy
elif exceptions.partition_range_is_gone(e):
retry_policy = partition_key_range_gone_retry_policy
else:
retry_policy = defaultRetry_policy

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def get_overlapping_ranges(self, collection_link, partition_key_ranges):
overlapping partition key ranges.
:param str collection_link: The name of the collection.
:param list partition_key_range: List of partition key range.
:param list partition_key_ranges: List of partition key range.
:return: List of overlapping partition key ranges.
:rtype: list
"""
Expand Down
7 changes: 7 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,10 @@ def __init__(self, **kwargs):
self.response = None
self.history = None
super(CosmosClientTimeoutError, self).__init__(message, **kwargs)


def partition_range_is_gone(e):
if (e.status_code == http_constants.StatusCodes.GONE
and e.sub_status == http_constants.SubStatusCodes.PARTITION_KEY_RANGE_GONE):
return True
return False
21 changes: 21 additions & 0 deletions sdk/cosmos/azure-cosmos/test/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ def create_database_if_not_exist(cls, client):
cls.IS_MULTIMASTER_ENABLED = client.get_database_account()._EnableMultipleWritableLocations
return cls.TEST_DATABASE

@classmethod
def create_database_if_not_exist_with_throughput(cls, client, throughput):
# type: (CosmosClient) -> Database
if cls.TEST_DATABASE is not None:
return cls.TEST_DATABASE
cls.try_delete_database(client)
cls.TEST_DATABASE = client.create_database(id=cls.TEST_DATABASE_ID, offer_throughput=throughput)
cls.IS_MULTIMASTER_ENABLED = client.get_database_account()._EnableMultipleWritableLocations
return cls.TEST_DATABASE

@classmethod
def try_delete_database(cls, client):
# type: (CosmosClient) -> None
Expand Down Expand Up @@ -119,6 +129,17 @@ def create_multi_partition_collection_with_custom_pk_if_not_exist(cls, client):
cls.remove_all_documents(cls.TEST_COLLECTION_MULTI_PARTITION_WITH_CUSTOM_PK, True)
return cls.TEST_COLLECTION_MULTI_PARTITION_WITH_CUSTOM_PK

@classmethod
def create_collection_no_custom_throughput(cls, client):
# type: (CosmosClient) -> Container
database = cls.create_database_if_not_exist(client)
collection_id = cls.TEST_COLLECTION_SINGLE_PARTITION_ID

document_collection = database.create_container(
id=collection_id,
partition_key=PartitionKey(path="/id"))
return document_collection

@classmethod
def create_collection_with_required_throughput(cls, client, throughput, use_custom_partition_key):
# type: (CosmosClient, int, boolean) -> Container
Expand Down
107 changes: 107 additions & 0 deletions sdk/cosmos/azure-cosmos/test/test_partition_split_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# The MIT License (MIT)
# Copyright (c) 2021 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import unittest

import azure.cosmos.cosmos_client as cosmos_client
import pytest
import time
import random
import uuid
import test_config

# This test class serves to test partition splits within the query context

pytestmark = pytest.mark.cosmosEmulator


@pytest.mark.usefixtures("teardown")
class TestPartitionSplitQuery(unittest.TestCase):
configs = test_config._test_config
host = configs.host
masterKey = configs.masterKey
throughput = 400

@classmethod
def setUpClass(cls):
cls.client = cosmos_client.CosmosClient(cls.host, cls.masterKey)
cls.database = test_config._test_config.create_database_if_not_exist_with_throughput(cls.client, cls.throughput)
cls.container = test_config._test_config.create_collection_no_custom_throughput(cls.client)

def test_partition_split_query(self):
for i in range(1000):
body = self.get_test_item()
self.container.create_item(body=body)

print("created items, changing offer to 11k and starting queries")
self.database.replace_throughput(11000)
offer_time = time.time()
print("changed offer to 11k")
print("--------------------------------")
print("now starting queries")

self.run_queries(self.container, 1000) # initial check for queries before partition split
print("initial check succeeded, now reading offer until replacing is done")
offer = self.database.read_offer()
while True:
if offer.properties['content'].get('isOfferReplacePending', False):
time.sleep(10)
offer = self.database.read_offer()
else:
print("offer replaced successfully, took around {} seconds".format(time.time() - offer_time))
self.run_queries(self.container, 1000) # check queries work post partition split
print("test over")
self.assertTrue(offer.offer_throughput > self.throughput)
self.client.delete_database(self.configs.TEST_DATABASE_ID)
return

def run_queries(self, container, iterations):
ret_list = list()
for i in range(iterations):
curr = str(random.randint(0, 10))
query = 'SELECT * FROM c WHERE c.attr1=' + curr + ' order by c.attr1'
qlist = list(container.query_items(query=query, enable_cross_partition_query=True))
ret_list.append((curr, qlist))
for ret in ret_list:
curr = ret[0]
if len(ret[1]) != 0:
for results in ret[1]:
attr_number = results['attr1']
assert str(attr_number) == curr # verify that all results match their randomly generated attributes
print("validation succeeded for all query results")

def get_test_item(self):
async_item = {
'id': 'Async_' + str(uuid.uuid4()),
'address': {
'state': 'WA',
'city': 'Redmond',
'street': '1 Microsoft Way'
},
'test_object': True,
'lastName': 'Smith',
'attr1': random.randint(0, 10)
}
return async_item


if __name__ == "__main__":
unittest.main()

0 comments on commit 5026316

Please sign in to comment.