-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Cosmos] CosmosDB asynchronous client #21404
Merged
+9,298
−77
Merged
Changes from 39 commits
Commits
Show all changes
61 commits
Select commit
Hold shift + click to select a range
61ba8d1
initial commit
simorenoh 15dcceb
Client Constructor (#20310)
annatisch bda95c3
read database
simorenoh c9648ab
Update simon_testfile.py
simorenoh 80540dc
with coroutine
simorenoh 1285438
Update simon_testfile.py
simorenoh 992b0cd
small changes
simorenoh 47cb688
async with returns no exceptions
simorenoh f3fa79f
Merge pull request #1 from Azure/simonmoreno/async
simorenoh 0c49739
async read container
simorenoh 47f4af5
async item read
simorenoh c97c946
cleaning up
simorenoh fcd95db
create item/ database methods
simorenoh 36c5b90
item delete working
simorenoh 44db2a2
docs replace functionality
simorenoh ec5b6ed
upsert functionality
simorenoh d63d052
Merge pull request #2 from simorenoh/item-read
simorenoh 5d74c8f
missing query methods
simorenoh 89fc2f7
CRUD for udf, sproc, triggers
simorenoh fdaa880
Merge branch 'Azure:main' into async-client
simorenoh 3f9baf2
Merge branch 'Azure:main' into async-client
simorenoh d6650bc
Merge branch 'Azure:main' into query-functionality
simorenoh 043dfe0
initial query logic + container methods
simorenoh befdb41
Merge branch 'async-client' into query-functionality
simorenoh 8cffbe2
Merge pull request #3 from simorenoh/query-functionality
simorenoh 72de7c8
missing some execution logic and tests
simorenoh 5b805b8
oops
simorenoh 8d8d0c4
fully working queries
simorenoh b597ca8
small fix to query_items()
simorenoh 18319df
Update _cosmos_client_connection_async.py
simorenoh 162c44d
Update _cosmos_client_connection.py
simorenoh ebbac51
documentation update
simorenoh 43f78e6
Merge branch 'Azure:main' into main
simorenoh 470aa5b
updated MIT dates and get_user_client() description
simorenoh 74da690
Update CHANGELOG.md
simorenoh 7104d63
Merge branch 'Azure:main' into main
simorenoh 20718c7
Delete simon_testfile.py
simorenoh d825eaa
Merge pull request #4 from simorenoh/async-client
simorenoh e3c27a5
leftover retry utility
simorenoh 3b778ad
Update README.md
simorenoh c6e352e
docs and removed six package
simorenoh 8971a25
Merge remote-tracking branch 'upstream/main'
simorenoh 52736ac
changes based on comments
simorenoh ad98039
small change in type hints
simorenoh f76c595
updated readme
simorenoh 3f02a65
fixes based on conversations
simorenoh e719869
added missing type comments
simorenoh d03ee05
Merge branch 'Azure:main' into main
simorenoh 02c52ee
update changelog for ci pipeline
simorenoh 2cb4551
added typehints, moved params into keywords, added decorators, made _…
simorenoh cf20d35
changes based on sync with central sdk
simorenoh f456817
remove is_system_key from scripts (only used in execute_sproc)
simorenoh ea9bd16
Revert "remove is_system_key from scripts (only used in execute_sproc)"
simorenoh 709d2eb
async script proxy using composition
simorenoh 3277dd8
pylint
simorenoh a57cb4d
capitalized constants
simorenoh 014578b
Apply suggestions from code review
simorenoh 0d79695
closing python code snippet
simorenoh fdabea1
last doc updates
simorenoh 016d0dd
Update sdk/cosmos/azure-cosmos/CHANGELOG.md
tjprescott 8228aa9
version update
simorenoh File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
## 4.2.1 (Unreleased) | ||
|
||
**New features** | ||
- Added language native async i/o client | ||
|
||
## 4.2.0 (2020-10-08) | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
20 changes: 20 additions & 0 deletions
20
sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
# The MIT License (MIT) | ||
# Copyright (c) 2021 Microsoft Corporation | ||
|
||
# Permission is hereby granted, free of charge, to any person obtaining a copy | ||
# of this software and associated documentation files (the "Software"), to deal | ||
# in the Software without restriction, including without limitation the rights | ||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
# copies of the Software, and to permit persons to whom the Software is | ||
# furnished to do so, subject to the following conditions: | ||
|
||
# The above copyright notice and this permission notice shall be included in all | ||
# copies or substantial portions of the Software. | ||
|
||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
# SOFTWARE. |
171 changes: 171 additions & 0 deletions
171
sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
# The MIT License (MIT) | ||
# Copyright (c) 2021 Microsoft Corporation | ||
|
||
# Permission is hereby granted, free of charge, to any person obtaining a copy | ||
# of this software and associated documentation files (the "Software"), to deal | ||
# in the Software without restriction, including without limitation the rights | ||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
# copies of the Software, and to permit persons to whom the Software is | ||
# furnished to do so, subject to the following conditions: | ||
|
||
# The above copyright notice and this permission notice shall be included in all | ||
# copies or substantial portions of the Software. | ||
|
||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
# SOFTWARE. | ||
|
||
"""Internal class for query execution context implementation in the Azure Cosmos | ||
database service. | ||
""" | ||
|
||
from collections import deque | ||
import copy | ||
|
||
from ...aio import _retry_utility_async | ||
from ... import http_constants | ||
|
||
# pylint: disable=protected-access | ||
|
||
|
||
class _QueryExecutionContextBase(object): | ||
""" | ||
This is the abstract base execution context class. | ||
""" | ||
|
||
def __init__(self, client, options): | ||
""" | ||
:param CosmosClient client: | ||
:param dict options: The request options for the request. | ||
""" | ||
self._client = client | ||
self._options = options | ||
self._is_change_feed = "changeFeed" in options and options["changeFeed"] is True | ||
self._continuation = self._get_initial_continuation() | ||
self._has_started = False | ||
self._has_finished = False | ||
self._buffer = deque() | ||
|
||
def _get_initial_continuation(self): | ||
if "continuation" in self._options: | ||
if "enableCrossPartitionQuery" in self._options: | ||
raise ValueError("continuation tokens are not supported for cross-partition queries.") | ||
return self._options["continuation"] | ||
return None | ||
|
||
def _has_more_pages(self): | ||
return not self._has_started or self._continuation | ||
|
||
async def fetch_next_block(self): | ||
"""Returns a block of results with respecting retry policy. | ||
|
||
This method only exists for backward compatibility reasons. (Because | ||
QueryIterable has exposed fetch_next_block api). | ||
|
||
:return: List of results. | ||
:rtype: list | ||
""" | ||
if not self._has_more_pages(): | ||
return [] | ||
|
||
if self._buffer: | ||
# if there is anything in the buffer returns that | ||
res = list(self._buffer) | ||
self._buffer.clear() | ||
return res | ||
|
||
# fetches the next block | ||
return await self._fetch_next_block() | ||
|
||
async def _fetch_next_block(self): | ||
raise NotImplementedError | ||
|
||
async def __aiter__(self): | ||
"""Returns itself as an iterator""" | ||
return self | ||
|
||
async def __anext__(self): | ||
"""Return the next query result. | ||
|
||
:return: The next query result. | ||
:rtype: dict | ||
:raises StopAsyncIteration: If no more result is left. | ||
""" | ||
if self._has_finished: | ||
raise StopAsyncIteration | ||
|
||
if not self._buffer: | ||
|
||
results = await self.fetch_next_block() | ||
self._buffer.extend(results) | ||
|
||
if not self._buffer: | ||
raise StopAsyncIteration | ||
|
||
return self._buffer.popleft() | ||
|
||
async def _fetch_items_helper_no_retries(self, fetch_function): | ||
"""Fetches more items and doesn't retry on failure | ||
|
||
:return: List of fetched items. | ||
:rtype: list | ||
""" | ||
fetched_items = [] | ||
# Continues pages till finds a non empty page or all results are exhausted | ||
while self._continuation or not self._has_started: | ||
if not self._has_started: | ||
self._has_started = True | ||
new_options = copy.deepcopy(self._options) | ||
new_options["continuation"] = self._continuation | ||
(fetched_items, response_headers) = await fetch_function(new_options) | ||
continuation_key = http_constants.HttpHeaders.Continuation | ||
# Use Etag as continuation token for change feed queries. | ||
if self._is_change_feed: | ||
continuation_key = http_constants.HttpHeaders.ETag | ||
# In change feed queries, the continuation token is always populated. The hasNext() test is whether | ||
# there is any items in the response or not. | ||
if not self._is_change_feed or fetched_items: | ||
self._continuation = response_headers.get(continuation_key) | ||
else: | ||
self._continuation = None | ||
if fetched_items: | ||
break | ||
return fetched_items | ||
|
||
async def _fetch_items_helper_with_retries(self, fetch_function): | ||
async def callback(): | ||
return await self._fetch_items_helper_no_retries(fetch_function) | ||
|
||
return await _retry_utility_async.ExecuteAsync(self._client, self._client._global_endpoint_manager, callback) | ||
|
||
next = __anext__ # Python 2 compatibility. | ||
simorenoh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
class _DefaultQueryExecutionContext(_QueryExecutionContextBase): | ||
""" | ||
This is the default execution context. | ||
""" | ||
|
||
def __init__(self, client, options, fetch_function): | ||
""" | ||
:param CosmosClient client: | ||
:param dict options: The request options for the request. | ||
:param method fetch_function: | ||
Will be invoked for retrieving each page | ||
|
||
Example of `fetch_function`: | ||
|
||
>>> def result_fn(result): | ||
>>> return result['Databases'] | ||
|
||
""" | ||
super(_DefaultQueryExecutionContext, self).__init__(client, options) | ||
self._fetch_function = fetch_function | ||
|
||
async def _fetch_next_block(self): | ||
while super(_DefaultQueryExecutionContext, self)._has_more_pages() and not self._buffer: | ||
return await self._fetch_items_helper_with_retries(self._fetch_function) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as we are forking the implementation code and copying from the non async io code path, we need to have a plan on maintenance.
now if there is any bug let's say in the query stack we need to fix this into different codebase.
multiple options to consider for future:
@kushagraThapar