-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Cosmos] CosmosDB asynchronous client #21404
Changes from all commits
61ba8d1
15dcceb
bda95c3
c9648ab
80540dc
1285438
992b0cd
47cb688
f3fa79f
0c49739
47f4af5
c97c946
fcd95db
36c5b90
44db2a2
ec5b6ed
d63d052
5d74c8f
89fc2f7
fdaa880
3f9baf2
d6650bc
043dfe0
befdb41
8cffbe2
72de7c8
5b805b8
8d8d0c4
b597ca8
18319df
162c44d
ebbac51
43f78e6
470aa5b
74da690
7104d63
20718c7
d825eaa
e3c27a5
3b778ad
c6e352e
8971a25
52736ac
ad98039
f76c595
3f02a65
e719869
d03ee05
02c52ee
2cb4551
cf20d35
f456817
ea9bd16
709d2eb
3277dd8
a57cb4d
014578b
0d79695
fdabea1
016d0dd
8228aa9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -68,9 +68,9 @@ Once you've populated the `ACCOUNT_URI` and `ACCOUNT_KEY` environment variables, | |
from azure.cosmos import CosmosClient | ||
|
||
import os | ||
url = os.environ['ACCOUNT_URI'] | ||
key = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(url, credential=key) | ||
URL = os.environ['ACCOUNT_URI'] | ||
KEY = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(URL, credential=KEY) | ||
``` | ||
|
||
## Key concepts | ||
|
@@ -90,14 +90,17 @@ For more information about these resources, see [Working with Azure Cosmos datab | |
|
||
The keyword-argument `enable_cross_partition_query` accepts 2 options: `None` (default) or `True`. | ||
|
||
## Note on using queries by id | ||
|
||
When using queries that try to find items based on an **id** value, always make sure you are passing in a string type variable. Azure Cosmos DB only allows string id values and if you use any other datatype, this SDK will return no results and no error messages. | ||
|
||
## Limitations | ||
|
||
Currently the features below are **not supported**. For alternatives options, check the **Workarounds** section below. | ||
|
||
### Data Plane Limitations: | ||
|
||
* Group By queries | ||
* Language Native async i/o | ||
* Queries with COUNT from a DISTINCT subquery: SELECT COUNT (1) FROM (SELECT DISTINCT C.ID FROM C) | ||
* Bulk/Transactional batch processing | ||
* Direct TCP Mode access | ||
|
@@ -177,6 +180,7 @@ The following sections provide several code snippets covering some of the most c | |
* [Get database properties](#get-database-properties "Get database properties") | ||
* [Get database and container throughputs](#get-database-and-container-throughputs "Get database and container throughputs") | ||
* [Modify container properties](#modify-container-properties "Modify container properties") | ||
* [Using the asynchronous client](#using-the-asynchronous-client "Using the asynchronous client") | ||
|
||
### Create a database | ||
|
||
|
@@ -186,14 +190,14 @@ After authenticating your [CosmosClient][ref_cosmosclient], you can work with an | |
from azure.cosmos import CosmosClient, exceptions | ||
import os | ||
|
||
url = os.environ['ACCOUNT_URI'] | ||
key = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(url, credential=key) | ||
database_name = 'testDatabase' | ||
URL = os.environ['ACCOUNT_URI'] | ||
KEY = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(URL, credential=KEY) | ||
DATABASE_NAME = 'testDatabase' | ||
try: | ||
database = client.create_database(database_name) | ||
database = client.create_database(DATABASE_NAME) | ||
except exceptions.CosmosResourceExistsError: | ||
database = client.get_database_client(database_name) | ||
database = client.get_database_client(DATABASE_NAME) | ||
``` | ||
|
||
### Create a container | ||
|
@@ -204,17 +208,17 @@ This example creates a container with default settings. If a container with the | |
from azure.cosmos import CosmosClient, PartitionKey, exceptions | ||
import os | ||
|
||
url = os.environ['ACCOUNT_URI'] | ||
key = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(url, credential=key) | ||
database_name = 'testDatabase' | ||
database = client.get_database_client(database_name) | ||
container_name = 'products' | ||
URL = os.environ['ACCOUNT_URI'] | ||
KEY = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(URL, credential=KEY) | ||
DATABASE_NAME = 'testDatabase' | ||
database = client.get_database_client(DATABASE_NAME) | ||
CONTAINER_NAME = 'products' | ||
|
||
try: | ||
container = database.create_container(id=container_name, partition_key=PartitionKey(path="/productName")) | ||
container = database.create_container(id=CONTAINER_NAME, partition_key=PartitionKey(path="/productName")) | ||
except exceptions.CosmosResourceExistsError: | ||
container = database.get_container_client(container_name) | ||
container = database.get_container_client(CONTAINER_NAME) | ||
except exceptions.CosmosHttpResponseError: | ||
raise | ||
``` | ||
|
@@ -231,11 +235,11 @@ The options for analytical_storage_ttl are: | |
|
||
|
||
```Python | ||
container_name = 'products' | ||
CONTAINER_NAME = 'products' | ||
try: | ||
container = database.create_container(id=container_name, partition_key=PartitionKey(path="/productName"),analytical_storage_ttl=-1) | ||
container = database.create_container(id=CONTAINER_NAME, partition_key=PartitionKey(path="/productName"),analytical_storage_ttl=-1) | ||
except exceptions.CosmosResourceExistsError: | ||
container = database.get_container_client(container_name) | ||
container = database.get_container_client(CONTAINER_NAME) | ||
except exceptions.CosmosHttpResponseError: | ||
raise | ||
``` | ||
|
@@ -250,13 +254,13 @@ Retrieve an existing container from the database: | |
from azure.cosmos import CosmosClient | ||
import os | ||
|
||
url = os.environ['ACCOUNT_URI'] | ||
key = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(url, credential=key) | ||
database_name = 'testDatabase' | ||
database = client.get_database_client(database_name) | ||
container_name = 'products' | ||
container = database.get_container_client(container_name) | ||
URL = os.environ['ACCOUNT_URI'] | ||
KEY = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(URL, credential=KEY) | ||
DATABASE_NAME = 'testDatabase' | ||
database = client.get_database_client(DATABASE_NAME) | ||
CONTAINER_NAME = 'products' | ||
container = database.get_container_client(CONTAINER_NAME) | ||
``` | ||
|
||
### Insert data | ||
|
@@ -269,13 +273,13 @@ This example inserts several items into the container, each with a unique `id`: | |
from azure.cosmos import CosmosClient | ||
import os | ||
|
||
url = os.environ['ACCOUNT_URI'] | ||
key = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(url, credential=key) | ||
database_name = 'testDatabase' | ||
database = client.get_database_client(database_name) | ||
container_name = 'products' | ||
container = database.get_container_client(container_name) | ||
URL = os.environ['ACCOUNT_URI'] | ||
KEY = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(URL, credential=KEY) | ||
DATABASE_NAME = 'testDatabase' | ||
database = client.get_database_client(DATABASE_NAME) | ||
CONTAINER_NAME = 'products' | ||
container = database.get_container_client(CONTAINER_NAME) | ||
|
||
for i in range(1, 10): | ||
container.upsert_item({ | ||
|
@@ -294,13 +298,13 @@ To delete items from a container, use [ContainerProxy.delete_item][ref_container | |
from azure.cosmos import CosmosClient | ||
import os | ||
|
||
url = os.environ['ACCOUNT_URI'] | ||
key = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(url, credential=key) | ||
database_name = 'testDatabase' | ||
database = client.get_database_client(database_name) | ||
container_name = 'products' | ||
container = database.get_container_client(container_name) | ||
URL = os.environ['ACCOUNT_URI'] | ||
KEY = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(URL, credential=KEY) | ||
DATABASE_NAME = 'testDatabase' | ||
database = client.get_database_client(DATABASE_NAME) | ||
CONTAINER_NAME = 'products' | ||
container = database.get_container_client(CONTAINER_NAME) | ||
|
||
for item in container.query_items( | ||
query='SELECT * FROM products p WHERE p.productModel = "Model 2"', | ||
|
@@ -320,13 +324,13 @@ This example queries a container for items with a specific `id`: | |
from azure.cosmos import CosmosClient | ||
import os | ||
|
||
url = os.environ['ACCOUNT_URI'] | ||
key = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(url, credential=key) | ||
database_name = 'testDatabase' | ||
database = client.get_database_client(database_name) | ||
container_name = 'products' | ||
container = database.get_container_client(container_name) | ||
URL = os.environ['ACCOUNT_URI'] | ||
KEY = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(URL, credential=KEY) | ||
DATABASE_NAME = 'testDatabase' | ||
database = client.get_database_client(DATABASE_NAME) | ||
CONTAINER_NAME = 'products' | ||
container = database.get_container_client(CONTAINER_NAME) | ||
|
||
# Enumerate the returned items | ||
import json | ||
|
@@ -363,11 +367,11 @@ from azure.cosmos import CosmosClient | |
import os | ||
import json | ||
|
||
url = os.environ['ACCOUNT_URI'] | ||
key = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(url, credential=key) | ||
database_name = 'testDatabase' | ||
database = client.get_database_client(database_name) | ||
URL = os.environ['ACCOUNT_URI'] | ||
KEY = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(URL, credential=KEY) | ||
DATABASE_NAME = 'testDatabase' | ||
database = client.get_database_client(DATABASE_NAME) | ||
properties = database.read() | ||
print(json.dumps(properties)) | ||
``` | ||
|
@@ -381,19 +385,19 @@ from azure.cosmos import CosmosClient | |
import os | ||
import json | ||
|
||
url = os.environ['ACCOUNT_URI'] | ||
key = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(url, credential=key) | ||
URL = os.environ['ACCOUNT_URI'] | ||
KEY = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(URL, credential=KEY) | ||
|
||
# Database | ||
database_name = 'testDatabase' | ||
database = client.get_database_client(database_name) | ||
DATABASE_NAME = 'testDatabase' | ||
database = client.get_database_client(DATABASE_NAME) | ||
db_offer = database.read_offer() | ||
print('Found Offer \'{0}\' for Database \'{1}\' and its throughput is \'{2}\''.format(db_offer.properties['id'], database.id, db_offer.properties['content']['offerThroughput'])) | ||
|
||
# Container with dedicated throughput only. Will return error "offer not found" for containers without dedicated throughput | ||
container_name = 'testContainer' | ||
container = database.get_container_client(container_name) | ||
CONTAINER_NAME = 'testContainer' | ||
container = database.get_container_client(CONTAINER_NAME) | ||
container_offer = container.read_offer() | ||
print('Found Offer \'{0}\' for Container \'{1}\' and its throughput is \'{2}\''.format(container_offer.properties['id'], container.id, container_offer.properties['content']['offerThroughput'])) | ||
``` | ||
|
@@ -408,13 +412,13 @@ from azure.cosmos import CosmosClient, PartitionKey | |
import os | ||
import json | ||
|
||
url = os.environ['ACCOUNT_URI'] | ||
key = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(url, credential=key) | ||
database_name = 'testDatabase' | ||
database = client.get_database_client(database_name) | ||
container_name = 'products' | ||
container = database.get_container_client(container_name) | ||
URL = os.environ['ACCOUNT_URI'] | ||
KEY = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(URL, credential=KEY) | ||
DATABASE_NAME = 'testDatabase' | ||
database = client.get_database_client(DATABASE_NAME) | ||
CONTAINER_NAME = 'products' | ||
container = database.get_container_client(CONTAINER_NAME) | ||
|
||
database.replace_container( | ||
container, | ||
|
@@ -428,7 +432,90 @@ print(json.dumps(container_props['defaultTtl'])) | |
|
||
For more information on TTL, see [Time to Live for Azure Cosmos DB data][cosmos_ttl]. | ||
|
||
### Using the asynchronous client | ||
|
||
The asynchronous cosmos client is a separate client that looks and works in a similar fashion to the existing synchronous client. However, the async client needs to be imported separately and its methods need to be used with the async/await keywords. | ||
|
||
```Python | ||
from azure.cosmos.aio import CosmosClient | ||
import os | ||
|
||
URL = os.environ['ACCOUNT_URI'] | ||
KEY = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(URL, credential=KEY) | ||
DATABASE_NAME = 'testDatabase' | ||
database = client.get_database_client(DATABASE_NAME) | ||
CONTAINER_NAME = 'products' | ||
container = database.get_container_client(CONTAINER_NAME) | ||
|
||
async def create_items(): | ||
for i in range(10): | ||
await container.upsert_item({ | ||
'id': 'item{0}'.format(i), | ||
'productName': 'Widget', | ||
'productModel': 'Model {0}'.format(i) | ||
} | ||
) | ||
await client.close() # the async client must be closed manually if it's not initialized in a with statement | ||
``` | ||
|
||
It is also worth pointing out that the asynchronous client has to be closed manually after its use, either by initializing it using async with or calling the close() method directly like shown above. | ||
|
||
```Python | ||
from azure.cosmos.aio import CosmosClient | ||
import os | ||
|
||
URL = os.environ['ACCOUNT_URI'] | ||
KEY = os.environ['ACCOUNT_KEY'] | ||
DATABASE_NAME = 'testDatabase' | ||
CONTAINER_NAME = 'products' | ||
|
||
async with CosmosClient(URL, credential=KEY) as client: # the with statement will automatically close the async client | ||
database = client.get_database_client(DATABASE_NAME) | ||
container = database.get_container_client(CONTAINER_NAME) | ||
for i in range(10): | ||
await container.upsert_item({ | ||
'id': 'item{0}'.format(i), | ||
'productName': 'Widget', | ||
'productModel': 'Model {0}'.format(i) | ||
} | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to close the client here too? await client.close() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is that second way to initialize I had mentioned above - the way it is done here, it gets disposed once the |
||
``` | ||
|
||
### Queries with the asynchronous client | ||
|
||
Unlike the synchronous client, the async client does not have an `enable_cross_partition` flag in the request. Queries without a specified partition key value will attempt to do a cross partition query by default. | ||
|
||
Query results can be iterated, but the query's raw output returns an asynchronous iterator. This means that each object from the iterator is an awaitable object, and does not yet contain the true query result. In order to obtain the query results you can use an async for loop, which awaits each result as you iterate on the object, or manually await each query result as you iterate over the asynchronous iterator. | ||
|
||
Since the query results are an asynchronous iterator, they can't be cast into lists directly; instead, if you need to create lists from your results, use an async for loop or Python's list comprehension to populate a list: | ||
|
||
```Python | ||
from azure.cosmos.aio import CosmosClient | ||
import os | ||
|
||
URL = os.environ['ACCOUNT_URI'] | ||
KEY = os.environ['ACCOUNT_KEY'] | ||
client = CosmosClient(URL, credential=KEY) | ||
DATABASE_NAME = 'testDatabase' | ||
database = client.get_database_client(DATABASE_NAME) | ||
CONTAINER_NAME = 'products' | ||
container = database.get_container_client(CONTAINER_NAME) | ||
|
||
async def create_lists(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to close the client in this example too? I'm not seeing "with" being used either. |
||
results = container.query_items( | ||
query='SELECT * FROM products p WHERE p.productModel = "Model 2"') | ||
|
||
# iterates on "results" iterator to asynchronously create a complete list of the actual query results | ||
|
||
item_list = [] | ||
async for item in results: | ||
item_list.append(item) | ||
|
||
# Asynchronously creates a complete list of the actual query results. This code performs the same action as the for-loop example above. | ||
item_list = [item async for item in results] | ||
await client.close() | ||
``` | ||
simorenoh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
## Troubleshooting | ||
|
||
### General | ||
|
@@ -441,7 +528,7 @@ For example, if you try to create a container using an ID (name) that's already | |
|
||
```Python | ||
try: | ||
database.create_container(id=container_name, partition_key=PartitionKey(path="/productName")) | ||
database.create_container(id=CONTAINER_NAME, partition_key=PartitionKey(path="/productName")) | ||
except exceptions.CosmosResourceExistsError: | ||
print("""Error creating container | ||
HTTP status code 409: The ID (name) provided for the container is already in use. | ||
|
@@ -471,13 +558,13 @@ handler = logging.StreamHandler(stream=sys.stdout) | |
logger.addHandler(handler) | ||
|
||
# This client will log detailed information about its HTTP sessions, at DEBUG level | ||
client = CosmosClient(url, credential=key, logging_enable=True) | ||
client = CosmosClient(URL, credential=KEY, logging_enable=True) | ||
``` | ||
|
||
Similarly, `logging_enable` can enable detailed logging for a single operation, | ||
even when it isn't enabled for the client: | ||
```py | ||
database = client.create_database(database_name, logging_enable=True) | ||
database = client.create_database(DATABASE_NAME, logging_enable=True) | ||
``` | ||
|
||
## Next steps | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
# The MIT License (MIT) | ||
# Copyright (c) 2021 Microsoft Corporation | ||
|
||
# Permission is hereby granted, free of charge, to any person obtaining a copy | ||
# of this software and associated documentation files (the "Software"), to deal | ||
# in the Software without restriction, including without limitation the rights | ||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
# copies of the Software, and to permit persons to whom the Software is | ||
# furnished to do so, subject to the following conditions: | ||
|
||
# The above copyright notice and this permission notice shall be included in all | ||
# copies or substantial portions of the Software. | ||
|
||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
# SOFTWARE. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@simorenoh
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can be passed around safely, but needs to be manually disposed of once you're done using it - the alternative to this being to use the client using a
with
statement:async with CosmosClient(url, key) as client:
and then starting your cosmos db logic within that contextThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@simorenoh What happens if these async clients are not closed properly? Memory leak or something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have to test, but I assume that is the case - you also get an error in your code stating the client was not properly disposed of