[Cosmos] split proof queries async client #22261

Merged
merged 83 commits into from
Feb 7, 2022
Changes from 81 commits
Commits
61ba8d1
initial commit
simorenoh Aug 13, 2021
15dcceb
Client Constructor (#20310)
annatisch Aug 20, 2021
bda95c3
read database
simorenoh Aug 27, 2021
c9648ab
Update simon_testfile.py
simorenoh Aug 27, 2021
80540dc
with coroutine
simorenoh Aug 30, 2021
1285438
Update simon_testfile.py
simorenoh Aug 30, 2021
992b0cd
small changes
simorenoh Aug 31, 2021
47cb688
async with returns no exceptions
simorenoh Aug 31, 2021
f3fa79f
Merge pull request #1 from Azure/simonmoreno/async
simorenoh Aug 31, 2021
0c49739
async read container
simorenoh Sep 1, 2021
47f4af5
async item read
simorenoh Sep 2, 2021
c97c946
cleaning up
simorenoh Sep 3, 2021
fcd95db
create item/ database methods
simorenoh Sep 13, 2021
36c5b90
item delete working
simorenoh Sep 13, 2021
44db2a2
docs replace functionality
simorenoh Sep 16, 2021
ec5b6ed
upsert functionality
simorenoh Sep 17, 2021
d63d052
Merge pull request #2 from simorenoh/item-read
simorenoh Oct 8, 2021
5d74c8f
missing query methods
simorenoh Oct 11, 2021
89fc2f7
CRUD for udf, sproc, triggers
simorenoh Oct 12, 2021
fdaa880
Merge branch 'Azure:main' into async-client
simorenoh Oct 12, 2021
3f9baf2
Merge branch 'Azure:main' into async-client
simorenoh Oct 12, 2021
d6650bc
Merge branch 'Azure:main' into query-functionality
simorenoh Oct 12, 2021
043dfe0
initial query logic + container methods
simorenoh Oct 13, 2021
befdb41
Merge branch 'async-client' into query-functionality
simorenoh Oct 13, 2021
8cffbe2
Merge pull request #3 from simorenoh/query-functionality
simorenoh Oct 13, 2021
72de7c8
missing some execution logic and tests
simorenoh Oct 21, 2021
5b805b8
oops
simorenoh Oct 21, 2021
8d8d0c4
fully working queries
simorenoh Oct 22, 2021
b597ca8
small fix to query_items()
simorenoh Oct 22, 2021
18319df
Update _cosmos_client_connection_async.py
simorenoh Oct 22, 2021
162c44d
Update _cosmos_client_connection.py
simorenoh Oct 22, 2021
ebbac51
documentation update
simorenoh Oct 22, 2021
43f78e6
Merge branch 'Azure:main' into main
simorenoh Oct 22, 2021
470aa5b
updated MIT dates and get_user_client() description
simorenoh Oct 22, 2021
74da690
Update CHANGELOG.md
simorenoh Oct 22, 2021
7104d63
Merge branch 'Azure:main' into main
simorenoh Oct 25, 2021
20718c7
Delete simon_testfile.py
simorenoh Oct 25, 2021
d825eaa
Merge pull request #4 from simorenoh/async-client
simorenoh Oct 25, 2021
e3c27a5
leftover retry utility
simorenoh Oct 25, 2021
3b778ad
Update README.md
simorenoh Oct 25, 2021
c6e352e
docs and removed six package
simorenoh Oct 28, 2021
8971a25
Merge remote-tracking branch 'upstream/main'
simorenoh Oct 28, 2021
52736ac
changes based on comments
simorenoh Nov 4, 2021
ad98039
small change in type hints
simorenoh Nov 4, 2021
f76c595
updated readme
simorenoh Nov 9, 2021
3f02a65
fixes based on conversations
simorenoh Nov 10, 2021
e719869
added missing type comments
simorenoh Nov 11, 2021
d03ee05
Merge branch 'Azure:main' into main
simorenoh Nov 11, 2021
02c52ee
update changelog for ci pipeline
simorenoh Nov 23, 2021
2cb4551
added typehints, moved params into keywords, added decorators, made _…
simorenoh Nov 29, 2021
cf20d35
changes based on sync with central sdk
simorenoh Dec 2, 2021
f456817
remove is_system_key from scripts (only used in execute_sproc)
simorenoh Dec 3, 2021
ea9bd16
Revert "remove is_system_key from scripts (only used in execute_sproc)"
simorenoh Dec 3, 2021
709d2eb
async script proxy using composition
simorenoh Dec 3, 2021
3277dd8
pylint
simorenoh Dec 3, 2021
a57cb4d
capitalized constants
simorenoh Dec 6, 2021
014578b
Apply suggestions from code review
simorenoh Dec 6, 2021
0d79695
closing python code snippet
simorenoh Dec 6, 2021
fdabea1
last doc updates
simorenoh Dec 7, 2021
016d0dd
Update sdk/cosmos/azure-cosmos/CHANGELOG.md
tjprescott Dec 7, 2021
8228aa9
version update
simorenoh Dec 7, 2021
7ae1cd0
Merge branch 'Azure:main' into main
simorenoh Dec 13, 2021
7e8e953
cosmos updates for release
simorenoh Dec 13, 2021
9a2acc7
working split proof for async client, need to remove prints() and mak…
simorenoh Dec 24, 2021
a88cdfa
remove print statements and improve comments
simorenoh Jan 11, 2022
99997e9
Merge branch 'main' of https://github.com/Azure/azure-sdk-for-python …
simorenoh Jan 12, 2022
f446308
Merge branch 'Azure-main'
simorenoh Jan 12, 2022
bc466d8
Merge branch 'Azure-main'
simorenoh Jan 12, 2022
bca2da7
Merge branch 'Azure:main' into main
simorenoh Jan 13, 2022
7d41d02
Update CHANGELOG.md
simorenoh Jan 13, 2022
8d49e0e
Merge branch 'main' into split-proof-async
simorenoh Jan 13, 2022
27dce87
pylint
simorenoh Jan 13, 2022
d70269c
address Annie's comments on sync split proof
simorenoh Jan 18, 2022
a86b733
parity with sync client
simorenoh Jan 21, 2022
a075a78
async comparing of document producers
simorenoh Jan 24, 2022
64b9cfa
Merge branch 'main' into split-proof-async
simorenoh Jan 24, 2022
e85b6de
removed unneeded logic/imports, made compares async for pylint attempt
simorenoh Jan 25, 2022
c220ade
spelling mistake and making private
simorenoh Jan 25, 2022
9d6a72f
Merge branch 'main' into split-proof-async
simorenoh Jan 25, 2022
c6d585e
making sync client private too
simorenoh Jan 25, 2022
c5e710a
Update CHANGELOG.md
simorenoh Feb 2, 2022
47a0acb
Delete test_axq.py
simorenoh Feb 2, 2022
6071694
Merge branch 'main' into split-proof-async
simorenoh Feb 4, 2022
8 changes: 7 additions & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
## Release History

### 4.3.0b3 (Unreleased)

#### Features Added
- Added support for split-proof queries for the async client

### 4.3.0b2 (2022-01-25)

This version and all future versions will require Python 3.6+. Python 2.7 is no longer supported.
@@ -12,7 +17,8 @@ We will also be removing support for Python 3.6 and will only support Python 3.7
- Added async user agent for async client

### 4.3.0b1 (2021-12-14)
**New features**

#### Features Added
- Added language native async i/o client

### 4.2.0 (2020-10-08)
@@ -0,0 +1,74 @@
# The MIT License (MIT)
# Copyright (c) 2022 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

async def heap_push(heap, item, document_producer_comparator):
    """Push item onto heap, maintaining the heap invariant."""
    heap.append(item)
    await _sift_down(heap, document_producer_comparator, 0, len(heap) - 1)


async def heap_pop(heap, document_producer_comparator):
    """Pop the smallest item off the heap, maintaining the heap invariant."""
    last_elt = heap.pop()  # raises appropriate IndexError if heap is empty
    if heap:
        return_item = heap[0]
        heap[0] = last_elt
        await _sift_up(heap, document_producer_comparator, 0)
        return return_item
    return last_elt


async def _sift_down(heap, document_producer_comparator, start_pos, pos):
    new_item = heap[pos]
    # Follow the path to the root, moving parents down until finding a place
    # new_item fits.
    while pos > start_pos:
        parent_pos = (pos - 1) >> 1
        parent = heap[parent_pos]
        if await document_producer_comparator.compare(new_item, parent) < 0:
            # if new_item < parent:
            heap[pos] = parent
            pos = parent_pos
            continue
        break
    heap[pos] = new_item


async def _sift_up(heap, document_producer_comparator, pos):
    end_pos = len(heap)
    start_pos = pos
    new_item = heap[pos]
    # Bubble up the smaller child until hitting a leaf.
    child_pos = 2 * pos + 1  # leftmost child position
    while child_pos < end_pos:
        # Set child_pos to index of smaller child.
        right_pos = child_pos + 1
        # if right_pos < end_pos and not heap[child_pos] < heap[right_pos]:
        if right_pos < end_pos and not await document_producer_comparator.compare(heap[child_pos], heap[right_pos]) < 0:
            child_pos = right_pos
        # Move the smaller child up.
        heap[pos] = heap[child_pos]
        pos = child_pos
        child_pos = 2 * pos + 1
    # The leaf at pos is empty now. Put new_item there, and bubble it up
    # to its final resting place (by sifting its parents down).
    heap[pos] = new_item
    await _sift_down(heap, document_producer_comparator, start_pos, pos)
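Note on the helper above: it follows the shape of the standard library's `heapq` sift routines, but every comparison is awaited, presumably because comparing two document producers can require an awaited `peek()`. The sketch below is a simplified variant of that idea and not the PR's code: it sifts the displaced element straight down instead of bubbling to a leaf first, and `LengthComparator` and `drain` are hypothetical names used only for illustration.

```python
import asyncio


class LengthComparator:
    """Hypothetical stand-in for a document producer comparator."""

    async def compare(self, a, b):
        # Three-way compare: negative, zero, or positive.
        return (len(a) > len(b)) - (len(a) < len(b))


async def heap_push(heap, item, comparator):
    """Append item, then move it toward the root while it beats its parent."""
    heap.append(item)
    pos = len(heap) - 1
    while pos > 0:
        parent_pos = (pos - 1) >> 1
        if await comparator.compare(item, heap[parent_pos]) < 0:
            heap[pos] = heap[parent_pos]
            pos = parent_pos
        else:
            break
    heap[pos] = item


async def heap_pop(heap, comparator):
    """Remove and return the smallest item."""
    last = heap.pop()  # raises IndexError on an empty heap
    if not heap:
        return last
    smallest, pos = heap[0], 0
    # Walk down from the root, pulling the smaller child up until `last` fits.
    while True:
        child = 2 * pos + 1
        if child >= len(heap):
            break
        right = child + 1
        if right < len(heap) and await comparator.compare(heap[right], heap[child]) < 0:
            child = right
        if await comparator.compare(heap[child], last) < 0:
            heap[pos] = heap[child]
            pos = child
        else:
            break
    heap[pos] = last
    return smallest


async def drain(items):
    """Push every item, then pop them all back in comparator order."""
    heap, comparator = [], LengthComparator()
    for item in items:
        await heap_push(heap, item, comparator)
    return [await heap_pop(heap, comparator) for _ in range(len(items))]
```

Calling `asyncio.run(drain(["ccc", "a", "bb"]))` pops the strings shortest first. The synchronous `heapq` functions cannot be used directly here because they rely on `__lt__`, which cannot await.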
@@ -121,6 +121,7 @@ async def _fetch_items_helper_no_retries(self, fetch_function):
self._has_started = True
new_options = copy.deepcopy(self._options)
new_options["continuation"] = self._continuation

(fetched_items, response_headers) = await fetch_function(new_options)
continuation_key = http_constants.HttpHeaders.Continuation
# Use Etag as continuation token for change feed queries.
@@ -62,9 +62,6 @@ async def fetch_fn(options):

self._ex_context = _DefaultQueryExecutionContext(client, self._options, fetch_fn)

def __lt__(self, other):
return self._doc_producer_comp.compare(self, other) < 0

async def __aiter__(self):
return self

@@ -104,14 +101,13 @@ async def peek(self):
return self._cur_item



def _compare_helper(a, b):
if a is None and b is None:
return 0
return (a > b) - (a < b)


class _PartitionKeyRangeDocumentProduerComparator(object):
class _PartitionKeyRangeDocumentProducerComparator(object):
"""
Provides a Comparator for document producers using the min value of the
corresponding target partition.
@@ -120,7 +116,7 @@ class _PartitionKeyRangeDocumentProduerComparator(object):
def __init__(self):
pass

def compare(self, doc_producer1, doc_producer2): # pylint: disable=no-self-use
async def compare(self, doc_producer1, doc_producer2): # pylint: disable=no-self-use
return _compare_helper(
doc_producer1.get_target_range()["minInclusive"], doc_producer2.get_target_range()["minInclusive"]
)
@@ -179,7 +175,7 @@ def getTypeStr(orderby_item):
raise TypeError("unknown type" + str(val))

@staticmethod
def compare(orderby_item1, orderby_item2):
async def compare(orderby_item1, orderby_item2):
"""Compare two orderby item pairs.

:param dict orderby_item1:
@@ -213,7 +209,7 @@ def _peek_order_by_items(peek_result):
return peek_result["orderByItems"]


class _OrderByDocumentProducerComparator(_PartitionKeyRangeDocumentProduerComparator):
class _OrderByDocumentProducerComparator(_PartitionKeyRangeDocumentProducerComparator):
"""Provide a Comparator for document producers which respects orderby sort order.
"""

@@ -229,7 +225,7 @@ def __init__(self, sort_order): # pylint: disable=super-init-not-called
"""
self._sort_order = sort_order

def compare(self, doc_producer1, doc_producer2):
async def compare(self, doc_producer1, doc_producer2):
"""Compares the given two instances of DocumentProducers.

Based on the orderby query items and whether the sort order is Ascending
@@ -238,29 +234,29 @@ def compare(self, doc_producer1, doc_producer2):
If the peek results are equal based on the sort order, this comparator
compares the target partition key range of the two DocumentProducers.

:param _DocumentProducer doc_producers1: first instance
:param _DocumentProducer doc_producers2: first instance
:param _DocumentProducer doc_producer1: first instance
:param _DocumentProducer doc_producer2: second instance
:return:
Integer value of compare result.
positive integer if doc_producers1 > doc_producers2
negative integer if doc_producers1 < doc_producers2
positive integer if doc_producer1 > doc_producer2
negative integer if doc_producer1 < doc_producer2
:rtype: int
"""

res1 = _peek_order_by_items(doc_producer1.peek())
res2 = _peek_order_by_items(doc_producer2.peek())
res1 = _peek_order_by_items(await doc_producer1.peek())
res2 = _peek_order_by_items(await doc_producer2.peek())

self._validate_orderby_items(res1, res2)

for i, (elt1, elt2) in enumerate(zip(res1, res2)):
res = _OrderByHelper.compare(elt1, elt2)
res = await _OrderByHelper.compare(elt1, elt2)
if res != 0:
if self._sort_order[i] == "Ascending":
return res
if self._sort_order[i] == "Descending":
return -res

return _PartitionKeyRangeDocumentProduerComparator.compare(self, doc_producer1, doc_producer2)
return await _PartitionKeyRangeDocumentProducerComparator.compare(self, doc_producer1, doc_producer2)

def _validate_orderby_items(self, res1, res2):
if len(res1) != len(res2):
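Note on the comparator changes above: the order-by comparator awaits `peek()` on both producers, compares the order-by values position by position, flips the sign for descending columns, and falls back to the partition range's lower bound on a tie. A minimal sketch of that flow, assuming a hypothetical `FakeProducer` whose `peek()` returns the order-by values directly (the real producers return richer results), and treating any sort order other than `"Ascending"` as descending:

```python
import asyncio


def compare_values(a, b):
    """Three-way compare returning -1, 0, or 1."""
    if a is None and b is None:
        return 0
    return (a > b) - (a < b)


class FakeProducer:
    """Hypothetical stand-in for a document producer."""

    def __init__(self, min_inclusive, order_by_values):
        self.min_inclusive = min_inclusive
        self.order_by_values = order_by_values

    async def peek(self):
        return self.order_by_values


class OrderByComparator:
    """Hypothetical comparator; sort_order holds one entry per order-by column."""

    def __init__(self, sort_order):
        self.sort_order = sort_order

    async def compare(self, p1, p2):
        v1, v2 = await p1.peek(), await p2.peek()
        for i, (a, b) in enumerate(zip(v1, v2)):
            res = compare_values(a, b)
            if res != 0:
                # Simplification: anything other than "Ascending" is descending.
                return res if self.sort_order[i] == "Ascending" else -res
        # Tie on every column: order by the partition range's lower bound.
        return compare_values(p1.min_inclusive, p2.min_inclusive)
```

Because `compare` is a coroutine, every caller has to await it, which is why the heap operations in this PR take the comparator as an argument instead of relying on `__lt__`.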
@@ -70,7 +70,7 @@ async def __anext__(self):
query_to_use = self._query if self._query is not None else "Select * from root r"
query_execution_info = _PartitionedQueryExecutionInfo(await self._client._GetQueryPlanThroughGateway
(query_to_use, self._resource_link))
self._execution_context = self._create_pipelined_execution_context(query_execution_info)
self._execution_context = await self._create_pipelined_execution_context(query_execution_info)
else:
raise e

@@ -22,10 +22,10 @@
"""Internal class for multi execution context aggregator implementation in the Azure Cosmos database service.
"""

import heapq
from azure.cosmos._execution_context.aio.base_execution_context import _QueryExecutionContextBase
from azure.cosmos._execution_context.aio import document_producer
from azure.cosmos._execution_context.aio import document_producer, _queue_async_helper
from azure.cosmos._routing import routing_range
from azure.cosmos import exceptions

# pylint: disable=protected-access

@@ -50,11 +50,11 @@ class PriorityQueue:
def __init__(self):
self._heap = []

def pop(self):
return heapq.heappop(self._heap)
async def pop_async(self, document_producer_comparator):
return await _queue_async_helper.heap_pop(self._heap, document_producer_comparator)

def push(self, item):
heapq.heappush(self._heap, item)
async def push_async(self, item, document_producer_comparator):
await _queue_async_helper.heap_push(self._heap, item, document_producer_comparator)

def peek(self):
return self._heap[0]
@@ -76,7 +76,7 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i
if self._sort_orders:
self._document_producer_comparator = document_producer._OrderByDocumentProducerComparator(self._sort_orders)
else:
self._document_producer_comparator = document_producer._PartitionKeyRangeDocumentProduerComparator()
self._document_producer_comparator = document_producer._PartitionKeyRangeDocumentProducerComparator()

self._orderByPQ = _MultiExecutionContextAggregator.PriorityQueue()

@@ -89,13 +89,13 @@ async def __anext__(self):
"""
if self._orderByPQ.size() > 0:

targetRangeExContext = self._orderByPQ.pop()
targetRangeExContext = await self._orderByPQ.pop_async(self._document_producer_comparator)
res = await targetRangeExContext.__anext__()

try:
# TODO: we can also use more_itertools.peekable to be more python friendly
await targetRangeExContext.peek()
self._orderByPQ.push(targetRangeExContext)
await self._orderByPQ.push_async(targetRangeExContext, self._document_producer_comparator)

except StopAsyncIteration:
pass
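Note on `__anext__` above: the pop, advance, peek, push cycle is a k-way merge across the per-partition producers. The sketch below shows the same cycle with a swapped-in technique: it resolves each producer's head with `peek()` before insertion and stores it as the heap key, so the synchronous `heapq` can be used. `ListProducer` and `merge` are hypothetical names, and each producer is assumed to yield already sorted items.

```python
import asyncio
import heapq


class ListProducer:
    """Hypothetical producer over an already sorted in-memory list."""

    def __init__(self, items):
        self._items = list(items)
        self._pos = 0

    async def peek(self):
        if self._pos >= len(self._items):
            raise StopAsyncIteration
        return self._items[self._pos]

    async def __anext__(self):
        item = await self.peek()
        self._pos += 1
        return item


async def merge(producers):
    """Merge sorted producers into one sorted list."""
    heap = []
    for index, producer in enumerate(producers):
        try:
            # The index breaks ties so producers themselves are never compared.
            heap.append((await producer.peek(), index, producer))
        except StopAsyncIteration:
            continue  # empty producer: nothing to merge from it
    heapq.heapify(heap)

    merged = []
    while heap:
        _, index, producer = heapq.heappop(heap)
        merged.append(await producer.__anext__())
        try:
            # Re-insert the producer keyed by its next head, if it has one.
            heapq.heappush(heap, (await producer.peek(), index, producer))
        except StopAsyncIteration:
            pass  # producer exhausted: drop it from the queue
    return merged
```

The PR takes the other route and awaits the comparator inside the heap operations, which avoids caching a key that could go stale if a producer is re-created.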
@@ -107,6 +107,33 @@ async def fetch_next_block(self):

raise NotImplementedError("You should use pipeline's fetch_next_block.")

async def _repair_document_producer(self):
"""Repairs the document producer context by using the re-initialized routing map provider in the client,
which loads in a refreshed partition key range cache to re-create the partition key ranges.
After loading this new cache, the document producers get re-created with the new valid ranges.
"""
# refresh the routing provider to get the newly initialized one post-refresh
self._routing_provider = self._client._routing_map_provider
# will be a list of (partition_min, partition_max) tuples
targetPartitionRanges = await self._get_target_partition_key_range()

targetPartitionQueryExecutionContextList = []
for partitionTargetRange in targetPartitionRanges:
# create and add the child execution context for the target range
targetPartitionQueryExecutionContextList.append(
self._createTargetPartitionQueryExecutionContext(partitionTargetRange)
)

for targetQueryExContext in targetPartitionQueryExecutionContextList:
try:
# TODO: we can also use more_itertools.peekable to be more python friendly
await targetQueryExContext.peek()
# if there are matching results in the target ex range add it to the priority queue
await self._orderByPQ.push_async(targetQueryExContext, self._document_producer_comparator)

except StopAsyncIteration:
continue

def _createTargetPartitionQueryExecutionContext(self, partition_key_target_range):

rewritten_query = self._partitioned_query_ex_info.get_rewritten_query()
@@ -148,13 +175,19 @@ async def _configure_partition_ranges(self):
)

for targetQueryExContext in targetPartitionQueryExecutionContextList:

try:
# TODO: we can also use more_itertools.peekable to be more python friendly
await targetQueryExContext.peek()
# if there are matching results in the target ex range add it to the priority queue

self._orderByPQ.push(targetQueryExContext)
await self._orderByPQ.push_async(targetQueryExContext, self._document_producer_comparator)

except exceptions.CosmosHttpResponseError as e:
if exceptions._partition_range_is_gone(e):
# repairing document producer context on partition split
await self._repair_document_producer()
else:
raise

except StopAsyncIteration:
continue
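Note on the split handling above: when peeking a target range fails because the range no longer exists, the context is rebuilt from the refreshed partition key ranges. A rough sketch of that pattern, with every name hypothetical (`RangeGoneError`, `RangeCache`, `peek_range`, `configure`) and no Cosmos types involved. One simplification is specific to this example: after a repair the sketch discards its partial list and stops iterating.

```python
import asyncio


class RangeGoneError(Exception):
    """Hypothetical stand-in for the 'partition key range is gone' error."""


class RangeCache:
    """Hypothetical routing cache holding the currently valid ranges."""

    def __init__(self, ranges):
        self.ranges = ranges


async def peek_range(cache, key_range):
    # A range that was split away is no longer known to the cache.
    if key_range not in cache.ranges:
        raise RangeGoneError(key_range)
    return key_range


async def configure(cache, known_ranges):
    """Return the ranges that are ready to query, repairing after a split."""
    ready = []
    for key_range in known_ranges:
        try:
            ready.append(await peek_range(cache, key_range))
        except RangeGoneError:
            # Repair: rebuild from the refreshed ranges instead of the stale list.
            ready = [await peek_range(cache, r) for r in cache.ranges]
            break
    return ready
```

With a cache holding `("00", "40")`, `("40", "80")` and `("80", "FF")`, a caller that still knows the pre-split range `("00", "80")` ends up with the three current ranges.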
@@ -100,7 +100,7 @@ def __init__(self, client, resource_link, query, options, partitioned_query_ex_i
self._orderByPQ.push(targetQueryExContext)

except exceptions.CosmosHttpResponseError as e:
if exceptions.partition_range_is_gone(e):
if exceptions._partition_range_is_gone(e):
# repairing document producer context on partition split
self._repair_document_producer()
else:
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py
@@ -101,7 +101,7 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):
and e.sub_status == SubStatusCodes.READ_SESSION_NOTAVAILABLE
):
retry_policy = sessionRetry_policy
elif exceptions.partition_range_is_gone(e):
elif exceptions._partition_range_is_gone(e):
retry_policy = partition_key_range_gone_retry_policy
else:
retry_policy = defaultRetry_policy
@@ -54,7 +54,7 @@ async def get_overlapping_ranges(self, collection_link, partition_key_ranges):
overlapping partition key ranges.

:param str collection_link: The name of the collection.
:param list partition_key_range: List of partition key range.
:param list partition_key_ranges: List of partition key ranges.
:return: List of overlapping partition key ranges.
:rtype: list
"""
@@ -127,7 +127,7 @@ def _subtract_range(r, partition_key_range):

class SmartRoutingMapProvider(PartitionKeyRangeCache):
"""
Efficiently uses PartitionKeyRangeCach and minimizes the unnecessary
Efficiently uses PartitionKeyRangeCache and minimizes the unnecessary
invocation of CollectionRoutingMap.get_overlapping_ranges()
"""

@@ -2445,6 +2445,10 @@ def _retrieve_partition_key(self, partition_key_parts, document, is_system_key):

return partitionKey

def refresh_routing_map_provider(self):
# re-initializes the routing map provider, effectively refreshing the current partition key range cache
self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)

async def _GetQueryPlanThroughGateway(self, query, resource_link, **kwargs):
supported_query_features = (documents._QueryFeature.Aggregate + "," +
documents._QueryFeature.CompositeAggregate + "," +
@@ -35,6 +35,7 @@
from .. import _resource_throttle_retry_policy
from .. import _default_retry_policy
from .. import _session_retry_policy
from .. import _gone_retry_policy


# pylint: disable=protected-access
@@ -68,6 +69,8 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg
sessionRetry_policy = _session_retry_policy._SessionRetryPolicy(
client.connection_policy.EnableEndpointDiscovery, global_endpoint_manager, *args
)
partition_key_range_gone_retry_policy = _gone_retry_policy.PartitionKeyRangeGoneRetryPolicy(client, *args)

while True:
try:
client_timeout = kwargs.get('timeout')
@@ -100,6 +103,8 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg
and e.sub_status == SubStatusCodes.READ_SESSION_NOTAVAILABLE
):
retry_policy = sessionRetry_policy
elif exceptions._partition_range_is_gone(e):
retry_policy = partition_key_range_gone_retry_policy
else:
retry_policy = defaultRetry_policy
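Note on the retry branch above: a "range is gone" error selects a dedicated policy rather than the default one. The sketch below illustrates one plausible shape for such a policy, namely refreshing the cache and allowing a single retry. It is an assumption made for illustration, not the behavior of `PartitionKeyRangeGoneRetryPolicy`; `GoneError`, `GoneRetryPolicy`, and `execute_with_retry` are hypothetical names.

```python
import asyncio


class GoneError(Exception):
    """Hypothetical error raised when a cached partition key range is stale."""


class GoneRetryPolicy:
    """Hypothetical policy: refresh the cache once, then allow one retry."""

    def __init__(self, refresh):
        self._refresh = refresh
        self._retried = False

    def should_retry(self, error):
        if self._retried:
            return False  # a second failure is surfaced to the caller
        self._retried = True
        self._refresh()
        return True


async def execute_with_retry(operation, policy):
    """Await operation(), retrying on GoneError while the policy allows it."""
    while True:
        try:
            return await operation()
        except GoneError as error:
            if not policy.should_retry(error):
                raise
```

Keeping the refresh inside the policy means the retry loop itself stays unaware of how the routing information is rebuilt.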
