Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cosmos] Change default consistency level to the database account consistency level #22792

Merged
merged 24 commits into from
Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
51664b6
consistency level gets set to default found in database account
simorenoh Jan 27, 2022
edb78a4
async client default change
simorenoh Jan 27, 2022
fd98dae
updated docs based on finding and updated samples to reflect best pra…
simorenoh Jan 27, 2022
df02ee4
Update CHANGELOG.md
simorenoh Jan 27, 2022
dcac0b2
Update README.md
simorenoh Jan 27, 2022
68d9a97
Update README.md
simorenoh Jan 27, 2022
8b006a6
Update README.md
simorenoh Jan 27, 2022
36cf3be
Update CHANGELOG.md
simorenoh Jan 27, 2022
6f7151e
formatting
simorenoh Jan 27, 2022
69df86d
formatting
simorenoh Jan 27, 2022
5b4f406
updated consistency for first request to Eventual (lowest latency)
simorenoh Jan 27, 2022
c19f655
pylint
simorenoh Jan 27, 2022
ce12951
from_connection_string methods
simorenoh Jan 27, 2022
84987ce
from_connection_string2
simorenoh Jan 27, 2022
2d56ebd
Update sdk/cosmos/azure-cosmos/README.md
simorenoh Jan 28, 2022
f018c89
Apply suggestions from code review
simorenoh Jan 28, 2022
2732fa1
Update README.md
simorenoh Jan 31, 2022
7116654
removed forceful header usage, changed setup to only check for Sessio…
simorenoh Feb 2, 2022
c4d688b
need to set header if Session consistency for updating session if ne…
simorenoh Feb 2, 2022
c126d33
Apply suggestions from code review
simorenoh Feb 2, 2022
00fcfd2
added test for session token
simorenoh Feb 2, 2022
9a80b00
Merge branch 'integrated-cache' of https://github.com/simorenoh/azure…
simorenoh Feb 2, 2022
7c098ea
Update CHANGELOG.md
simorenoh Feb 2, 2022
ae64f96
Update _cosmos_client_connection_async.py
simorenoh Feb 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## Release History

### 4.3.0b3 (Unreleased)

### Bugs fixed
simorenoh marked this conversation as resolved.
Show resolved Hide resolved
- Default consistency level for the sync and async clients is no longer "Session" and will instead be set to the
consistency level of the user's cosmos account setting on initialization
simorenoh marked this conversation as resolved.
Show resolved Hide resolved

### 4.3.0b2 (2022-01-25)

