diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md
index 7fa70c49654d..9e8ce1fad847 100644
--- a/sdk/cosmos/azure-cosmos/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -12,13 +12,17 @@
 > for more details on consistency levels, or the README section on this change [here](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos#note-on-client-consistency-levels).
 
 #### Features Added
-- Added support for split-proof queries for the async client.
+- Added new **provisional** `max_integrated_cache_staleness_in_ms` parameter to read item and query items APIs in order
+  to make use of the **preview** Cosmos DB integrated cache functionality.
+  Please see [Azure Cosmos DB integrated cache](https://docs.microsoft.com/azure/cosmos-db/integrated-cache) for more details.
+- Added support for split-proof queries for the async client.
 
 ### Bugs fixed
-- Default consistency level for the sync and async clients is no longer "Session" and will instead be set to the
-  consistency level of the user's cosmos account setting on initialization if not passed during client initialization.
-  This change will impact client application in terms of RUs and latency. Users relying on default `Session` consistency will need to pass it explicitly if their account consistency is different than `Session`.
-  Please see [Consistency Levels in Azure Cosmos DB](https://docs.microsoft.com/azure/cosmos-db/consistency-levels) for more details.
+- Default consistency level for the sync and async clients is no longer `Session` and will instead be set to the
+  consistency level of the user's cosmos account setting on initialization if not passed during client initialization.
+  This change will impact client applications in terms of RUs and latency. Users relying on default `Session` consistency
+  will need to pass it explicitly if their account consistency is different than `Session`.
+  Please see [Consistency Levels in Azure Cosmos DB](https://docs.microsoft.com/azure/cosmos-db/consistency-levels) for more details.
 - Fixed invalid request body being sent when passing in `serverScript` body parameter to replace operations for trigger, sproc and udf resources.
 - Moved `is_system_key` logic in async client.
 - Fixed TypeErrors not being thrown when passing in invalid connection retry policies to the client.
diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
index 9a42583513ed..f2539bc30319 100644
--- a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
+++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
@@ -71,6 +71,7 @@
     'query_version': 'queryVersion'
 }
 
+
 def _get_match_headers(kwargs):
     # type: (Dict[str, Any]) -> Tuple(Optional[str], Optional[str])
     if_match = kwargs.pop('if_match', None)
@@ -112,14 +113,14 @@ def build_options(kwargs):
 
 
 def GetHeaders(  # pylint: disable=too-many-statements,too-many-branches
-    cosmos_client_connection,
-    default_headers,
-    verb,
-    path,
-    resource_id,
-    resource_type,
-    options,
-    partition_key_range_id=None,
+        cosmos_client_connection,
+        default_headers,
+        verb,
+        path,
+        resource_id,
+        resource_type,
+        options,
+        partition_key_range_id=None,
 ):
     """Gets HTTP request headers.
 
@@ -292,6 +293,9 @@ def GetHeaders(  # pylint: disable=too-many-statements,too-many-branches
     if options.get("populateQuotaInfo"):
         headers[http_constants.HttpHeaders.PopulateQuotaInfo] = options["populateQuotaInfo"]
 
+    if options.get("maxIntegratedCacheStaleness"):
+        headers[http_constants.HttpHeaders.DedicatedGatewayCacheStaleness] = options["maxIntegratedCacheStaleness"]
+
     return headers
 
 
@@ -638,7 +642,7 @@ def ParsePaths(paths):
                 newIndex += 1
 
             # This will extract the token excluding the quote chars
-            token = path[currentIndex + 1 : newIndex]
+            token = path[currentIndex + 1: newIndex]
             tokens.append(token)
             currentIndex = newIndex + 1
         else:
@@ -657,3 +661,9 @@ def ParsePaths(paths):
             tokens.append(token)
 
     return tokens
+
+
+def validate_cache_staleness_value(max_integrated_cache_staleness):
+    int(max_integrated_cache_staleness)  # Will raise an error if the value can't be converted to int
+    if max_integrated_cache_staleness <= 0:
+        raise ValueError("Parameter 'max_integrated_cache_staleness_in_ms' can only be a positive integer.")
diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py
index fa19f0cfa914..0f831405c073 100644
--- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py
+++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py
@@ -141,7 +141,7 @@ def __init__(
             http_constants.HttpHeaders.IsContinuationExpected: False,
         }
 
-        # Keeps the latest response headers from server.
+        # Keeps the latest response headers from the server.
         self.last_response_headers = None
 
         self._useMultipleWriteLocations = False
diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_version.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_version.py
index 95ffcee7f0ce..e56bcebce29f 100644
--- a/sdk/cosmos/azure-cosmos/azure/cosmos/_version.py
+++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_version.py
@@ -19,4 +19,4 @@
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
 
-VERSION = "4.3.0b2"
+VERSION = "4.3.0b3"
diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py
index 42f9daec74dd..dab5b82b52a0 100644
--- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py
+++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py
@@ -141,7 +141,7 @@ def __init__(
         if consistency_level is not None:
             self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level
 
-        # Keeps the latest response headers from server.
+        # Keeps the latest response headers from the server.
self.last_response_headers = None self._useMultipleWriteLocations = False diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/container.py index cbfa52bec300..3dc34382eec9 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/container.py @@ -29,7 +29,7 @@ from azure.core.tracing.decorator_async import distributed_trace_async # type: ignore from ._cosmos_client_connection_async import CosmosClientConnection -from .._base import build_options as _build_options +from .._base import build_options as _build_options, validate_cache_staleness_value from ..exceptions import CosmosResourceNotFoundError from ..http_constants import StatusCodes from ..offer import Offer @@ -38,6 +38,7 @@ __all__ = ("ContainerProxy",) + # pylint: disable=protected-access # pylint: disable=missing-client-constructor-parameter-credential,missing-client-constructor-parameter-kwargs @@ -112,10 +113,10 @@ async def _set_partition_key(self, partition_key): @distributed_trace_async async def read( - self, - populate_partition_key_range_statistics=None, # type: Optional[bool] - populate_quota_info=None, # type: Optional[bool] - **kwargs # type: Any + self, + populate_partition_key_range_statistics=None, # type: Optional[bool] + populate_quota_info=None, # type: Optional[bool] + **kwargs # type: Any ): # type: (...) -> Dict[str, Any] """Read the container properties. @@ -150,9 +151,9 @@ async def read( @distributed_trace_async async def create_item( - self, - body, # type: Dict[str, Any] - **kwargs # type: Any + self, + body, # type: Dict[str, Any] + **kwargs # type: Any ): # type: (...) -> Dict[str, Any] """Create an item in the container. @@ -198,10 +199,10 @@ async def create_item( @distributed_trace_async async def read_item( - self, - item, # type: Union[str, Dict[str, Any]] - partition_key, # type: Any - **kwargs # type: Any + self, + item, # type: Union[str, Dict[str, Any]] + partition_key, # type: Any + **kwargs # type: Any ): # type: (...) -> Dict[str, Any] """Get the item identified by `item`. @@ -211,6 +212,11 @@ async def read_item( :keyword str session_token: Token for use with Session consistency. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword Callable response_hook: A callable invoked with the response metadata. + **Provisional** keyword argument max_integrated_cache_staleness_in_ms + :keyword int max_integrated_cache_staleness_in_ms: + The max cache staleness for the integrated cache in milliseconds. + For accounts configured to use the integrated cache, using Session or Eventual consistency, + responses are guaranteed to be no staler than this value. :returns: Dict representing the item to be retrieved. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given item couldn't be retrieved. 
:rtype: dict[str, Any] @@ -230,6 +236,10 @@ async def read_item( response_hook = kwargs.pop('response_hook', None) if partition_key is not None: request_options["partitionKey"] = await self._set_partition_key(partition_key) + max_integrated_cache_staleness_in_ms = kwargs.pop('max_integrated_cache_staleness_in_ms', None) + if max_integrated_cache_staleness_in_ms is not None: + validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) + request_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms result = await self.client_connection.ReadItem(document_link=doc_link, options=request_options, **kwargs) if response_hook: @@ -238,9 +248,9 @@ async def read_item( @distributed_trace def read_all_items( - self, - max_item_count=None, # type: Optional[int] - **kwargs # type: Any + self, + max_item_count=None, # type: Optional[int] + **kwargs # type: Any ): # type: (...) -> AsyncItemPaged[Dict[str, Any]] """List all the items in the container. @@ -249,6 +259,11 @@ def read_all_items( :keyword str session_token: Token for use with Session consistency. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword Callable response_hook: A callable invoked with the response metadata. + **Provisional** keyword argument max_integrated_cache_staleness_in_ms + :keyword int max_integrated_cache_staleness_in_ms: + The max cache staleness for the integrated cache in milliseconds. + For accounts configured to use the integrated cache, using Session or Eventual consistency, + responses are guaranteed to be no staler than this value. :returns: An AsyncItemPaged of items (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ @@ -256,6 +271,10 @@ def read_all_items( response_hook = kwargs.pop('response_hook', None) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count + max_integrated_cache_staleness_in_ms = kwargs.pop('max_integrated_cache_staleness_in_ms', None) + if max_integrated_cache_staleness_in_ms: + validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) + feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms if hasattr(response_hook, "clear"): response_hook.clear() @@ -269,14 +288,14 @@ def read_all_items( @distributed_trace def query_items( - self, - query, # type: str - parameters=None, # type: Optional[List[Dict[str, Any]]] - partition_key=None, # type: Optional[Any] - max_item_count=None, # type: Optional[int] - enable_scan_in_query=None, # type: Optional[bool] - populate_query_metrics=None, # type: Optional[bool] - **kwargs # type: Any + self, + query, # type: str + parameters=None, # type: Optional[List[Dict[str, Any]]] + partition_key=None, # type: Optional[Any] + max_item_count=None, # type: Optional[int] + enable_scan_in_query=None, # type: Optional[bool] + populate_query_metrics=None, # type: Optional[bool] + **kwargs # type: Any ): # type: (...) -> AsyncItemPaged[Dict[str, Any]] """Return all results matching the given `query`. @@ -299,6 +318,11 @@ def query_items( :keyword str session_token: Token for use with Session consistency. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword Callable response_hook: A callable invoked with the response metadata. + **Provisional** keyword argument max_integrated_cache_staleness_in_ms + :keyword int max_integrated_cache_staleness_in_ms: + The max cache staleness for the integrated cache in milliseconds. 
+ For accounts configured to use the integrated cache, using Session or Eventual consistency, + responses are guaranteed to be no staler than this value. :returns: An AsyncItemPaged of items (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] @@ -332,6 +356,10 @@ def query_items( feed_options["partitionKey"] = self._set_partition_key(partition_key) else: feed_options["enableCrossPartitionQuery"] = True + max_integrated_cache_staleness_in_ms = kwargs.pop('max_integrated_cache_staleness_in_ms', None) + if max_integrated_cache_staleness_in_ms: + validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) + feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms if hasattr(response_hook, "clear"): response_hook.clear() @@ -350,12 +378,12 @@ def query_items( @distributed_trace def query_items_change_feed( - self, - partition_key_range_id=None, # type: Optional[str] - is_start_from_beginning=False, # type: bool - continuation=None, # type: Optional[str] - max_item_count=None, # type: Optional[int] - **kwargs # type: Any + self, + partition_key_range_id=None, # type: Optional[str] + is_start_from_beginning=False, # type: bool + continuation=None, # type: Optional[str] + max_item_count=None, # type: Optional[int] + **kwargs # type: Any ): # type: (...) -> AsyncItemPaged[Dict[str, Any]] """Get a sorted list of items that were changed, in the order in which they were modified. @@ -397,11 +425,11 @@ def query_items_change_feed( @distributed_trace_async async def upsert_item( - self, - body, # type: Dict[str, Any] - pre_trigger_include=None, # type: Optional[str] - post_trigger_include=None, # type: Optional[str] - **kwargs # type: Any + self, + body, # type: Dict[str, Any] + pre_trigger_include=None, # type: Optional[str] + post_trigger_include=None, # type: Optional[str] + **kwargs # type: Any ): # type: (...) -> Dict[str, Any] """Insert or update the specified item. @@ -442,12 +470,12 @@ async def upsert_item( @distributed_trace_async async def replace_item( - self, - item, # type: Union[str, Dict[str, Any]] - body, # type: Dict[str, Any] - pre_trigger_include=None, # type: Optional[str] - post_trigger_include=None, # type: Optional[str] - **kwargs # type: Any + self, + item, # type: Union[str, Dict[str, Any]] + body, # type: Dict[str, Any] + pre_trigger_include=None, # type: Optional[str] + post_trigger_include=None, # type: Optional[str] + **kwargs # type: Any ): # type: (...) -> Dict[str, Any] """Replaces the specified item if it exists in the container. @@ -487,12 +515,12 @@ async def replace_item( @distributed_trace_async async def delete_item( - self, - item, # type: Union[str, Dict[str, Any]] - partition_key, # type: Any - pre_trigger_include=None, # type: Optional[str] - post_trigger_include=None, # type: Optional[str] - **kwargs # type: Any + self, + item, # type: Union[str, Dict[str, Any]] + partition_key, # type: Any + pre_trigger_include=None, # type: Optional[str] + post_trigger_include=None, # type: Optional[str] + **kwargs # type: Any ): # type: (...) -> None """Delete the specified item from the container. 
@@ -617,12 +645,12 @@ def list_conflicts(self, max_item_count=None, **kwargs): @distributed_trace def query_conflicts( - self, - query, # type: str - parameters=None, # type: Optional[List[Dict[str, Any]]] - partition_key=None, # type: Optional[Any] - max_item_count=None, # type: Optional[int] - **kwargs # type: Any + self, + query, # type: str + parameters=None, # type: Optional[List[Dict[str, Any]]] + partition_key=None, # type: Optional[Any] + max_item_count=None, # type: Optional[int] + **kwargs # type: Any ): # type: (...) -> AsyncItemPaged[Dict[str, Any]] """Return all conflicts matching a given `query`. @@ -657,10 +685,10 @@ def query_conflicts( @distributed_trace_async async def read_conflict( - self, - conflict, # type: Union[str, Dict[str, Any]] - partition_key, # type: Any - **kwargs # type: Any + self, + conflict, # type: Union[str, Dict[str, Any]] + partition_key, # type: Any + **kwargs # type: Any ): # type: (Union[str, Dict[str, Any]], Any, Any) -> Dict[str, Any] """Get the conflict identified by `conflict`. @@ -686,10 +714,10 @@ async def read_conflict( @distributed_trace_async async def delete_conflict( - self, - conflict, # type: Union[str, Dict[str, Any]] - partition_key, # type: Any - **kwargs # type: Any + self, + conflict, # type: Union[str, Dict[str, Any]] + partition_key, # type: Any + **kwargs # type: Any ): # type: (Union[str, Dict[str, Any]], Any, Any) -> None """Delete a specified conflict from the container. diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index 9d3939a281d4..06bc19d4a6a9 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -27,7 +27,7 @@ from azure.core.tracing.decorator import distributed_trace # type: ignore from ._cosmos_client_connection import CosmosClientConnection -from ._base import build_options +from ._base import build_options, validate_cache_staleness_value from .exceptions import CosmosResourceNotFoundError from .http_constants import StatusCodes from .offer import Offer @@ -169,6 +169,11 @@ def read_item( :keyword str session_token: Token for use with Session consistency. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword Callable response_hook: A callable invoked with the response metadata. + **Provisional** keyword argument max_integrated_cache_staleness_in_ms + :keyword int max_integrated_cache_staleness_in_ms: + The max cache staleness for the integrated cache in milliseconds. + For accounts configured to use the integrated cache, using Session or Eventual consistency, + responses are guaranteed to be no staler than this value. :returns: Dict representing the item to be retrieved. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given item couldn't be retrieved. 
:rtype: dict[str, Any] @@ -193,6 +198,10 @@ def read_item( request_options["populateQueryMetrics"] = populate_query_metrics if post_trigger_include is not None: request_options["postTriggerInclude"] = post_trigger_include + max_integrated_cache_staleness_in_ms = kwargs.pop('max_integrated_cache_staleness_in_ms', None) + if max_integrated_cache_staleness_in_ms is not None: + validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) + request_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms result = self.client_connection.ReadItem(document_link=doc_link, options=request_options, **kwargs) if response_hook: @@ -214,6 +223,11 @@ def read_all_items( :keyword str session_token: Token for use with Session consistency. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword Callable response_hook: A callable invoked with the response metadata. + **Provisional** keyword argument max_integrated_cache_staleness_in_ms + :keyword int max_integrated_cache_staleness_in_ms: + The max cache staleness for the integrated cache in milliseconds. + For accounts configured to use the integrated cache, using Session or Eventual consistency, + responses are guaranteed to be no staler than this value. :returns: An Iterable of items (dicts). :rtype: Iterable[dict[str, Any]] """ @@ -223,6 +237,10 @@ def read_all_items( feed_options["maxItemCount"] = max_item_count if populate_query_metrics is not None: feed_options["populateQueryMetrics"] = populate_query_metrics + max_integrated_cache_staleness_in_ms = kwargs.pop('max_integrated_cache_staleness_in_ms', None) + if max_integrated_cache_staleness_in_ms: + validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) + feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms if hasattr(response_hook, "clear"): response_hook.clear() @@ -316,6 +334,11 @@ def query_items( :keyword str session_token: Token for use with Session consistency. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword Callable response_hook: A callable invoked with the response metadata. + **Provisional** keyword argument max_integrated_cache_staleness_in_ms + :keyword int max_integrated_cache_staleness_in_ms: + The max cache staleness for the integrated cache in milliseconds. + For accounts configured to use the integrated cache, using Session or Eventual consistency, + responses are guaranteed to be no staler than this value. :returns: An Iterable of items (dicts). 
:rtype: Iterable[dict[str, Any]] @@ -349,6 +372,10 @@ def query_items( feed_options["partitionKey"] = self._set_partition_key(partition_key) if enable_scan_in_query is not None: feed_options["enableScanInQuery"] = enable_scan_in_query + max_integrated_cache_staleness_in_ms = kwargs.pop('max_integrated_cache_staleness_in_ms', None) + if max_integrated_cache_staleness_in_ms: + validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) + feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms if hasattr(response_hook, "clear"): response_hook.clear() diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py b/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py index b658af7389f0..785ac2b728c1 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py @@ -199,6 +199,10 @@ class HttpHeaders(object): # For Using Multiple Write Locations AllowTentativeWrites = "x-ms-cosmos-allow-tentative-writes" + # Dedicated Gateway headers + DedicatedGatewayCacheStaleness = "x-ms-dedicatedgateway-max-age" + IntegratedCacheHit = "x-ms-cosmos-cachehit" + class HttpHeaderPreferenceTokens(object): """Constants of http header preference tokens. diff --git a/sdk/cosmos/azure-cosmos/test/cleanup.py b/sdk/cosmos/azure-cosmos/test/cleanup.py index 30cd5c0e0e82..3307a3c9c61a 100644 --- a/sdk/cosmos/azure-cosmos/test/cleanup.py +++ b/sdk/cosmos/azure-cosmos/test/cleanup.py @@ -11,13 +11,16 @@ def delete_database(database_id): try: client = cosmos_client.CosmosClient(host, masterKey, "Session", connection_policy=connectionPolicy) # This is to soft-fail the teardown while cosmos tests are not running automatically - except Exception: + except Exception as exception: + print("Error while initialing the client", exception) pass else: try: + print("Deleting database with id : ", database_id) client.delete_database(database_id) - print("Deleted " + database_id) - except exceptions.CosmosResourceNotFoundError: + print("Deleted : ", database_id) + except exceptions.CosmosResourceNotFoundError as exception: + print("Error while deleting database", exception) pass print("Clean up completed!") diff --git a/sdk/cosmos/azure-cosmos/test/test_config.py b/sdk/cosmos/azure-cosmos/test/test_config.py index a01966f2a67d..7bef8d01d629 100644 --- a/sdk/cosmos/azure-cosmos/test/test_config.py +++ b/sdk/cosmos/azure-cosmos/test/test_config.py @@ -59,6 +59,7 @@ class _test_config(object): THROUGHPUT_FOR_1_PARTITION = 400 TEST_DATABASE_ID = os.getenv('COSMOS_TEST_DATABASE_ID', "Python SDK Test Database " + str(uuid.uuid4())) + TEST_DATABASE_ID_PLAIN = "COSMOS_TEST_DATABASE" TEST_THROUGHPUT_DATABASE_ID = "Python SDK Test Throughput Database " + str(uuid.uuid4()) TEST_COLLECTION_SINGLE_PARTITION_ID = "Single Partition Test Collection" TEST_COLLECTION_MULTI_PARTITION_ID = "Multi Partition Test Collection" diff --git a/sdk/cosmos/azure-cosmos/test/test_headers.py b/sdk/cosmos/azure-cosmos/test/test_headers.py new file mode 100644 index 000000000000..05d388f1836c --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/test_headers.py @@ -0,0 +1,86 @@ +# The MIT License (MIT) +# Copyright (c) 2014 Microsoft Corporation + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# 
copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import unittest +from unittest.mock import MagicMock + +import pytest + +import azure.cosmos.cosmos_client as cosmos_client +import test_config + +pytestmark = pytest.mark.cosmosEmulator + + +@pytest.mark.usefixtures("teardown") +class HeadersTest(unittest.TestCase): + configs = test_config._test_config + host = configs.host + masterKey = configs.masterKey + + dedicated_gateway_max_age_thousand = 1000 + dedicated_gateway_max_age_million = 1000000 + dedicated_gateway_max_age_zero = 0 + + @classmethod + def setUpClass(cls): + cls.client = cosmos_client.CosmosClient(cls.host, cls.masterKey) + cls.database = test_config._test_config.create_database_if_not_exist(cls.client) + cls.container = test_config._test_config.create_single_partition_collection_if_not_exist(cls.client) + + def side_effect_dedicated_gateway_max_age_thousand(self, *args, **kwargs): + # Extract request headers from args + assert args[2]["x-ms-dedicatedgateway-max-age"] == self.dedicated_gateway_max_age_thousand + raise StopIteration + + def side_effect_dedicated_gateway_max_age_million(self, *args, **kwargs): + # Extract request headers from args + assert args[2]["x-ms-dedicatedgateway-max-age"] == self.dedicated_gateway_max_age_million + raise StopIteration + + def test_max_integrated_cache_staleness(self): + cosmos_client_connection = self.container.client_connection + cosmos_client_connection._CosmosClientConnection__Get = MagicMock( + side_effect=self.side_effect_dedicated_gateway_max_age_thousand) + try: + self.container.read_item(item="id-1", partition_key="pk-1", + max_integrated_cache_staleness_in_ms=self.dedicated_gateway_max_age_thousand) + except StopIteration: + pass + + cosmos_client_connection._CosmosClientConnection__Get = MagicMock( + side_effect=self.side_effect_dedicated_gateway_max_age_million) + try: + self.container.read_item(item="id-1", partition_key="pk-1", + max_integrated_cache_staleness_in_ms=self.dedicated_gateway_max_age_million) + except StopIteration: + pass + + def test_zero_max_integrated_cache_staleness(self): + try: + self.container.read_item(item="id-1", partition_key="pk-1", + max_integrated_cache_staleness_in_ms=self.dedicated_gateway_max_age_zero) + except Exception as exception: + assert isinstance(exception, ValueError) + + +if __name__ == "__main__": + unittest.main() diff --git a/sdk/cosmos/azure-cosmos/test/test_integrated_cache.py b/sdk/cosmos/azure-cosmos/test/test_integrated_cache.py new file mode 100644 index 000000000000..c62b04590fdc --- /dev/null +++ b/sdk/cosmos/azure-cosmos/test/test_integrated_cache.py @@ -0,0 +1,90 @@ +# The MIT License (MIT) +# Copyright (c) 2022 Microsoft Corporation + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the 
"Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +# import unittest +# +# # This class tests the integrated cache, which only works against accounts with the dedicated gateway configured +# # This class tests a PREVIEW FEATURE +# +# pytestmark = pytest.mark.cosmosEmulator +# +# +# @pytest.mark.usefixtures("teardown") +# class TestIntegratedCache(unittest.TestCase): +# +# # This test only works if run manually, so we will leave it commented out. +# +# def test_RU_cost(self): +# if self.host.endswith(":8081/"): +# print("Skipping; this test only works for accounts with the dedicated gateway configured, or the " + +# "emulator running with the proper setup flags, which should run on port 8082.") +# return +# +# body = self.get_test_item() +# self.container.create_item(body) +# item_id = body['id'] +# +# # Initialize cache for item point read, and verify there is a cost to the read call +# self.container.read_item(item=item_id, partition_key=item_id, max_integrated_cache_staleness_in_ms=30000) +# self.assertEqual(self.client.client_connection.last_response_headers[headers.IntegratedCacheHit], 'False') +# self.assertTrue(float(self.client.client_connection.last_response_headers[headers.RequestCharge]) > 0) +# +# # Verify that cache is being hit for item read and that there's no RU consumption for this second read +# self.container.read_item(item=item_id, partition_key=item_id) +# self.assertEqual(self.client.client_connection.last_response_headers[headers.IntegratedCacheHit], 'True') +# self.assertTrue(float(self.client.client_connection.last_response_headers[headers.RequestCharge]) == 0) +# +# body = self.get_test_item() +# self.container.create_item(body) +# item_id = body["id"] +# query = 'SELECT * FROM c' +# +# # Initialize cache for single partition query read, and verify there is a cost to the query call +# # Need to iterate over query results in order to properly populate last response headers, so we cast to list +# list(self.container.query_items(query=query, +# partition_key=item_id, max_integrated_cache_staleness_in_ms=30000)) +# self.assertEqual(self.client.client_connection.last_response_headers[headers.IntegratedCacheHit], 'False') +# self.assertTrue(float(self.client.client_connection.last_response_headers[headers.RequestCharge]) > 0) +# +# # Verify that cache is being hit for item query and that there's no RU consumption for this second query +# list(self.container.query_items(query=query, partition_key=item_id)) +# self.assertEqual(self.client.client_connection.last_response_headers[headers.IntegratedCacheHit], 'True') +# 
self.assertTrue(float(self.client.client_connection.last_response_headers[headers.RequestCharge]) == 0) +# +# # Verify that reading all items does not have a cost anymore, since all items have been populated into cache +# self.container.read_all_items() +# self.assertEqual(self.client.client_connection.last_response_headers[headers.IntegratedCacheHit], 'True') +# self.assertTrue(float(self.client.client_connection.last_response_headers[headers.RequestCharge]) == 0) +# +# self.client.delete_database(test_config._test_config.TEST_DATABASE_ID_PLAIN) +# +# def get_test_item(self): +# test_item = { +# 'id': 'Item_' + str(uuid.uuid4()), +# 'test_object': True, +# 'lastName': 'Smith', +# 'attr1': random.randint(0, 10) +# } +# return test_item +# +# +# if __name__ == "__main__": +# unittest.main() diff --git a/sdk/cosmos/azure-cosmos/test/test_partition_split_query.py b/sdk/cosmos/azure-cosmos/test/test_partition_split_query.py index fb42fcdee10d..a00aadb36e4d 100644 --- a/sdk/cosmos/azure-cosmos/test/test_partition_split_query.py +++ b/sdk/cosmos/azure-cosmos/test/test_partition_split_query.py @@ -89,18 +89,13 @@ def run_queries(self, container, iterations): print("validation succeeded for all query results") def get_test_item(self): - async_item = { - 'id': 'Async_' + str(uuid.uuid4()), - 'address': { - 'state': 'WA', - 'city': 'Redmond', - 'street': '1 Microsoft Way' - }, + test_item = { + 'id': 'Item_' + str(uuid.uuid4()), 'test_object': True, 'lastName': 'Smith', 'attr1': random.randint(0, 10) } - return async_item + return test_item if __name__ == "__main__":
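A minimal usage sketch of the new provisional `max_integrated_cache_staleness_in_ms` keyword introduced in this change. The endpoint, key, database, container and item names below are placeholders, and it assumes an account with the integrated cache (dedicated gateway) enabled and a client using Session or Eventual consistency:

```python
from azure.cosmos import CosmosClient

# Placeholder dedicated-gateway endpoint and key (assumptions for illustration only).
client = CosmosClient("https://<account>.sqlx.cosmos.azure.com:443/", credential="<master-key>")
container = client.get_database_client("<database>").get_container_client("<container>")

# Point read: the integrated cache may serve data up to 2 seconds stale.
item = container.read_item(
    item="<item-id>",
    partition_key="<partition-key-value>",
    max_integrated_cache_staleness_in_ms=2000,
)

# Query: the same provisional keyword is accepted by query_items (and read_all_items).
results = list(container.query_items(
    query="SELECT * FROM c WHERE c.lastName = @name",
    parameters=[{"name": "@name", "value": "Smith"}],
    partition_key="<partition-key-value>",
    max_integrated_cache_staleness_in_ms=2000,
))

# The value must be a positive integer in milliseconds; zero or negative values raise ValueError.
```

The option is sent to the service as the `x-ms-dedicatedgateway-max-age` request header (the `DedicatedGatewayCacheStaleness` constant added to `http_constants.py`), and cache hits can be observed through the `x-ms-cosmos-cachehit` response header.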