Skip to content

Commit

Permalink
[Cosmos] Add max_integrated_cache_staleness param option to item meth…
Browse files Browse the repository at this point in the history
…ods (Azure#22946)

* consistency level gets set to default found in database account

* async client default change

* updated docs based on finding and updated samples to reflect best practices

* Update CHANGELOG.md

* Update README.md

* Update README.md

* Update README.md

* Update CHANGELOG.md

* formatting

* formatting

* updated consistency for first request to Eventual (lowest latency)

* pylint

* from_connection_string methods

* from_connection_string2

* Update sdk/cosmos/azure-cosmos/README.md

Co-authored-by: Gahl Levy <[email protected]>

* Apply suggestions from code review

Co-authored-by: Gahl Levy <[email protected]>

* Update README.md

* removed forceful header usage, changed setup to only check for Session consistency to start client session

* need to set header if Session consistency  for updating session if needed (thanks Jake!)

* Apply suggestions from code review

Kushagra improved documentation and comments

Co-authored-by: Kushagra Thapar <[email protected]>

* added test for session token

* Update CHANGELOG.md

* Update _cosmos_client_connection_async.py

* added max_integrated_cache_staleness to item methods in containers

* added validation and provisional comments

* pylint

* only applied to read-only operations

* Update container.py

* Update CHANGELOG.md

* Apply suggestions from code review

Co-authored-by: Kushagra Thapar <[email protected]>

* Update _base.py

* updated param comments to mention integrated cache configuration

* moved to kwargs

* added tests to verify functionality

* Update test_integrated_cache.py

* Update test_integrated_cache.py

* updates to test to ensure it works with setup

* added headers test and new way to track client headers before sending out

these changes will also likely be used for creating the diagnostics later on

* Update test_integrated_cache.py

* Create test_axq.py

* Added mocking tests for max integrated cache staleness. Fixed issue with int value being false

* upgrade version for release

Co-authored-by: Gahl Levy <[email protected]>
Co-authored-by: Kushagra Thapar <[email protected]>
Co-authored-by: Kushagra Thapar <[email protected]>
  • Loading branch information
4 people authored and rakshith91 committed Apr 7, 2022
1 parent e259121 commit fa6a612
Show file tree
Hide file tree
Showing 13 changed files with 337 additions and 89 deletions.
14 changes: 9 additions & 5 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@
> for more details on consistency levels, or the README section on this change [here](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos#note-on-client-consistency-levels).
#### Features Added
- Added support for split-proof queries for the async client.
- Added new **provisional** `max_integrated_cache_staleness_in_ms` parameter to read item and query items APIs in order
to make use of the **preview** CosmosDB integrated cache functionality.
Please see [Azure Cosmos DB integrated cache](https://docs.microsoft.com/azure/cosmos-db/integrated-cache) for more details.
- Added support for split-proof queries for the async client

### Bugs fixed
- Default consistency level for the sync and async clients is no longer "Session" and will instead be set to the
consistency level of the user's cosmos account setting on initialization if not passed during client initialization.
This change will impact client application in terms of RUs and latency. Users relying on default `Session` consistency will need to pass it explicitly if their account consistency is different than `Session`.
Please see [Consistency Levels in Azure Cosmos DB](https://docs.microsoft.com/azure/cosmos-db/consistency-levels) for more details.
- Default consistency level for the sync and async clients is no longer `Session` and will instead be set to the
consistency level of the user's cosmos account setting on initialization if not passed during client initialization.
This change will impact client application in terms of RUs and latency. Users relying on default `Session` consistency
will need to pass it explicitly if their account consistency is different than `Session`.
Please see [Consistency Levels in Azure Cosmos DB](https://docs.microsoft.com/azure/cosmos-db/consistency-levels) for more details.
- Fixed invalid request body being sent when passing in `serverScript` body parameter to replace operations for trigger, sproc and udf resources.
- Moved `is_system_key` logic in async client.
- Fixed TypeErrors not being thrown when passing in invalid connection retry policies to the client.
Expand Down
28 changes: 19 additions & 9 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
'query_version': 'queryVersion'
}


def _get_match_headers(kwargs):
# type: (Dict[str, Any]) -> Tuple(Optional[str], Optional[str])
if_match = kwargs.pop('if_match', None)
Expand Down Expand Up @@ -112,14 +113,14 @@ def build_options(kwargs):


def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
cosmos_client_connection,
default_headers,
verb,
path,
resource_id,
resource_type,
options,
partition_key_range_id=None,
cosmos_client_connection,
default_headers,
verb,
path,
resource_id,
resource_type,
options,
partition_key_range_id=None,
):
"""Gets HTTP request headers.
Expand Down Expand Up @@ -292,6 +293,9 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
if options.get("populateQuotaInfo"):
headers[http_constants.HttpHeaders.PopulateQuotaInfo] = options["populateQuotaInfo"]

if options.get("maxIntegratedCacheStaleness"):
headers[http_constants.HttpHeaders.DedicatedGatewayCacheStaleness] = options["maxIntegratedCacheStaleness"]

return headers


Expand Down Expand Up @@ -638,7 +642,7 @@ def ParsePaths(paths):
newIndex += 1

# This will extract the token excluding the quote chars
token = path[currentIndex + 1 : newIndex]
token = path[currentIndex + 1: newIndex]
tokens.append(token)
currentIndex = newIndex + 1
else:
Expand All @@ -657,3 +661,9 @@ def ParsePaths(paths):
tokens.append(token)

return tokens


def validate_cache_staleness_value(max_integrated_cache_staleness):
int(max_integrated_cache_staleness) # Will throw error if data type cant be converted to int
if max_integrated_cache_staleness <= 0:
raise ValueError("Parameter 'max_integrated_cache_staleness_in_ms' can only be a positive integer.")
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def __init__(
http_constants.HttpHeaders.IsContinuationExpected: False,
}

# Keeps the latest response headers from server.
# Keeps the latest response headers from the server.
self.last_response_headers = None

self._useMultipleWriteLocations = False
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

VERSION = "4.3.0b2"
VERSION = "4.3.0b3"
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def __init__(
if consistency_level is not None:
self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level

# Keeps the latest response headers from server.
# Keeps the latest response headers from the server.
self.last_response_headers = None

self._useMultipleWriteLocations = False
Expand Down
148 changes: 88 additions & 60 deletions sdk/cosmos/azure-cosmos/azure/cosmos/aio/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from azure.core.tracing.decorator_async import distributed_trace_async # type: ignore

from ._cosmos_client_connection_async import CosmosClientConnection
from .._base import build_options as _build_options
from .._base import build_options as _build_options, validate_cache_staleness_value
from ..exceptions import CosmosResourceNotFoundError
from ..http_constants import StatusCodes
from ..offer import Offer
Expand All @@ -38,6 +38,7 @@

__all__ = ("ContainerProxy",)


# pylint: disable=protected-access
# pylint: disable=missing-client-constructor-parameter-credential,missing-client-constructor-parameter-kwargs

Expand Down Expand Up @@ -112,10 +113,10 @@ async def _set_partition_key(self, partition_key):

@distributed_trace_async
async def read(
self,
populate_partition_key_range_statistics=None, # type: Optional[bool]
populate_quota_info=None, # type: Optional[bool]
**kwargs # type: Any
self,
populate_partition_key_range_statistics=None, # type: Optional[bool]
populate_quota_info=None, # type: Optional[bool]
**kwargs # type: Any
):
# type: (...) -> Dict[str, Any]
"""Read the container properties.
Expand Down Expand Up @@ -150,9 +151,9 @@ async def read(

@distributed_trace_async
async def create_item(
self,
body, # type: Dict[str, Any]
**kwargs # type: Any
self,
body, # type: Dict[str, Any]
**kwargs # type: Any
):
# type: (...) -> Dict[str, Any]
"""Create an item in the container.
Expand Down Expand Up @@ -198,10 +199,10 @@ async def create_item(

@distributed_trace_async
async def read_item(
self,
item, # type: Union[str, Dict[str, Any]]
partition_key, # type: Any
**kwargs # type: Any
self,
item, # type: Union[str, Dict[str, Any]]
partition_key, # type: Any
**kwargs # type: Any
):
# type: (...) -> Dict[str, Any]
"""Get the item identified by `item`.
Expand All @@ -211,6 +212,11 @@ async def read_item(
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request.
:keyword Callable response_hook: A callable invoked with the response metadata.
**Provisional** keyword argument max_integrated_cache_staleness_in_ms
:keyword int max_integrated_cache_staleness_in_ms:
The max cache staleness for the integrated cache in milliseconds.
For accounts configured to use the integrated cache, using Session or Eventual consistency,
responses are guaranteed to be no staler than this value.
:returns: Dict representing the item to be retrieved.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given item couldn't be retrieved.
:rtype: dict[str, Any]
Expand All @@ -230,6 +236,10 @@ async def read_item(
response_hook = kwargs.pop('response_hook', None)
if partition_key is not None:
request_options["partitionKey"] = await self._set_partition_key(partition_key)
max_integrated_cache_staleness_in_ms = kwargs.pop('max_integrated_cache_staleness_in_ms', None)
if max_integrated_cache_staleness_in_ms is not None:
validate_cache_staleness_value(max_integrated_cache_staleness_in_ms)
request_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms

result = await self.client_connection.ReadItem(document_link=doc_link, options=request_options, **kwargs)
if response_hook:
Expand All @@ -238,9 +248,9 @@ async def read_item(

@distributed_trace
def read_all_items(
self,
max_item_count=None, # type: Optional[int]
**kwargs # type: Any
self,
max_item_count=None, # type: Optional[int]
**kwargs # type: Any
):
# type: (...) -> AsyncItemPaged[Dict[str, Any]]
"""List all the items in the container.
Expand All @@ -249,13 +259,22 @@ def read_all_items(
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request.
:keyword Callable response_hook: A callable invoked with the response metadata.
**Provisional** keyword argument max_integrated_cache_staleness_in_ms
:keyword int max_integrated_cache_staleness_in_ms:
The max cache staleness for the integrated cache in milliseconds.
For accounts configured to use the integrated cache, using Session or Eventual consistency,
responses are guaranteed to be no staler than this value.
:returns: An AsyncItemPaged of items (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
"""
feed_options = _build_options(kwargs)
response_hook = kwargs.pop('response_hook', None)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
max_integrated_cache_staleness_in_ms = kwargs.pop('max_integrated_cache_staleness_in_ms', None)
if max_integrated_cache_staleness_in_ms:
validate_cache_staleness_value(max_integrated_cache_staleness_in_ms)
feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms

if hasattr(response_hook, "clear"):
response_hook.clear()
Expand All @@ -269,14 +288,14 @@ def read_all_items(

@distributed_trace
def query_items(
self,
query, # type: str
parameters=None, # type: Optional[List[Dict[str, Any]]]
partition_key=None, # type: Optional[Any]
max_item_count=None, # type: Optional[int]
enable_scan_in_query=None, # type: Optional[bool]
populate_query_metrics=None, # type: Optional[bool]
**kwargs # type: Any
self,
query, # type: str
parameters=None, # type: Optional[List[Dict[str, Any]]]
partition_key=None, # type: Optional[Any]
max_item_count=None, # type: Optional[int]
enable_scan_in_query=None, # type: Optional[bool]
populate_query_metrics=None, # type: Optional[bool]
**kwargs # type: Any
):
# type: (...) -> AsyncItemPaged[Dict[str, Any]]
"""Return all results matching the given `query`.
Expand All @@ -299,6 +318,11 @@ def query_items(
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request.
:keyword Callable response_hook: A callable invoked with the response metadata.
**Provisional** keyword argument max_integrated_cache_staleness_in_ms
:keyword int max_integrated_cache_staleness_in_ms:
The max cache staleness for the integrated cache in milliseconds.
For accounts configured to use the integrated cache, using Session or Eventual consistency,
responses are guaranteed to be no staler than this value.
:returns: An AsyncItemPaged of items (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
Expand Down Expand Up @@ -332,6 +356,10 @@ def query_items(
feed_options["partitionKey"] = self._set_partition_key(partition_key)
else:
feed_options["enableCrossPartitionQuery"] = True
max_integrated_cache_staleness_in_ms = kwargs.pop('max_integrated_cache_staleness_in_ms', None)
if max_integrated_cache_staleness_in_ms:
validate_cache_staleness_value(max_integrated_cache_staleness_in_ms)
feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms

if hasattr(response_hook, "clear"):
response_hook.clear()
Expand All @@ -350,12 +378,12 @@ def query_items(

@distributed_trace
def query_items_change_feed(
self,
partition_key_range_id=None, # type: Optional[str]
is_start_from_beginning=False, # type: bool
continuation=None, # type: Optional[str]
max_item_count=None, # type: Optional[int]
**kwargs # type: Any
self,
partition_key_range_id=None, # type: Optional[str]
is_start_from_beginning=False, # type: bool
continuation=None, # type: Optional[str]
max_item_count=None, # type: Optional[int]
**kwargs # type: Any
):
# type: (...) -> AsyncItemPaged[Dict[str, Any]]
"""Get a sorted list of items that were changed, in the order in which they were modified.
Expand Down Expand Up @@ -397,11 +425,11 @@ def query_items_change_feed(

@distributed_trace_async
async def upsert_item(
self,
body, # type: Dict[str, Any]
pre_trigger_include=None, # type: Optional[str]
post_trigger_include=None, # type: Optional[str]
**kwargs # type: Any
self,
body, # type: Dict[str, Any]
pre_trigger_include=None, # type: Optional[str]
post_trigger_include=None, # type: Optional[str]
**kwargs # type: Any
):
# type: (...) -> Dict[str, Any]
"""Insert or update the specified item.
Expand Down Expand Up @@ -442,12 +470,12 @@ async def upsert_item(

@distributed_trace_async
async def replace_item(
self,
item, # type: Union[str, Dict[str, Any]]
body, # type: Dict[str, Any]
pre_trigger_include=None, # type: Optional[str]
post_trigger_include=None, # type: Optional[str]
**kwargs # type: Any
self,
item, # type: Union[str, Dict[str, Any]]
body, # type: Dict[str, Any]
pre_trigger_include=None, # type: Optional[str]
post_trigger_include=None, # type: Optional[str]
**kwargs # type: Any
):
# type: (...) -> Dict[str, Any]
"""Replaces the specified item if it exists in the container.
Expand Down Expand Up @@ -487,12 +515,12 @@ async def replace_item(

@distributed_trace_async
async def delete_item(
self,
item, # type: Union[str, Dict[str, Any]]
partition_key, # type: Any
pre_trigger_include=None, # type: Optional[str]
post_trigger_include=None, # type: Optional[str]
**kwargs # type: Any
self,
item, # type: Union[str, Dict[str, Any]]
partition_key, # type: Any
pre_trigger_include=None, # type: Optional[str]
post_trigger_include=None, # type: Optional[str]
**kwargs # type: Any
):
# type: (...) -> None
"""Delete the specified item from the container.
Expand Down Expand Up @@ -617,12 +645,12 @@ def list_conflicts(self, max_item_count=None, **kwargs):

@distributed_trace
def query_conflicts(
self,
query, # type: str
parameters=None, # type: Optional[List[Dict[str, Any]]]
partition_key=None, # type: Optional[Any]
max_item_count=None, # type: Optional[int]
**kwargs # type: Any
self,
query, # type: str
parameters=None, # type: Optional[List[Dict[str, Any]]]
partition_key=None, # type: Optional[Any]
max_item_count=None, # type: Optional[int]
**kwargs # type: Any
):
# type: (...) -> AsyncItemPaged[Dict[str, Any]]
"""Return all conflicts matching a given `query`.
Expand Down Expand Up @@ -657,10 +685,10 @@ def query_conflicts(

@distributed_trace_async
async def read_conflict(
self,
conflict, # type: Union[str, Dict[str, Any]]
partition_key, # type: Any
**kwargs # type: Any
self,
conflict, # type: Union[str, Dict[str, Any]]
partition_key, # type: Any
**kwargs # type: Any
):
# type: (Union[str, Dict[str, Any]], Any, Any) -> Dict[str, Any]
"""Get the conflict identified by `conflict`.
Expand All @@ -686,10 +714,10 @@ async def read_conflict(

@distributed_trace_async
async def delete_conflict(
self,
conflict, # type: Union[str, Dict[str, Any]]
partition_key, # type: Any
**kwargs # type: Any
self,
conflict, # type: Union[str, Dict[str, Any]]
partition_key, # type: Any
**kwargs # type: Any
):
# type: (Union[str, Dict[str, Any]], Any, Any) -> None
"""Delete a specified conflict from the container.
Expand Down
Loading

0 comments on commit fa6a612

Please sign in to comment.