This version and all future versions will require Python 3.6+. Python 2.7 is no longer supported.
Expand Down
35 changes: 19 additions & 16 deletions sdk/cosmos/azure-cosmos/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,20 +439,22 @@ For more information on TTL, see [Time to Live for Azure Cosmos DB data][cosmos_
### Using the asynchronous client (Preview)

The asynchronous cosmos client is a separate client that looks and works in a similar fashion to the existing synchronous client. However, the async client needs to be imported separately and its methods need to be used with the async/await keywords.
The Async client needs to be initialized and closed after usage. The example below shows how to do so by using the client's __aenter__() and close() methods.

```Python
from azure.cosmos.aio import CosmosClient
import os

URL = os.environ['ACCOUNT_URI']
KEY = os.environ['ACCOUNT_KEY']
client = CosmosClient(URL, credential=KEY)
DATABASE_NAME = 'testDatabase'
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = 'products'
container = database.get_container_client(CONTAINER_NAME)
CONTAINER_NAME = 'products'

async def create_items():
async def create_products():
client = CosmosClient(URL, credential=KEY)
await client.__aenter__()
database = client.get_database_client(DATABASE_NAME)
container = database.get_container_client(CONTAINER_NAME)
for i in range(10):
await container.upsert_item({
'id': 'item{0}'.format(i),
Expand All @@ -463,7 +465,7 @@ async def create_items():
await client.close() # the async client must be closed manually if it's not initialized in a with statement
```

It is also worth pointing out that the asynchronous client has to be closed manually after its use, either by initializing it using async with or calling the close() method directly like shown above.
Instead of manually opening and closing the client, you can and should use the `async with` keywords. This creates a context manager that will initialize and later close the client once you're out of the statement. The example below shows how to do so.
simorenoh marked this conversation as resolved.
Show resolved Hide resolved

```Python
from azure.cosmos.aio import CosmosClient
Expand All @@ -474,16 +476,17 @@ KEY = os.environ['ACCOUNT_KEY']
DATABASE_NAME = 'testDatabase'
CONTAINER_NAME = 'products'

async with CosmosClient(URL, credential=KEY) as client: # the with statement will automatically close the async client
database = client.get_database_client(DATABASE_NAME)
container = database.get_container_client(CONTAINER_NAME)
for i in range(10):
await container.upsert_item({
'id': 'item{0}'.format(i),
'productName': 'Widget',
'productModel': 'Model {0}'.format(i)
}
)
async def create_products():
async with CosmosClient(URL, credential=KEY) as client: # the with statement will automatically initialize and close the async client
database = client.get_database_client(DATABASE_NAME)
container = database.get_container_client(CONTAINER_NAME)
for i in range(10):
await container.upsert_item({
'id': 'item{0}'.format(i),
'productName': 'Widget',
'productModel': 'Model {0}'.format(i)
}
)
```

### Queries with the asynchronous client (Preview)
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class _Constants(object):
"""Constants used in the azure-cosmos package"""

UserConsistencyPolicy = "userConsistencyPolicy"
DefaultConsistencyLevel = "defaultConsistencyLevel"

# GlobalDB related constants
WritableLocations = "writableLocations"
Expand Down
50 changes: 36 additions & 14 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"""
# https://github.com/PyCQA/pylint/issues/3112
# Currently pylint is locked to 2.3.3 and this is fixed in 2.4.4
from typing import Dict, Any, Optional # pylint: disable=unused-import
from typing import Dict, Any, Optional, TypeVar # pylint: disable=unused-import
import urllib.parse
from urllib3.util.retry import Retry
from azure.core.paging import ItemPaged # type: ignore
Expand Down Expand Up @@ -59,6 +59,8 @@
from . import _utils
from .partition_key import _Undefined, _Empty

ClassType = TypeVar("ClassType")


# pylint: disable=protected-access

Expand Down Expand Up @@ -92,7 +94,7 @@ def __init__(
url_connection, # type: str
auth, # type: Dict[str, Any]
connection_policy=None, # type: Optional[ConnectionPolicy]
consistency_level=documents.ConsistencyLevel.Session, # type: str
consistency_level=None, # type: Optional[str]
**kwargs # type: Any
):
# type: (...) -> None
Expand Down Expand Up @@ -139,20 +141,9 @@ def __init__(
http_constants.HttpHeaders.IsContinuationExpected: False,
}

if consistency_level is not None:
self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level

# Keeps the latest response headers from server.
self.last_response_headers = None

if consistency_level == documents.ConsistencyLevel.Session:
# create a session - this is maintained only if the default consistency level
# on the client is set to session, or if the user explicitly sets it as a property
# via setter
self.session = _session.Session(self.url_connection)
else:
self.session = None # type: ignore

self._useMultipleWriteLocations = False
self._global_endpoint_manager = global_endpoint_manager._GlobalEndpointManager(self)

Expand Down Expand Up @@ -210,6 +201,38 @@ def __init__(
database_account = self._global_endpoint_manager._GetDatabaseAccount(**kwargs)
self._global_endpoint_manager.force_refresh(database_account)

# Use database_account if no consistency passed in to verify consistency level to be used
self._set_client_consistency_level(database_account, consistency_level)

def _set_client_consistency_level(
self,
database_account: ClassType,
consistency_level: Optional[str],
) -> None:
"""Checks if consistency level param was passed in by user and sets it to that value or to the account default.

:param database_account: The database account to be used to check consistency levels
:type database_account: ~azure.cosmos.documents.DatabaseAccount
:param consistency_level: The consistency level passed in by the user
:type consistency_level: Optional[str]
:rtype: None
"""
if consistency_level is None:
simorenoh marked this conversation as resolved.
Show resolved Hide resolved
# Set to default level present in account
user_consistency_policy = database_account.ConsistencyPolicy
consistency_level = user_consistency_policy.get(constants._Constants.DefaultConsistencyLevel)
else:
# Set consistency level header to be used for the client
self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level

if consistency_level == documents.ConsistencyLevel.Session:
simorenoh marked this conversation as resolved.
Show resolved Hide resolved
# create a session - this is maintained only if the default consistency level
# on the client is set to session, or if the user explicitly sets it as a property
# via setter
self.session = _session.Session(self.url_connection)
else:
self.session = None # type: ignore

@property
def Session(self):
"""Gets the session object from the client. """
Expand Down Expand Up @@ -2560,7 +2583,6 @@ def refresh_routing_map_provider(self):
# re-initializes the routing map provider, effectively refreshing the current partition key range cache
self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)


def _UpdateSessionIfRequired(self, request_headers, response_result, response_headers):
"""
Updates session if necessary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"""
# https://github.com/PyCQA/pylint/issues/3112
# Currently pylint is locked to 2.3.3 and this is fixed in 2.4.4
from typing import Dict, Any, Optional # pylint: disable=unused-import
from typing import Dict, Any, Optional, TypeVar # pylint: disable=unused-import
from urllib.parse import urlparse
from urllib3.util.retry import Retry
from azure.core.async_paging import AsyncItemPaged
Expand Down Expand Up @@ -59,6 +59,7 @@
from .. import _utils
from ..partition_key import _Undefined, _Empty

ClassType = TypeVar("ClassType")
# pylint: disable=protected-access

class CosmosClientConnection(object): # pylint: disable=too-many-public-methods,too-many-instance-attributes
Expand Down Expand Up @@ -90,7 +91,7 @@ def __init__(
url_connection, # type: str
auth, # type: Dict[str, Any]
connection_policy=None, # type: Optional[ConnectionPolicy]
consistency_level=documents.ConsistencyLevel.Session, # type: str
consistency_level=None, # type: Optional[str]
**kwargs # type: Any
):
# type: (...) -> None
Expand Down Expand Up @@ -143,14 +144,6 @@ def __init__(
# Keeps the latest response headers from server.
self.last_response_headers = None

if consistency_level == documents.ConsistencyLevel.Session:
xinlian12 marked this conversation as resolved.
Show resolved Hide resolved
# create a session - this is maintained only if the default consistency level
# on the client is set to session, or if the user explicitly sets it as a property
# via setter
self.session = _session.Session(self.url_connection)
else:
self.session = None # type: ignore

self._useMultipleWriteLocations = False
self._global_endpoint_manager = global_endpoint_manager_async._GlobalEndpointManager(self)

Expand Down Expand Up @@ -232,10 +225,52 @@ def _ReadEndpoint(self):
return self._global_endpoint_manager.get_read_endpoint()

async def _setup(self):
xinlian12 marked this conversation as resolved.
Show resolved Hide resolved
# Save the choice that was made (either None or some value) and branch to set or get the consistency
if self.default_headers.get(http_constants.HttpHeaders.ConsistencyLevel):
user_defined_consistency = self.default_headers[http_constants.HttpHeaders.ConsistencyLevel]
else:
user_defined_consistency = None

if user_defined_consistency == documents.ConsistencyLevel.Session:
# create a Session if the user wants Session consistency
self.session = _session.Session(self.url_connection)
else:
self.session = None # type: ignore

database_account = None
if 'database_account' not in self._setup_kwargs:
self._setup_kwargs['database_account'] = await self._global_endpoint_manager._GetDatabaseAccount(
database_account = await self._global_endpoint_manager._GetDatabaseAccount(
**self._setup_kwargs)
self._setup_kwargs['database_account'] = database_account
await self._global_endpoint_manager.force_refresh(self._setup_kwargs['database_account'])
else:
database_account = self._setup_kwargs.get('database_account')

# Use database_account if no consistency passed in to verify consistency level to be used
if user_defined_consistency is None:
self._check_if_session_consistency(database_account)
xinlian12 marked this conversation as resolved.
Show resolved Hide resolved

def _check_if_session_consistency(
self,
database_account: ClassType,
) -> None:
"""Checks account consistency level to set client Session if needed.
:param database_account: The database account to be used to check consistency levels
:type database_account: ~azure.cosmos.documents.DatabaseAccount
:rtype: None
"""
# Set to default level present in account
user_consistency_policy = database_account.ConsistencyPolicy
consistency_level = user_consistency_policy.get(constants._Constants.DefaultConsistencyLevel)

if consistency_level == documents.ConsistencyLevel.Session:
# create a session - this is maintained only if the default consistency level
# on the client is set to session, or if the user explicitly sets it as a property
# via setter
self.session = _session.Session(self.url_connection)
else:
self.session = None # type: ignore


def _GetDatabaseIdWithPathForUser(self, database_link, user): # pylint: disable=no-self-use
CosmosClientConnection.__ValidateResource(user)
Expand Down
15 changes: 7 additions & 8 deletions sdk/cosmos/azure-cosmos/azure/cosmos/aio/cosmos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class CosmosClient(object):
:param str url: The URL of the Cosmos DB account.
:param credential: Can be the account key, or a dictionary of resource tokens.
:type credential: str or dict[str, str]
:keyword str consistency_level: Consistency level to use for the session. The default value is "Session".
:param str consistency_level: Consistency level to use for the session. The default value is None (Account level).

.. admonition:: Example:

Expand All @@ -109,11 +109,10 @@ class CosmosClient(object):
:name: create_client
"""

def __init__(self, url, credential, **kwargs):
# type: (str, Any, str, Any) -> None
def __init__(self, url, credential, consistency_level=None, **kwargs):
# type: (str, Any, Optional[str], Any) -> None
"""Instantiate a new CosmosClient."""
auth = _build_auth(credential)
consistency_level = kwargs.get('consistency_level', 'Session')
connection_policy = _build_connection_policy(kwargs)
self.client_connection = CosmosClientConnection(
url,
Expand Down Expand Up @@ -141,8 +140,8 @@ async def close(self):
await self.__aexit__()

@classmethod
def from_connection_string(cls, conn_str, credential=None, consistency_level="Session", **kwargs):
# type: (str, Optional[Union[str, Dict[str, str]]], str, Any) -> CosmosClient
def from_connection_string(cls, conn_str, credential=None, consistency_level=None, **kwargs):
# type: (str, Optional[Union[str, Dict[str, str]]], Optional[str], Any) -> CosmosClient
"""Create a CosmosClient instance from a connection string.

This can be retrieved from the Azure portal.For full list of optional
Expand All @@ -155,8 +154,8 @@ def from_connection_string(cls, conn_str, credential=None, consistency_level="Se
:type credential: str or Dict[str, str]
:param conn_str: The connection string.
:type conn_str: str
:param consistency_level: Consistency level to use for the session. The default value is "Session".
:type consistency_level: str
:param consistency_level: Consistency level to use for the session. The default value is None (Account level).
:type consistency_level: Optional[str]
"""
settings = _parse_connection_str(conn_str, credential)
return cls(
Expand Down
14 changes: 7 additions & 7 deletions sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class CosmosClient(object):
:param str url: The URL of the Cosmos DB account.
:param credential: Can be the account key, or a dictionary of resource tokens.
:type credential: str or dict[str, str]
:param str consistency_level: Consistency level to use for the session. The default value is "Session".
:param str consistency_level: Consistency level to use for the session. The default value is None (Account level).
:keyword int timeout: An absolute timeout in seconds, for the combined HTTP request and response processing.
:keyword int request_timeout: The HTTP request timeout in milliseconds.
:keyword str connection_mode: The connection mode for the client - currently only supports 'Gateway'.
Expand Down Expand Up @@ -159,8 +159,8 @@ class CosmosClient(object):
:name: create_client
"""

def __init__(self, url, credential, consistency_level="Session", **kwargs):
# type: (str, Any, str, Any) -> None
def __init__(self, url, credential, consistency_level=None, **kwargs):
# type: (str, Any, Optional[str], Any) -> None
"""Instantiate a new CosmosClient."""
auth = _build_auth(credential)
connection_policy = _build_connection_policy(kwargs)
Expand All @@ -180,8 +180,8 @@ def __exit__(self, *args):
return self.client_connection.pipeline_client.__exit__(*args)

@classmethod
def from_connection_string(cls, conn_str, credential=None, consistency_level="Session", **kwargs):
# type: (str, Optional[Any], str, Any) -> CosmosClient
def from_connection_string(cls, conn_str, credential=None, consistency_level=None, **kwargs):
# type: (str, Optional[Any], Optional[str], Any) -> CosmosClient
"""Create a CosmosClient instance from a connection string.

This can be retrieved from the Azure portal.For full list of optional
Expand All @@ -191,8 +191,8 @@ def from_connection_string(cls, conn_str, credential=None, consistency_level="Se
:param credential: Alternative credentials to use instead of the key
provided in the connection string.
:type credential: str or dict(str, str)
:param str consistency_level:
Consistency level to use for the session. The default value is "Session".
:param Optional[str] consistency_level:
Consistency level to use for the session. The default value is None (Account level).
"""
settings = _parse_connection_str(conn_str, credential)
return cls(
Expand Down
Loading