[Cosmos] split proof queries async client (Azure#22261)
* initial commit

* Client Constructor (Azure#20310)

* Removed some stuff

* Looking at constructors

* Updated request

* Added client close

* working client creation

Co-authored-by: simorenoh <[email protected]>

* read database

database read works, but an ignored exception is returned:
Fatal error on SSL transport
NoneType has no attribute 'send' (_loop._proactor.send)
RuntimeError: Event loop is closed
Unclosed connector/connection

* Update simon_testfile.py

* with coroutine

Added the methods needed to use async with when initializing the client, but the logs still show "Exception ignored... RuntimeError: Event loop is closed"

* Update simon_testfile.py

* small changes

* async with returns no exceptions

* async read container

* async item read

* cleaning up

* create item/database methods

* item delete working

* docs replace functionality

missing upsert and other resources

* upsert functionality

missing read_all_items and both query methods for container class

* missing query methods

* CRUD for udf, sproc, triggers

* initial query logic + container methods

* missing some execution logic and tests

* oops

* fully working queries

* small fix to query_items()

also fixed README and added examples_async

* Update _cosmos_client_connection_async.py

* Update _cosmos_client_connection.py

* documentation update

* updated MIT dates and get_user_client() description

* Update CHANGELOG.md

* Delete simon_testfile.py

* leftover retry utility

* Update README.md

* docs and removed six package

* changes based on comments

still missing resolution of the SSL verification discussion and tests for async functionality under the test module (apart from the samples, which are basically end-to-end tests)

* small change in type hints

* updated readme

* fixes based on conversations

* added missing type comments

* update changelog for ci pipeline

* added typehints, moved params into keywords, added decorators, made _connection_policy private

* changes based on sync with central sdk

* remove is_system_key from scripts (only used in execute_sproc)

is_system_key verifies that an empty partition key is handled properly when ['partitionKey']['systemKey'] exists in the container options - however, the Python SDK does not allow containers to be created with empty partition key values, so this functionality is unnecessary

* Revert "remove is_system_key from scripts (only used in execute_sproc)"

Reverting the last commit; will find a way to initialize is_system_key for now

* async script proxy using composition

* pylint

* capitalized constants

* Apply suggestions from code review

Clarifying comments for README

Co-authored-by: Gahl Levy <[email protected]>

* closing python code snippet

* last doc updates

* Update sdk/cosmos/azure-cosmos/CHANGELOG.md

Co-authored-by: Simon Moreno <[email protected]>

* version update

* cosmos updates for release

* working split proof for async client, need to remove print() statements and improve comments

* remove print statements and improve comments

* Update CHANGELOG.md

* pylint

* address Annie's comments on sync split proof

* parity with sync client

* async comparing of document producers

* removed unneeded logic/imports, made compares async for pylint attempt

* spelling mistake and making private

* making sync client private too

* Update CHANGELOG.md

* Delete test_axq.py

Co-authored-by: annatisch <[email protected]>
Co-authored-by: Gahl Levy <[email protected]>
Co-authored-by: Travis Prescott <[email protected]>
4 people authored and rakshith91 committed Apr 10, 2022
1 parent 4851990 commit 9a2b86b
Showing 13 changed files with 155 additions and 37 deletions.
9 changes: 7 additions & 2 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -2,10 +2,14 @@

### 4.3.0b3 (Unreleased)

#### Features Added
- Added support for split-proof queries for the async client
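  For orientation (an editorial sketch, not part of the changelog or the diff), the scenario this feature covers is a cross-partition query issued through the async client, which should now keep working if a physical partition splits while results are being drained. The endpoint, key, database, and container names below are placeholders, and the assumption that a query without a partition key fans out across partitions by default follows the async client's documented behavior:

```python
import asyncio
from azure.cosmos.aio import CosmosClient

async def run_query():
    # Placeholder endpoint/key and names; substitute your own account values.
    async with CosmosClient("https://<account>.documents.azure.com:443/",
                            credential="<key>") as client:
        container = client.get_database_client("db").get_container_client("items")
        # No partition key is supplied, so the query fans out across physical
        # partitions; the split-proof logic keeps the fan-out valid if one of
        # those partitions splits while the query is still returning results.
        async for item in container.query_items(
                "SELECT * FROM c WHERE c.status = 'active'"):
            print(item["id"])

asyncio.run(run_query())
```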

### Bugs fixed
- Default consistency level for the sync and async clients is no longer "Session" and will instead be set to the
consistency level of the user's cosmos account setting on initialization if not passed during client initialization.
This change will impact client application in terms of RUs and latency. Users relying on default `Session` consistency will need to pass it explicitly if their account consistency is different than `Session`. Please see [Consistency Levels in Azure Cosmos DB](https://docs.microsoft.com/azure/cosmos-db/consistency-levels) for more details.
This change will impact client application in terms of RUs and latency. Users relying on default `Session` consistency will need to pass it explicitly if their account consistency is different than `Session`.
Please see [Consistency Levels in Azure Cosmos DB](https://docs.microsoft.com/azure/cosmos-db/consistency-levels) for more details.
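  As an illustrative aside (not part of the diff), a client that previously relied on the implicit `Session` default can pin it explicitly at construction; the `consistency_level` keyword follows the existing `CosmosClient` constructor, and the endpoint/key values are placeholders:

```python
from azure.cosmos.aio import CosmosClient

# Placeholder account values; consistency_level pins the old default explicitly.
client = CosmosClient(
    "https://<account>.documents.azure.com:443/",
    credential="<key>",
    consistency_level="Session",
)
```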

### 4.3.0b2 (2022-01-25)

@@ -19,7 +23,8 @@ We will also be removing support for Python 3.6 and will only support Python 3.7
- Added async user agent for async client

### 4.3.0b1 (2021-12-14)
**New features**

#### Features Added
- Added language native async i/o client

### 4.2.0 (2020-10-08)
@@ -0,0 +1,74 @@
# The MIT License (MIT)
# Copyright (c) 2022 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

async def heap_push(heap, item, document_producer_comparator):
"""Push item onto heap, maintaining the heap invariant."""
heap.append(item)
await _sift_down(heap, document_producer_comparator, 0, len(heap) - 1)


async def heap_pop(heap, document_producer_comparator):
"""Pop the smallest item off the heap, maintaining the heap invariant."""
last_elt = heap.pop() # raises appropriate IndexError if heap is empty
if heap:
return_item = heap[0]
heap[0] = last_elt
await _sift_up(heap, document_producer_comparator, 0)
return return_item
return last_elt


async def _sift_down(heap, document_producer_comparator, start_pos, pos):
new_item = heap[pos]
# Follow the path to the root, moving parents down until finding a place
# new_item fits.
while pos > start_pos:
parent_pos = (pos - 1) >> 1
parent = heap[parent_pos]
if await document_producer_comparator.compare(new_item, parent) < 0:
# if new_item < parent:
heap[pos] = parent
pos = parent_pos
continue
break
heap[pos] = new_item


async def _sift_up(heap, document_producer_comparator, pos):
end_pos = len(heap)
start_pos = pos
new_item = heap[pos]
# Bubble up the smaller child until hitting a leaf.
child_pos = 2 * pos + 1 # leftmost child position
while child_pos < end_pos:
# Set child_pos to index of smaller child.
right_pos = child_pos + 1
# if right_pos < end_pos and not heap[child_pos] < heap[right_pos]:
if right_pos < end_pos and not await document_producer_comparator.compare(heap[child_pos], heap[right_pos]) < 0:
child_pos = right_pos
# Move the smaller child up.
heap[pos] = heap[child_pos]
pos = child_pos
child_pos = 2 * pos + 1
# The leaf at pos is empty now. Put new_item there, and bubble it up
# to its final resting place (by sifting its parents down).
heap[pos] = new_item
await _sift_down(heap, document_producer_comparator, start_pos, pos)
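The helpers above mirror Python's `heapq` push/pop but await an async comparator instead of relying on `__lt__`. A minimal usage sketch, assuming the module is importable under the path used by the aggregator's import further down in this diff, and using a hypothetical comparator that satisfies the async `compare(a, b) -> int` contract:

```python
import asyncio

# Assumed import path, based on the aggregator's import shown later in this commit.
from azure.cosmos._execution_context.aio._queue_async_helper import heap_push, heap_pop


class MinValueComparator:
    """Hypothetical comparator implementing the async compare(a, b) -> int contract."""
    async def compare(self, a, b):
        return (a > b) - (a < b)


async def main():
    heap, comparator = [], MinValueComparator()
    for value in (5, 1, 4, 2, 3):
        await heap_push(heap, value, comparator)
    # Popping repeatedly yields the items in ascending order.
    ordered = [await heap_pop(heap, comparator) for _ in range(len(heap))]
    print(ordered)  # [1, 2, 3, 4, 5]

asyncio.run(main())
```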
@@ -121,6 +121,7 @@ async def _fetch_items_helper_no_retries(self, fetch_function):
self._has_started = True
new_options = copy.deepcopy(self._options)
new_options["continuation"] = self._continuation

(fetched_items, response_headers) = await fetch_function(new_options)
continuation_key = http_constants.HttpHeaders.Continuation
# Use Etag as continuation token for change feed queries.
@@ -62,9 +62,6 @@ async def fetch_fn(options):

self._ex_context = _DefaultQueryExecutionContext(client, self._options, fetch_fn)

def __lt__(self, other):
return self._doc_producer_comp.compare(self, other) < 0

async def __aiter__(self):
return self

@@ -104,14 +101,13 @@ async def peek(self):
return self._cur_item



def _compare_helper(a, b):
if a is None and b is None:
return 0
return (a > b) - (a < b)


class _PartitionKeyRangeDocumentProduerComparator(object):
class _PartitionKeyRangeDocumentProducerComparator(object):
"""
Provides a Comparator for document producers using the min value of the
corresponding target partition.
@@ -120,7 +116,7 @@ class _PartitionKeyRangeDocumentProduerComparator(object):
def __init__(self):
pass

def compare(self, doc_producer1, doc_producer2): # pylint: disable=no-self-use
async def compare(self, doc_producer1, doc_producer2): # pylint: disable=no-self-use
return _compare_helper(
doc_producer1.get_target_range()["minInclusive"], doc_producer2.get_target_range()["minInclusive"]
)
@@ -179,7 +175,7 @@ def getTypeStr(orderby_item):
raise TypeError("unknown type" + str(val))

@staticmethod
def compare(orderby_item1, orderby_item2):
async def compare(orderby_item1, orderby_item2):
"""Compare two orderby item pairs.
:param dict orderby_item1:
@@ -213,7 +209,7 @@ def _peek_order_by_items(peek_result):
return peek_result["orderByItems"]


class _OrderByDocumentProducerComparator(_PartitionKeyRangeDocumentProduerComparator):
class _OrderByDocumentProducerComparator(_PartitionKeyRangeDocumentProducerComparator):
"""Provide a Comparator for document producers which respects orderby sort order.
"""

@@ -229,7 +225,7 @@ def __init__(self, sort_order): # pylint: disable=super-init-not-called
"""
self._sort_order = sort_order

def compare(self, doc_producer1, doc_producer2):
async def compare(self, doc_producer1, doc_producer2):
"""Compares the given two instances of DocumentProducers.
Based on the orderby query items and whether the sort order is Ascending
@@ -238,29 +234,29 @@ def compare(self, doc_producer1, doc_producer2):
If the peek results are equal based on the sort order, this comparator
compares the target partition key range of the two DocumentProducers.
:param _DocumentProducer doc_producers1: first instance
:param _DocumentProducer doc_producers2: first instance
:param _DocumentProducer doc_producer1: first instance
:param _DocumentProducer doc_producer2: first instance
:return:
Integer value of compare result.
positive integer if doc_producers1 > doc_producers2
negative integer if doc_producers1 < doc_producers2
positive integer if doc_producer1 > doc_producer2
negative integer if doc_producer1 < doc_producer2
:rtype: int
"""

res1 = _peek_order_by_items(doc_producer1.peek())
res2 = _peek_order_by_items(doc_producer2.peek())
res1 = _peek_order_by_items(await doc_producer1.peek())
res2 = _peek_order_by_items(await doc_producer2.peek())

self._validate_orderby_items(res1, res2)

for i, (elt1, elt2) in enumerate(zip(res1, res2)):
res = _OrderByHelper.compare(elt1, elt2)
res = await _OrderByHelper.compare(elt1, elt2)
if res != 0:
if self._sort_order[i] == "Ascending":
return res
if self._sort_order[i] == "Descending":
return -res

return _PartitionKeyRangeDocumentProduerComparator.compare(self, doc_producer1, doc_producer2)
return await _PartitionKeyRangeDocumentProducerComparator.compare(self, doc_producer1, doc_producer2)

def _validate_orderby_items(self, res1, res2):
if len(res1) != len(res2):
@@ -70,7 +70,7 @@ async def __anext__(self):
query_to_use = self._query if self._query is not None else "Select * from root r"
query_execution_info = _PartitionedQueryExecutionInfo(await self._client._GetQueryPlanThroughGateway
(query_to_use, self._resource_link))
self._execution_context = self._create_pipelined_execution_context(query_execution_info)
self._execution_context = await self._create_pipelined_execution_context(query_execution_info)
else:
raise e

@@ -22,10 +22,10 @@
"""Internal class for multi execution context aggregator implementation in the Azure Cosmos database service.
"""

import heapq
from azure.cosmos._execution_context.aio.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context.aio import document_producer
from azure.cosmos._execution_context.aio import document_producer, _queue_async_helper
from azure.cosmos._routing import routing_range
from azure.cosmos import exceptions

# pylint: disable=protected-access

@@ -50,11 +50,11 @@ class PriorityQueue:
def __init__(self):
self._heap = []

def pop(self):
return heapq.heappop(self._heap)
async def pop_async(self, document_producer_comparator):
return await _queue_async_helper.heap_pop(self._heap, document_producer_comparator)

def push(self, item):
heapq.heappush(self._heap, item)
async def push_async(self, item, document_producer_comparator):
await _queue_async_helper.heap_push(self._heap, item, document_producer_comparator)

def peek(self):
return self._heap[0]
@@ -76,7 +76,7 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i
if self._sort_orders:
self._document_producer_comparator = document_producer._OrderByDocumentProducerComparator(self._sort_orders)
else:
self._document_producer_comparator = document_producer._PartitionKeyRangeDocumentProduerComparator()
self._document_producer_comparator = document_producer._PartitionKeyRangeDocumentProducerComparator()

self._orderByPQ = _MultiExecutionContextAggregator.PriorityQueue()

Expand All @@ -89,13 +89,13 @@ async def __anext__(self):
"""
if self._orderByPQ.size() > 0:

targetRangeExContext = self._orderByPQ.pop()
targetRangeExContext = await self._orderByPQ.pop_async(self._document_producer_comparator)
res = await targetRangeExContext.__anext__()

try:
# TODO: we can also use more_itertools.peekable to be more python friendly
await targetRangeExContext.peek()
self._orderByPQ.push(targetRangeExContext)
await self._orderByPQ.push_async(targetRangeExContext, self._document_producer_comparator)

except StopAsyncIteration:
pass
@@ -107,6 +107,33 @@ async def fetch_next_block(self):

raise NotImplementedError("You should use pipeline's fetch_next_block.")

async def _repair_document_producer(self):
"""Repairs the document producer context by using the re-initialized routing map provider in the client,
which loads in a refreshed partition key range cache to re-create the partition key ranges.
After loading this new cache, the document producers get re-created with the new valid ranges.
"""
# refresh the routing provider to get the newly initialized one post-refresh
self._routing_provider = self._client._routing_map_provider
# will be a list of (partition_min, partition_max) tuples
targetPartitionRanges = await self._get_target_partition_key_range()

targetPartitionQueryExecutionContextList = []
for partitionTargetRange in targetPartitionRanges:
# create and add the child execution context for the target range
targetPartitionQueryExecutionContextList.append(
self._createTargetPartitionQueryExecutionContext(partitionTargetRange)
)

for targetQueryExContext in targetPartitionQueryExecutionContextList:
try:
# TODO: we can also use more_itertools.peekable to be more python friendly
await targetQueryExContext.peek()
# if there are matching results in the target ex range add it to the priority queue
await self._orderByPQ.push_async(targetQueryExContext, self._document_producer_comparator)

except StopAsyncIteration:
continue

def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range):

rewritten_query = self._partitioned_query_ex_info.get_rewritten_query()
@@ -148,13 +175,19 @@ async def _configure_partition_ranges(self):
)

for targetQueryExContext in targetPartitionQueryExecutionContextList:

try:
# TODO: we can also use more_itertools.peekable to be more python friendly
await targetQueryExContext.peek()
# if there are matching results in the target ex range add it to the priority queue

self._orderByPQ.push(targetQueryExContext)
await self._orderByPQ.push_async(targetQueryExContext, self._document_producer_comparator)

except exceptions.CosmosHttpResponseError as e:
if exceptions._partition_range_is_gone(e):
# repairing document producer context on partition split
await self._repair_document_producer()
else:
raise

except StopAsyncIteration:
continue
@@ -100,7 +100,7 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i
self._orderByPQ.push(targetQueryExContext)

except exceptions.CosmosHttpResponseError as e:
if exceptions.partition_range_is_gone(e):
if exceptions._partition_range_is_gone(e):
# repairing document producer context on partition split
self._repair_document_producer()
else:
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py
@@ -101,7 +101,7 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):
and e.sub_status == SubStatusCodes.READ_SESSION_NOTAVAILABLE
):
retry_policy = sessionRetry_policy
elif exceptions.partition_range_is_gone(e):
elif exceptions._partition_range_is_gone(e):
retry_policy = partition_key_range_gone_retry_policy
else:
retry_policy = defaultRetry_policy
@@ -54,7 +54,7 @@ async def get_overlapping_ranges(self, collection_link, partition_key_ranges):
overlapping partition key ranges.
:param str collection_link: The name of the collection.
:param list partition_key_range: List of partition key range.
:param list partition_key_ranges: List of partition key range.
:return: List of overlapping partition key ranges.
:rtype: list
"""
@@ -127,7 +127,7 @@ def _subtract_range(r, partition_key_range):

class SmartRoutingMapProvider(PartitionKeyRangeCache):
"""
Efficiently uses PartitionKeyRangeCach and minimizes the unnecessary
Efficiently uses PartitionKeyRangeCache and minimizes the unnecessary
invocation of CollectionRoutingMap.get_overlapping_ranges()
"""

@@ -2477,6 +2477,10 @@ def _retrieve_partition_key(self, partition_key_parts, document, is_system_key):

return partitionKey

def refresh_routing_map_provider(self):
# re-initializes the routing map provider, effectively refreshing the current partition key range cache
self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)

async def _GetQueryPlanThroughGateway(self, query, resource_link, **kwargs):
supported_query_features = (documents._QueryFeature.Aggregate + "," +
documents._QueryFeature.CompositeAggregate + "," +