diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 66274aa624ad..fcec879fc406 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,6 +3,7 @@ ### 4.9.1 (Unreleased) #### Features Added +* Added change feed mode support in `query_items_change_feed`. See [PR 38105](https://github.com/Azure/azure-sdk-for-python/pull/38105) #### Breaking Changes diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py index 2362491898b8..5330089bdcba 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py @@ -61,7 +61,8 @@ 'supported_query_features': 'supportedQueryFeatures', 'query_version': 'queryVersion', 'priority': 'priorityLevel', - 'no_response': 'responsePayloadOnWriteDisabled' + 'no_response': 'responsePayloadOnWriteDisabled', + 'max_item_count': 'maxItemCount', } # Cosmos resource ID validation regex breakdown: @@ -170,6 +171,7 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches # set consistency level. check if set via options, this will override the default if options.get("consistencyLevel"): consistency_level = options["consistencyLevel"] + # TODO: move this line outside of if-else cause to remove the code duplication headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level elif default_client_consistency_level is not None: consistency_level = default_client_consistency_level diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py index ac6f00017af4..4f513a2d8662 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_state.py @@ -29,6 +29,8 @@ from abc import ABC, abstractmethod from enum import Enum from typing import Optional, Union, List, Any, Dict, Deque +import logging +from typing_extensions import Literal from azure.cosmos import http_constants from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromInternal, \ @@ -176,18 +178,18 @@ def apply_server_response_continuation(self, continuation: str, has_modified_res class ChangeFeedStateV2(ChangeFeedState): container_rid_property_name = "containerRid" - change_feed_mode_property_name = "mode" + mode_property_name = "mode" change_feed_start_from_property_name = "startFrom" continuation_property_name = "continuation" - # TODO: adding change feed mode def __init__( self, container_link: str, container_rid: str, feed_range: FeedRangeInternal, change_feed_start_from: ChangeFeedStartFromInternal, - continuation: Optional[FeedRangeCompositeContinuation] + continuation: Optional[FeedRangeCompositeContinuation], + mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] ) -> None: self._container_link = container_link @@ -208,6 +210,8 @@ def __init__( else: self._continuation = continuation + self._mode = "LatestVersion" if mode is None else mode + super(ChangeFeedStateV2, self).__init__(ChangeFeedStateVersion.V2) @property @@ -218,17 +222,14 @@ def to_dict(self) -> Dict[str, Any]: return { self.version_property_name: ChangeFeedStateVersion.V2.value, self.container_rid_property_name: self._container_rid, - self.change_feed_mode_property_name: "LatestVersion", + self.mode_property_name: self._mode, self.change_feed_start_from_property_name: self._change_feed_start_from.to_dict(), self.continuation_property_name: self._continuation.to_dict() if self._continuation is not None else None } - def populate_request_headers( + def set_start_from_request_headers( self, - routing_provider: SmartRoutingMapProvider, request_headers: Dict[str, Any]) -> None: - request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue - # When a merge happens, the child partition will contain documents ordered by LSN but the _ts/creation time # of the documents may not be sequential. # So when reading the changeFeed by LSN, it is possible to encounter documents with lower _ts. @@ -243,11 +244,10 @@ def populate_request_headers( self._continuation.current_token.feed_range) change_feed_start_from_feed_range_and_etag.populate_request_headers(request_headers) - # based on the feed range to find the overlapping partition key range id - over_lapping_ranges =\ - routing_provider.get_overlapping_ranges( - self._container_link, - [self._continuation.current_token.feed_range]) + def set_pk_range_id_request_headers( + self, + over_lapping_ranges, + request_headers: Dict[str, Any]) -> None: if len(over_lapping_ranges) > 1: raise self.get_feed_range_gone_error(over_lapping_ranges) @@ -260,30 +260,41 @@ def populate_request_headers( # the current token feed range spans less than single physical partition # for this case, need to set both the partition key range id and epk filter headers request_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_ranges[0]["id"] - request_headers[ - http_constants.HttpHeaders.StartEpkString] = self._continuation.current_token.feed_range.min - request_headers[ - http_constants.HttpHeaders.EndEpkString] = self._continuation.current_token.feed_range.max + request_headers[http_constants.HttpHeaders.StartEpkString] = self._continuation.current_token.feed_range.min + request_headers[http_constants.HttpHeaders.EndEpkString] = self._continuation.current_token.feed_range.max - async def populate_request_headers_async( + def set_mode_request_headers( self, - async_routing_provider: AsyncSmartRoutingMapProvider, request_headers: Dict[str, Any]) -> None: - request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue + if self._mode == "AllVersionsAndDeletes": + request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.FullFidelityFeedHeaderValue + request_headers[http_constants.HttpHeaders.ChangeFeedWireFormatVersion] = \ + http_constants.HttpHeaders.SeparateMetaWithCrts + else: + request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue - # When a merge happens, the child partition will contain documents ordered by LSN but the _ts/creation time - # of the documents may not be sequential. - # So when reading the changeFeed by LSN, it is possible to encounter documents with lower _ts. - # In order to guarantee we always get the documents after customer's point start time, - # we will need to always pass the start time in the header. - self._change_feed_start_from.populate_request_headers(request_headers) + def populate_request_headers( + self, + routing_provider: SmartRoutingMapProvider, + request_headers: Dict[str, Any]) -> None: + self.set_start_from_request_headers(request_headers) + + # based on the feed range to find the overlapping partition key range id + over_lapping_ranges = \ + routing_provider.get_overlapping_ranges( + self._container_link, + [self._continuation.current_token.feed_range]) + + self.set_pk_range_id_request_headers(over_lapping_ranges, request_headers) + + self.set_mode_request_headers(request_headers) - if self._continuation.current_token is not None and self._continuation.current_token.token is not None: - change_feed_start_from_feed_range_and_etag = \ - ChangeFeedStartFromETagAndFeedRange( - self._continuation.current_token.token, - self._continuation.current_token.feed_range) - change_feed_start_from_feed_range_and_etag.populate_request_headers(request_headers) + + async def populate_request_headers_async( + self, + async_routing_provider: AsyncSmartRoutingMapProvider, + request_headers: Dict[str, Any]) -> None: + self.set_start_from_request_headers(request_headers) # based on the feed range to find the overlapping partition key range id over_lapping_ranges = \ @@ -291,22 +302,9 @@ async def populate_request_headers_async( self._container_link, [self._continuation.current_token.feed_range]) - if len(over_lapping_ranges) > 1: - raise self.get_feed_range_gone_error(over_lapping_ranges) + self.set_pk_range_id_request_headers(over_lapping_ranges, request_headers) - overlapping_feed_range = Range.PartitionKeyRangeToRange(over_lapping_ranges[0]) - if overlapping_feed_range == self._continuation.current_token.feed_range: - # exactly mapping to one physical partition, only need to set the partitionKeyRangeId - request_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_ranges[0]["id"] - else: - # the current token feed range spans less than single physical partition - # for this case, need to set both the partition key range id and epk filter headers - request_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = \ - over_lapping_ranges[0]["id"] - request_headers[http_constants.HttpHeaders.StartEpkString] = \ - self._continuation.current_token.feed_range.min - request_headers[http_constants.HttpHeaders.EndEpkString] = \ - self._continuation.current_token.feed_range.max + self.set_mode_request_headers(request_headers) def populate_feed_options(self, feed_options: Dict[str, Any]) -> None: pass @@ -367,12 +365,20 @@ def from_continuation( if continuation_data is None: raise ValueError(f"Invalid continuation: [Missing {ChangeFeedStateV2.continuation_property_name}]") continuation = FeedRangeCompositeContinuation.from_json(continuation_data) - return ChangeFeedStateV2( + + mode = continuation_json.get(ChangeFeedStateV2.mode_property_name) + # All 'continuation_json' from ChangeFeedStateV2 must contain 'mode' property. For the 'continuation_json' + # from older ChangeFeedState versions won't even hit this point, since their version is not 'v2'. + if mode is None: + raise ValueError(f"Invalid continuation: [Missing {ChangeFeedStateV2.mode_property_name}]") + + return cls( container_link=container_link, container_rid=container_rid, feed_range=continuation.feed_range, change_feed_start_from=change_feed_start_from, - continuation=continuation) + continuation=continuation, + mode=mode) @classmethod def from_initial_state( @@ -394,6 +400,7 @@ def from_initial_state( raise ValueError("partitionKey is in the changeFeedStateContext, but missing partitionKeyFeedRange") else: # default to full range + logging.info("'feed_range' empty. Using full range by default.") feed_range = FeedRangeInternalEpk( Range( "", @@ -405,11 +412,12 @@ def from_initial_state( change_feed_start_from = ( ChangeFeedStartFromInternal.from_start_time(change_feed_state_context.get("startTime"))) - if feed_range is not None: - return cls( - container_link=container_link, - container_rid=collection_rid, - feed_range=feed_range, - change_feed_start_from=change_feed_start_from, - continuation=None) - raise ValueError("feed_range is empty") + mode = change_feed_state_context.get("mode") + + return cls( + container_link=container_link, + container_rid=collection_rid, + feed_range=feed_range, + change_feed_start_from=change_feed_start_from, + continuation=None, + mode=mode) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_utils.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_utils.py new file mode 100644 index 000000000000..15854405549e --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_change_feed/change_feed_utils.py @@ -0,0 +1,146 @@ +# The MIT License (MIT) +# Copyright (c) 2014 Microsoft Corporation + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Internal Helper functions in the Azure Cosmos database change_feed service. +""" + +import warnings +from datetime import datetime +from typing import Any, Dict, Tuple + +CHANGE_FEED_MODES = ["LatestVersion", "AllVersionsAndDeletes"] + +def add_args_to_kwargs( + args: Tuple[Any, ...], + kwargs: Dict[str, Any] + ) -> None: + """Add positional arguments(args) to keyword argument dictionary(kwargs). + Since 'query_items_change_feed' method only allows the following 4 positional arguments in the exact order + and types, if the order and types don't match, errors will be raised. + If the positional arguments are in the correct orders and types, the arguments will be added to keyword arguments. + + 4 positional arguments: + - str 'partition_key_range_id': [Deprecated] ChangeFeed requests can be executed against specific partition + key ranges. This is used to process the change feed in parallel across multiple consumers. + - bool 'is_start_from_beginning': [Deprecated] Get whether change feed should start from + beginning (true) or from current (false). By default, it's start from current (false). + - str 'continuation': e_tag value to be used as continuation for reading change feed. + - int 'max_item_count': Max number of items to be returned in the enumeration operation. + + :param args: Positional arguments. Arguments must be in the following order: + 1. partition_key_range_id + 2. is_start_from_beginning + 3. continuation + 4. max_item_count + :type args: Tuple[Any, ...] + :param kwargs: Keyword arguments + :type kwargs: dict[str, Any] + """ + if len(args) > 4: + raise TypeError(f"'query_items_change_feed()' takes 4 positional arguments but {len(args)} were given.") + + if len(args) > 0: + keys = [ + 'partition_key_range_id', + 'is_start_from_beginning', + 'continuation', + 'max_item_count', + ] + for i, value in enumerate(args): + key = keys[i] + + if key in kwargs: + raise TypeError(f"'query_items_change_feed()' got multiple values for argument '{key}'.") + + kwargs[key] = value + +def validate_kwargs( + kwargs: Dict[str, Any] + ) -> None: + """Validate keyword arguments(kwargs). + The values of keyword arguments must match the expect type and conditions. If the conditions do not match, + errors will be raised with the error messages and possible ways to correct the errors. + + :param kwargs: Keyword arguments to verify for query_items_change_feed API + :keyword mode: Must be one of the values in the Enum, 'ChangeFeedMode'. + If the value is 'ALL_VERSIONS_AND_DELETES', the following keywords must be in the right condition: + - 'partition_key_range_id': Cannot be used at any time + - 'is_start_from_beginning': Must be 'False' + - 'start_time': Must be "Now" + :paramtype mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] + :keyword partition_key_range_id: Deprecated Warning. + :paramtype partition_key_range_id: str + :keyword is_start_from_beginning: Deprecated Warning. Cannot be used with 'start_time'. + :paramtype is_start_from_beginning: bool + :keyword start_time: Must be in supported types. + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :type kwargs: dict[str, Any] + """ + # Filter items with value None + kwargs = {key: value for key, value in kwargs.items() if value is not None} + + # Validate the keyword arguments + if "mode" in kwargs: + mode = kwargs["mode"] + if mode not in CHANGE_FEED_MODES: + raise ValueError( + f"Invalid mode was used: '{kwargs['mode']}'." + f" Supported modes are {CHANGE_FEED_MODES}.") + + if mode == 'AllVersionsAndDeletes': + if "partition_key_range_id" in kwargs: + raise ValueError( + "'AllVersionsAndDeletes' mode is not supported if 'partition_key_range_id'" + " was used. Please use 'feed_range' instead.") + if "is_start_from_beginning" in kwargs and kwargs["is_start_from_beginning"] is not False: + raise ValueError( + "'AllVersionsAndDeletes' mode is only supported if 'is_start_from_beginning'" + " is 'False'. Please use 'is_start_from_beginning=False' or 'continuation' instead.") + if "start_time" in kwargs and kwargs["start_time"] != "Now": + raise ValueError( + "'AllVersionsAndDeletes' mode is only supported if 'start_time' is 'Now'." + " Please use 'start_time=\"Now\"' or 'continuation' instead.") + + if "partition_key_range_id" in kwargs: + warnings.warn( + "'partition_key_range_id' is deprecated. Please pass in 'feed_range' instead.", + DeprecationWarning + ) + + if "is_start_from_beginning" in kwargs: + warnings.warn( + "'is_start_from_beginning' is deprecated. Please pass in 'start_time' instead.", + DeprecationWarning + ) + + if not isinstance(kwargs["is_start_from_beginning"], bool): + raise TypeError( + f"'is_start_from_beginning' must be 'bool' type," + f" but given '{type(kwargs['is_start_from_beginning']).__name__}'.") + + if kwargs["is_start_from_beginning"] is True and "start_time" in kwargs: + raise ValueError("'is_start_from_beginning' and 'start_time' are exclusive, please only set one of them.") + + if "start_time" in kwargs: + if not isinstance(kwargs['start_time'], datetime): + if kwargs['start_time'].lower() not in ["now", "beginning"]: + raise ValueError( + f"'start_time' must be either 'Now' or 'Beginning', but given '{kwargs['start_time']}'.") diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index 03cdcbb8e214..e3aab396b338 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -1177,6 +1177,7 @@ def _QueryChangeFeed( def fetch_fn(options: Mapping[str, Any]) -> Tuple[List[Dict[str, Any]], CaseInsensitiveDict]: if collection_link in self.__container_properties_cache: + # TODO: This will make deep copy. Check if this has any performance impact new_options = dict(options) new_options["containerRID"] = self.__container_properties_cache[collection_link]["_rid"] options = new_options @@ -3007,6 +3008,7 @@ def __GetBodiesFromQueryResult(result: Dict[str, Any]) -> List[Dict[str, Any]]: # This case should be interpreted as an empty array. return [] + # TODO: copy is not needed if query was none, since the header was copied inside of "base.GetHeaders" initial_headers = self.default_headers.copy() # Copy to make sure that default_headers won't be changed. if query is None: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py index 5e7c808aded4..7584d1dea1a1 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py @@ -21,7 +21,6 @@ """Create, read, update and delete items in the Azure Cosmos DB SQL API service. """ -import warnings from datetime import datetime from typing import Any, Dict, Mapping, Optional, Sequence, Type, Union, List, Tuple, cast, overload, AsyncIterable from typing_extensions import Literal @@ -30,6 +29,7 @@ from azure.core.async_paging import AsyncItemPaged, AsyncList from azure.core.tracing.decorator import distributed_trace from azure.core.tracing.decorator_async import distributed_trace_async # type: ignore +from azure.cosmos._change_feed.change_feed_utils import validate_kwargs from ._cosmos_client_connection_async import CosmosClientConnection from ._scripts import ScriptsProxy @@ -508,6 +508,7 @@ def query_items_change_feed( start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, partition_key: PartitionKeyType, priority: Optional[Literal["High", "Low"]] = None, + mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed, in the order in which they were modified. @@ -518,13 +519,19 @@ def query_items_change_feed( Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] :keyword partition_key: The partition key that is used to define the scope (logical partition or a subset of a container) - :type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] + :paramtype partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. + :paramtype priority: Literal["High", "Low"] + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] :keyword Callable response_hook: A callable invoked with the response metadata. :returns: An AsyncItemPaged of items (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] @@ -539,6 +546,7 @@ def query_items_change_feed( max_item_count: Optional[int] = None, start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, priority: Optional[Literal["High", "Low"]] = None, + mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed, in the order in which they were modified. @@ -550,11 +558,18 @@ def query_items_change_feed( Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] - :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :keyword priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An AsyncItemPaged of items (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ @@ -571,12 +586,15 @@ def query_items_change_feed( ) -> AsyncItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed, in the order in which they were modified. - :keyword str continuation: The continuation token retrieved from previous response. + :keyword str continuation: The continuation token retrieved from previous response. It contains chang feed mode. + :type continuation: str :keyword int max_item_count: Max number of items to be returned in the enumeration operation. - :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each + :keyword priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An AsyncItemPaged of items (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ @@ -590,6 +608,7 @@ def query_items_change_feed( max_item_count: Optional[int] = None, start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, priority: Optional[Literal["High", "Low"]] = None, + mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed in the entire container, @@ -601,11 +620,18 @@ def query_items_change_feed( Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] - :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :keyword priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An AsyncItemPaged of items (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ @@ -619,82 +645,58 @@ def query_items_change_feed( # pylint: disable=unused-argument """Get a sorted list of items that were changed, in the order in which they were modified. - :keyword str continuation: The continuation token retrieved from previous response. + :keyword str continuation: The continuation token retrieved from previous response. It contains chang feed mode. :keyword Dict[str, Any] feed_range: The feed range that is used to define the scope. :keyword partition_key: The partition key that is used to define the scope (logical partition or a subset of a container) - :type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] + :paramtype partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :keyword start_time: The start time to start processing chang feed items. Beginning: Processing the change feed items from the beginning of the change feed. Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] - :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :keyword priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An AsyncItemPaged of items (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ # pylint: disable=too-many-statements - if kwargs.get("priority") is not None: - kwargs['priority'] = kwargs['priority'] + validate_kwargs(kwargs) feed_options = _build_options(kwargs) change_feed_state_context = {} - # Back compatibility with deprecation warnings for partition_key_range_id - if kwargs.get("partition_key_range_id") is not None: - warnings.warn( - "partition_key_range_id is deprecated. Please pass in feed_range instead.", - DeprecationWarning - ) - - change_feed_state_context["partitionKeyRangeId"] = kwargs.pop('partition_key_range_id') - - # Back compatibility with deprecation warnings for is_start_from_beginning - if kwargs.get("is_start_from_beginning") is not None: - warnings.warn( - "is_start_from_beginning is deprecated. Please pass in start_time instead.", - DeprecationWarning - ) - - if kwargs.get("start_time") is not None: - raise ValueError("is_start_from_beginning and start_time are exclusive, please only set one of them") - - is_start_from_beginning = kwargs.pop('is_start_from_beginning') - if is_start_from_beginning is True: - change_feed_state_context["startTime"] = "Beginning" - - # parse start_time - if kwargs.get("start_time") is not None: - start_time = kwargs.pop('start_time') - if not isinstance(start_time, (datetime, str)): - raise TypeError( - "'start_time' must be either a datetime object, or either the values 'Now' or 'Beginning'.") - change_feed_state_context["startTime"] = start_time - - # parse continuation token - if feed_options.get("continuation") is not None: - change_feed_state_context["continuation"] = feed_options.pop('continuation') - - if kwargs.get("max_item_count") is not None: - feed_options["maxItemCount"] = kwargs.pop('max_item_count') - - if kwargs.get("partition_key") is not None: - change_feed_state_context["partitionKey"] =\ - self._set_partition_key(cast(PartitionKeyType, kwargs.get("partition_key"))) - change_feed_state_context["partitionKeyFeedRange"] = \ - self._get_epk_range_for_partition_key(kwargs.pop('partition_key')) - - if kwargs.get("feed_range") is not None: + if "mode" in kwargs: + change_feed_state_context["mode"] = kwargs.pop("mode") + if "partition_key_range_id" in kwargs: + change_feed_state_context["partitionKeyRangeId"] = kwargs.pop("partition_key_range_id") + if "is_start_from_beginning" in kwargs and kwargs.pop('is_start_from_beginning') is True: + change_feed_state_context["startTime"] = "Beginning" + elif "start_time" in kwargs: + change_feed_state_context["startTime"] = kwargs.pop("start_time") + if "partition_key" in kwargs: + partition_key = kwargs.pop("partition_key") + change_feed_state_context["partitionKey"] = self._set_partition_key(cast(PartitionKeyType, partition_key)) + change_feed_state_context["partitionKeyFeedRange"] = self._get_epk_range_for_partition_key(partition_key) + if "feed_range" in kwargs: change_feed_state_context["feedRange"] = kwargs.pop('feed_range') + if "continuation" in feed_options: + change_feed_state_context["continuation"] = feed_options.pop("continuation") - feed_options["containerProperties"] = self._get_properties() feed_options["changeFeedStateContext"] = change_feed_state_context + feed_options["containerProperties"] = self._get_properties() - response_hook = kwargs.pop('response_hook', None) + response_hook = kwargs.pop("response_hook", None) if hasattr(response_hook, "clear"): response_hook.clear() diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py index 21b77de57ed8..49253b223f88 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py @@ -173,6 +173,7 @@ async def create_container( match_condition: Optional[MatchConditions] = None, analytical_storage_ttl: Optional[int] = None, vector_embedding_policy: Optional[Dict[str, Any]] = None, + change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, **kwargs: Any ) -> ContainerProxy: @@ -207,6 +208,8 @@ async def create_container( :keyword Dict[str, Any] vector_embedding_policy: The vector embedding policy for the container. Each vector embedding possesses a predetermined number of dimensions, is associated with an underlying data type, and is generated for a particular distance function. + :keyword Dict[str, Any] change_feed_policy: The change feed policy to apply 'retentionDuration' to + the container. :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. @@ -255,6 +258,8 @@ async def create_container( definition["computedProperties"] = computed_properties if vector_embedding_policy is not None: definition["vectorEmbeddingPolicy"] = vector_embedding_policy + if change_feed_policy is not None: + definition["changeFeedPolicy"] = change_feed_policy if full_text_policy is not None: definition["fullTextPolicy"] = full_text_policy @@ -291,6 +296,7 @@ async def create_container_if_not_exists( match_condition: Optional[MatchConditions] = None, analytical_storage_ttl: Optional[int] = None, vector_embedding_policy: Optional[Dict[str, Any]] = None, + change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, **kwargs: Any ) -> ContainerProxy: @@ -327,6 +333,8 @@ async def create_container_if_not_exists( :keyword Dict[str, Any] vector_embedding_policy: **provisional** The vector embedding policy for the container. Each vector embedding possesses a predetermined number of dimensions, is associated with an underlying data type, and is generated for a particular distance function. + :keyword Dict[str, Any] change_feed_policy: The change feed policy to apply 'retentionDuration' to + the container. :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. @@ -359,6 +367,7 @@ async def create_container_if_not_exists( session_token=session_token, initial_headers=initial_headers, vector_embedding_policy=vector_embedding_policy, + change_feed_policy=change_feed_policy, full_text_policy=full_text_policy, **kwargs ) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index 22baa8933a0b..b60d1dbddc4d 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -29,6 +29,7 @@ from azure.core import MatchConditions from azure.core.paging import ItemPaged from azure.core.tracing.decorator import distributed_trace +from azure.cosmos._change_feed.change_feed_utils import add_args_to_kwargs, validate_kwargs from ._base import ( build_options, @@ -328,6 +329,7 @@ def query_items_change_feed( start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, partition_key: PartitionKeyType, priority: Optional[Literal["High", "Low"]] = None, + mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed, in the order in which they were modified. @@ -338,14 +340,21 @@ def query_items_change_feed( Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] :keyword partition_key: The partition key that is used to define the scope (logical partition or a subset of a container) - :type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] + :paramtype partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] + :keyword response_hook: A callable invoked with the response metadata. + :type response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An Iterable of items (dicts). :rtype: Iterable[Dict[str, Any]] """ @@ -359,6 +368,7 @@ def query_items_change_feed( max_item_count: Optional[int] = None, start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, priority: Optional[Literal["High", "Low"]] = None, + mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: @@ -371,11 +381,18 @@ def query_items_change_feed( Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An Iterable of items (dicts). :rtype: Iterable[Dict[str, Any]] """ @@ -392,12 +409,15 @@ def query_items_change_feed( ) -> ItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed, in the order in which they were modified. - :keyword str continuation: The continuation token retrieved from previous response. + :keyword str continuation: The continuation token retrieved from previous response. It contains chang feed mode. + :paramtype continuation: str :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An Iterable of items (dicts). :rtype: Iterable[Dict[str, Any]] """ @@ -410,6 +430,7 @@ def query_items_change_feed( max_item_count: Optional[int] = None, start_time: Optional[Union[datetime, Literal["Now", "Beginning"]]] = None, priority: Optional[Literal["High", "Low"]] = None, + mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: """Get a sorted list of items that were changed in the entire container, @@ -421,11 +442,18 @@ def query_items_change_feed( Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :paramtype priority: Literal["High", "Low"] + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :returns: An Iterable of items (dicts). :rtype: Iterable[Dict[str, Any]] """ @@ -440,106 +468,68 @@ def query_items_change_feed( """Get a sorted list of items that were changed, in the order in which they were modified. - :keyword str continuation: The continuation token retrieved from previous response. + :keyword str continuation: The continuation token retrieved from previous response. It contains chang feed mode. :keyword Dict[str, Any] feed_range: The feed range that is used to define the scope. :keyword partition_key: The partition key that is used to define the scope (logical partition or a subset of a container) - :type partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] + :paramtype partition_key: Union[str, int, float, bool, Sequence[Union[str, int, float, bool, None]]] :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :keyword start_time: The start time to start processing chang feed items. Beginning: Processing the change feed items from the beginning of the change feed. Now: Processing change feed from the current time, so only events for all future changes will be retrieved. ~datetime.datetime: processing change feed from a point of time. Provided value will be converted to UTC. By default, it is start from current ("Now") - :type start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] + :paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]] :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. - :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword mode: The modes to query change feed. If `continuation` was passed, 'mode' argument will be ignored. + LATEST_VERSION: Query latest items from 'start_time' or 'continuation' token. + ALL_VERSIONS_AND_DELETES: Query all versions and deleted items from either `start_time='Now'` + or 'continuation' token. + :paramtype mode: Literal["LatestVersion", "AllVersionsAndDeletes"] + :keyword response_hook: A callable invoked with the response metadata. + :paramtype response_hook: Callable[[Mapping[str, Any], Mapping[str, Any]], None] :param Any args: args :returns: An Iterable of items (dicts). :rtype: Iterable[Dict[str, Any]] """ # pylint: disable=too-many-statements - if kwargs.get("priority") is not None: - kwargs['priority'] = kwargs['priority'] + add_args_to_kwargs(args, kwargs) + validate_kwargs(kwargs) feed_options = build_options(kwargs) change_feed_state_context = {} - # Back compatibility with deprecation warnings for partition_key_range_id - if (args and args[0] is not None) or kwargs.get("partition_key_range_id") is not None: - warnings.warn( - "partition_key_range_id is deprecated. Please pass in feed_range instead.", - DeprecationWarning - ) - - try: - change_feed_state_context["partitionKeyRangeId"] = kwargs.pop('partition_key_range_id') - except KeyError: - change_feed_state_context['partitionKeyRangeId'] = args[0] - - # Back compatibility with deprecation warnings for is_start_from_beginning - if (len(args) >= 2 and args[1] is not None) or kwargs.get("is_start_from_beginning") is not None: - warnings.warn( - "is_start_from_beginning is deprecated. Please pass in start_time instead.", - DeprecationWarning - ) - - if kwargs.get("start_time") is not None: - raise ValueError("is_start_from_beginning and start_time are exclusive, please only set one of them") - - try: - is_start_from_beginning = kwargs.pop('is_start_from_beginning') - except KeyError: - is_start_from_beginning = args[1] - - if is_start_from_beginning is True: - change_feed_state_context["startTime"] = "Beginning" - - # parse start_time - if kwargs.get("start_time") is not None: - - start_time = kwargs.pop('start_time') - if not isinstance(start_time, (datetime, str)): - raise TypeError( - "'start_time' must be either a datetime object, or either the values 'Now' or 'Beginning'.") - change_feed_state_context["startTime"] = start_time - - # parse continuation token - if len(args) >= 3 and args[2] is not None or feed_options.get("continuation") is not None: - try: - continuation = feed_options.pop('continuation') - except KeyError: - continuation = args[2] - change_feed_state_context["continuation"] = continuation - - if len(args) >= 4 and args[3] is not None or kwargs.get("max_item_count") is not None: - try: - feed_options["maxItemCount"] = kwargs.pop('max_item_count') - except KeyError: - feed_options["maxItemCount"] = args[3] - - if kwargs.get("partition_key") is not None: - change_feed_state_context["partitionKey"] =\ - self._set_partition_key(cast(PartitionKeyType, kwargs.get('partition_key'))) - change_feed_state_context["partitionKeyFeedRange"] =\ - self._get_epk_range_for_partition_key(kwargs.pop('partition_key')) - - if kwargs.get("feed_range") is not None: + if "mode" in kwargs: + change_feed_state_context["mode"] = kwargs.pop("mode") + if "partition_key_range_id" in kwargs: + change_feed_state_context["partitionKeyRangeId"] = kwargs.pop("partition_key_range_id") + if "is_start_from_beginning" in kwargs and kwargs.pop('is_start_from_beginning') is True: + change_feed_state_context["startTime"] = "Beginning" + elif "start_time" in kwargs: + change_feed_state_context["startTime"] = kwargs.pop("start_time") + if "partition_key" in kwargs: + partition_key = kwargs.pop("partition_key") + change_feed_state_context["partitionKey"] = self._set_partition_key(cast(PartitionKeyType, partition_key)) + change_feed_state_context["partitionKeyFeedRange"] = self._get_epk_range_for_partition_key(partition_key) + if "feed_range" in kwargs: change_feed_state_context["feedRange"] = kwargs.pop('feed_range') + if "continuation" in feed_options: + change_feed_state_context["continuation"] = feed_options.pop("continuation") container_properties = self._get_properties() feed_options["changeFeedStateContext"] = change_feed_state_context feed_options["containerRID"] = container_properties["_rid"] - response_hook = kwargs.pop('response_hook', None) + response_hook = kwargs.pop("response_hook", None) if hasattr(response_hook, "clear"): response_hook.clear() result = self.client_connection.QueryItemsChangeFeed( self.container_link, options=feed_options, response_hook=response_hook, **kwargs ) + if response_hook: response_hook(self.client_connection.last_response_headers, result) return result diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py index 9940d1a932f5..f7e5a7f9d715 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py @@ -174,6 +174,7 @@ def create_container( # pylint:disable=docstring-missing-param match_condition: Optional[MatchConditions] = None, analytical_storage_ttl: Optional[int] = None, vector_embedding_policy: Optional[Dict[str, Any]] = None, + change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, **kwargs: Any ) -> ContainerProxy: @@ -204,6 +205,8 @@ def create_container( # pylint:disable=docstring-missing-param :keyword Dict[str, Any] vector_embedding_policy: **provisional** The vector embedding policy for the container. Each vector embedding possesses a predetermined number of dimensions, is associated with an underlying data type, and is generated for a particular distance function. + :keyword Dict[str, Any] change_feed_policy: The change feed policy to apply 'retentionDuration' to + the container. :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. @@ -250,6 +253,8 @@ def create_container( # pylint:disable=docstring-missing-param definition["computedProperties"] = computed_properties if vector_embedding_policy is not None: definition["vectorEmbeddingPolicy"] = vector_embedding_policy + if change_feed_policy is not None: + definition["changeFeedPolicy"] = change_feed_policy if full_text_policy is not None: definition["fullTextPolicy"] = full_text_policy @@ -293,6 +298,7 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param match_condition: Optional[MatchConditions] = None, analytical_storage_ttl: Optional[int] = None, vector_embedding_policy: Optional[Dict[str, Any]] = None, + change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, **kwargs: Any ) -> ContainerProxy: @@ -325,6 +331,8 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param :keyword Dict[str, Any] vector_embedding_policy: The vector embedding policy for the container. Each vector embedding possesses a predetermined number of dimensions, is associated with an underlying data type, and is generated for a particular distance function. + :keyword Dict[str, Any] change_feed_policy: The change feed policy to apply 'retentionDuration' to + the container. :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. @@ -359,6 +367,7 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param session_token=session_token, initial_headers=initial_headers, vector_embedding_policy=vector_embedding_policy, + change_feed_policy=change_feed_policy, full_text_policy=full_text_policy, **kwargs ) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py b/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py index 4c42cc55b0e2..b0b6a87e7ca2 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py @@ -219,6 +219,11 @@ class HttpHeaders: # Change feed AIM = "A-IM" IncrementalFeedHeaderValue = "Incremental feed" + FullFidelityFeedHeaderValue = "Full-Fidelity Feed" + ChangeFeedWireFormatVersion = "x-ms-cosmos-changefeed-wire-format-version" + + # Change feed wire format version + SeparateMetaWithCrts = "2021-09-15" # For Using Multiple Write Locations AllowTentativeWrites = "x-ms-cosmos-allow-tentative-writes" diff --git a/sdk/cosmos/azure-cosmos/samples/change_feed_management.py b/sdk/cosmos/azure-cosmos/samples/change_feed_management.py index f5e854406764..e790fe636be6 100644 --- a/sdk/cosmos/azure-cosmos/samples/change_feed_management.py +++ b/sdk/cosmos/azure-cosmos/samples/change_feed_management.py @@ -73,6 +73,51 @@ def read_change_feed_with_start_time(container, start_time): print('\nFinished reading all the change feed from start time of {}\n'.format(time)) +def read_change_feed_with_continuation(container, continuation): + print('\nReading change feed from continuation\n') + + # You can read change feed from a specific continuation token. + # You must pass in a valid continuation token. + response = container.query_items_change_feed(continuation=continuation) + for doc in response: + print(doc) + + print('\nFinished reading all the change feed from continuation\n') + +def delete_all_items(container): + print('\nDeleting all item\n') + + for item in container.query_items(query='SELECT * FROM c', enable_cross_partition_query=True): + # Deleting the current item + container.delete_item(item, partition_key=item['address']['state']) + + print('Deleted all items') + +def read_change_feed_with_all_versions_and_delete_mode(container): + change_feed_mode = "AllVersionsAndDeletes" + print("\nReading change feed with 'AllVersionsAndDeletes' mode.\n") + + # You can read change feed with a specific change feed mode. + # You must pass in a valid change feed mode: ["LatestVersion", "AllVersionsAndDeletes"]. + response = container.query_items_change_feed(mode=change_feed_mode) + for doc in response: + print(doc) + + print("\nFinished reading all the change feed with 'AllVersionsAndDeletes' mode.\n") + +def read_change_feed_with_all_versions_and_delete_mode_from_continuation(container, continuation): + change_feed_mode = "AllVersionsAndDeletes" + print("\nReading change feed with 'AllVersionsAndDeletes' mode.\n") + + # You can read change feed with a specific change feed mode from a specific continuation token. + # You must pass in a valid change feed mode: ["LatestVersion", "AllVersionsAndDeletes"]. + # You must pass in a valid continuation token. + response = container.query_items_change_feed(mode=change_feed_mode, continuation=continuation) + for doc in response: + print(doc) + + print("\nFinished reading all the change feed with 'AllVersionsAndDeletes' mode.\n") + def run_sample(): client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY}) try: @@ -103,6 +148,17 @@ def run_sample(): read_change_feed(container) # Read Change Feed from timestamp read_change_feed_with_start_time(container, timestamp) + # Delete all items from container + delete_all_items(container) + # Read change feed with 'AllVersionsAndDeletes' mode + read_change_feed_with_all_versions_and_delete_mode(container) + continuation_token = container.client_connection.last_response_headers['etag'] + # Read change feed with 'AllVersionsAndDeletes' mode after create item + create_items(container, 10) + read_change_feed_with_all_versions_and_delete_mode_from_continuation(container,continuation_token) + # Read change feed with 'AllVersionsAndDeletes' mode after create/delete item + delete_all_items(container) + read_change_feed_with_all_versions_and_delete_mode_from_continuation(container,continuation_token) # cleanup database after sample try: diff --git a/sdk/cosmos/azure-cosmos/samples/change_feed_management_async.py b/sdk/cosmos/azure-cosmos/samples/change_feed_management_async.py index 9ea66d4ebcda..c8191d3e55f6 100644 --- a/sdk/cosmos/azure-cosmos/samples/change_feed_management_async.py +++ b/sdk/cosmos/azure-cosmos/samples/change_feed_management_async.py @@ -77,6 +77,50 @@ async def read_change_feed_with_start_time(container, start_time): print('\nFinished reading all the change feed from start time of {}\n'.format(time)) +async def read_change_feed_with_continuation(container, continuation): + print('\nReading change feed from continuation\n') + + # You can read change feed from a specific continuation token. + # You must pass in a valid continuation token. + response = container.query_items_change_feed(continuation=continuation) + async for doc in response: + print(doc) + + print('\nFinished reading all the change feed from continuation\n') + +async def delete_all_items(container): + print('\nDeleting all item\n') + + async for item in container.query_items(query='SELECT * FROM c'): + # Deleting the current item + await container.delete_item(item, partition_key=item['address']['state']) + + print('Deleted all items') + +async def read_change_feed_with_all_versions_and_delete_mode(container): + change_feed_mode = "AllVersionsAndDeletes" + print("\nReading change feed with 'AllVersionsAndDeletes' mode.\n") + + # You can read change feed with a specific change feed mode. + # You must pass in a valid change feed mode: ["LatestVersion", "AllVersionsAndDeletes"]. + response = container.query_items_change_feed(mode=change_feed_mode) + async for doc in response: + print(doc) + + print("\nFinished reading all the change feed with 'AllVersionsAndDeletes' mode.\n") + +async def read_change_feed_with_all_versions_and_delete_mode_from_continuation(container, continuation): + change_feed_mode = "AllVersionsAndDeletes" + print("\nReading change feed with 'AllVersionsAndDeletes' mode.\n") + + # You can read change feed with a specific change feed mode from a specific continuation token. + # You must pass in a valid change feed mode: ["LatestVersion", "AllVersionsAndDeletes"]. + # You must pass in a valid continuation token. + response = container.query_items_change_feed(mode=change_feed_mode, continuation=continuation) + async for doc in response: + print(doc) + + print("\nFinished reading all the change feed with 'AllVersionsAndDeletes' mode.\n") async def run_sample(): async with CosmosClient(HOST, MASTER_KEY) as client: @@ -108,6 +152,17 @@ async def run_sample(): await read_change_feed(container) # Read Change Feed from timestamp await read_change_feed_with_start_time(container, timestamp) + # Delete all items from container + await delete_all_items(container) + # Read change feed with 'AllVersionsAndDeletes' mode + await read_change_feed_with_all_versions_and_delete_mode(container) + continuation_token = container.client_connection.last_response_headers['etag'] + # Read change feed with 'AllVersionsAndDeletes' mode after create item + await create_items(container, 10) + await read_change_feed_with_all_versions_and_delete_mode_from_continuation(container, continuation_token) + # Read change feed with 'AllVersionsAndDeletes' mode after create/delete item + await delete_all_items(container) + await read_change_feed_with_all_versions_and_delete_mode_from_continuation(container, continuation_token) # cleanup database after sample try: diff --git a/sdk/cosmos/azure-cosmos/test/test_change_feed.py b/sdk/cosmos/azure-cosmos/test/test_change_feed.py index 456f8a7dbd5a..7c886e6dda5f 100644 --- a/sdk/cosmos/azure-cosmos/test/test_change_feed.py +++ b/sdk/cosmos/azure-cosmos/test/test_change_feed.py @@ -13,7 +13,16 @@ import azure.cosmos.exceptions as exceptions import test_config from azure.cosmos.partition_key import PartitionKey +from azure.cosmos._change_feed.change_feed_state import ChangeFeedStateV2 +ID = 'id' +CURRENT = 'current' +PREVIOUS = 'previous' +METADATA = 'metadata' +OPERATION_TYPE = 'operationType' +CREATE = 'create' +DELETE = 'delete' +E_TAG = 'etag' @pytest.fixture(scope="class") def setup(): @@ -26,9 +35,36 @@ def setup(): "tests.") test_client = cosmos_client.CosmosClient(config.host, config.masterKey), return { - "created_db": test_client[0].get_database_client(config.TEST_DATABASE_ID) + "created_db": test_client[0].get_database_client(config.TEST_DATABASE_ID), + "is_emulator": config.is_emulator } +def round_time(): + utc_now = datetime.now(timezone.utc) + return utc_now - timedelta(microseconds=utc_now.microsecond) + +def assert_change_feed(expected, actual): + if len(actual) == 0: + assert len(expected) == len(actual) + return + + #TODO: remove this if we can add flag to get 'previous' always + for item in actual: + if METADATA in item and item[METADATA][OPERATION_TYPE] == DELETE: + if ID in item[METADATA]: + item[PREVIOUS] = {ID: item[METADATA][ID]} + + # Sort actual by operation_type and id + actual = sorted(actual, key=lambda k: (k[METADATA][OPERATION_TYPE], k[CURRENT][ID]) if k[METADATA][OPERATION_TYPE] == CREATE else (k[METADATA][OPERATION_TYPE], k[PREVIOUS][ID])) + + for expected_change_feed, actual_change_feed in zip(expected, actual): + for expected_type, expected_data in expected_change_feed.items(): + assert expected_type in actual_change_feed + actual_data = actual_change_feed[expected_type] + for key, value in expected_data.items(): + assert key in actual_data + assert expected_data[key] == actual_data[key] + @pytest.mark.cosmosEmulator @pytest.mark.unittest @pytest.mark.usefixtures("setup") @@ -63,7 +99,7 @@ def test_query_change_feed_with_different_filter(self, change_feed_filter_param, filter_param = None # Read change feed from current should return an empty list - query_iterable = created_collection.query_items_change_feed(filter_param) + query_iterable = created_collection.query_items_change_feed(**filter_param) iter_list = list(query_iterable) assert len(iter_list) == 0 assert 'etag' in created_collection.client_connection.last_response_headers @@ -165,9 +201,6 @@ def test_query_change_feed_with_start_time(self, setup): PartitionKey(path="/pk")) batchSize = 50 - def round_time(): - utc_now = datetime.now(timezone.utc) - return utc_now - timedelta(microseconds=utc_now.microsecond) def create_random_items(container, batch_size): for _ in range(batch_size): # Generate a Random partition key @@ -218,14 +251,6 @@ def create_random_items(container, batch_size): # Should equal batch size assert totalCount == batchSize - # test an invalid value, Attribute error will be raised for passing non datetime object - invalid_time = "Invalid value" - try: - list(created_collection.query_items_change_feed(start_time=invalid_time)) - fail("Cannot format date on a non datetime object.") - except ValueError as e: #TODO: previously it is throwing AttributeError, now has changed into ValueError, is it breaking change? - assert "Invalid start_time 'Invalid value'" == e.args[0] - setup["created_db"].delete_container(created_collection.id) def test_query_change_feed_with_multi_partition(self, setup): @@ -252,5 +277,162 @@ def test_query_change_feed_with_multi_partition(self, setup): assert actual_ids == expected_ids + def test_query_change_feed_with_all_versions_and_deletes(self, setup): + partition_key = 'pk' + # 'retentionDuration' was required to enable `ALL_VERSIONS_AND_DELETES` for Emulator testing + change_feed_policy = {"retentionDuration": 10} if setup["is_emulator"] else None + created_collection = setup["created_db"].create_container("change_feed_test_" + str(uuid.uuid4()), + PartitionKey(path=f"/{partition_key}"), + change_feed_policy=change_feed_policy) + mode = 'AllVersionsAndDeletes' + + ## Test Change Feed with empty collection(Save the continuation token) + query_iterable = created_collection.query_items_change_feed( + mode=mode, + ) + expected_change_feeds = [] + actual_change_feeds = list(query_iterable) + cont_token1 = created_collection.client_connection.last_response_headers[E_TAG] + assert_change_feed(expected_change_feeds, actual_change_feeds) + + ## Test change_feed for created items from cont_token1 (Save the new continuation token) + new_documents = [{partition_key: f'pk{i}', ID: f'doc{i}'} for i in range(4)] + created_items = [] + for document in new_documents: + created_item = created_collection.create_item(body=document) + created_items.append(created_item) + query_iterable = created_collection.query_items_change_feed( + continuation=cont_token1, + mode=mode, + ) + + expected_change_feeds = [{CURRENT: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: CREATE}} for i in range(4)] + actual_change_feeds = list(query_iterable) + cont_token2 = created_collection.client_connection.last_response_headers['etag'] + assert_change_feed(expected_change_feeds, actual_change_feeds) + + ## Test change_feed for deleted items + for item in created_items: + created_collection.delete_item(item=item, partition_key=item['pk']) + query_iterable = created_collection.query_items_change_feed( + continuation=cont_token2, + mode=mode, + ) + + expected_change_feeds = [{CURRENT: {}, PREVIOUS: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: DELETE}} for i in range(4)] + actual_change_feeds = list(query_iterable) + assert_change_feed(expected_change_feeds, actual_change_feeds) + + ## Test change_feed for created/deleted items + query_iterable = created_collection.query_items_change_feed( + continuation=cont_token1, + mode = mode + ) + + expected_change_feeds = [{CURRENT: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: CREATE}} for i in range(4)]\ + + [{CURRENT: {}, PREVIOUS: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: DELETE}} for i in range(4)] + actual_change_feeds = list(query_iterable) + assert_change_feed(expected_change_feeds, actual_change_feeds) + + def test_query_change_feed_with_errors(self, setup): + created_collection = setup["created_db"].create_container("change_feed_test_" + str(uuid.uuid4()), + PartitionKey(path="/pk")) + mode = 'AllVersionsAndDeletes' + + # Error if invalid mode was used + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + mode="test_invalid_mode", + ) + assert str(e.value) == "Invalid mode was used: 'test_invalid_mode'. Supported modes are ['LatestVersion', 'AllVersionsAndDeletes']." + + # Error if partition_key_range_id was used with FULL_FIDELITY_FEED + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + partition_key_range_id="TestPartitionKeyRangeId", + mode=mode, + ) + assert str(e.value) == "'AllVersionsAndDeletes' mode is not supported if 'partition_key_range_id' was used. Please use 'feed_range' instead." + + # Error if is_start_from_beginning was in invalid type + with pytest.raises(TypeError) as e: + created_collection.query_items_change_feed( + is_start_from_beginning="Now", + ) + assert str(e.value) == "'is_start_from_beginning' must be 'bool' type, but given 'str'." + + # Error if is_start_from_beginning was used with FULL_FIDELITY_FEED + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + is_start_from_beginning=True, + mode=mode, + ) + assert str(e.value) == "'AllVersionsAndDeletes' mode is only supported if 'is_start_from_beginning' is 'False'. Please use 'is_start_from_beginning=False' or 'continuation' instead." + + # Error if 'is_start_from_beginning' was used with 'start_time' + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + is_start_from_beginning=True, + start_time="Now", + ) + assert str(e.value) == "'is_start_from_beginning' and 'start_time' are exclusive, please only set one of them." + + # Error if 'start_time' was invalid value + invalid_time = "Invalid value" + # TODO: previously it is throwing AttributeError, now has changed into ValueError, is it breaking change? + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed(start_time=invalid_time) + assert str(e.value) == "'start_time' must be either 'Now' or 'Beginning', but given 'Invalid value'." + + # Error if 'start_time' was invalid type + invalid_time = 1.2 + with pytest.raises(AttributeError) as e: + created_collection.query_items_change_feed(start_time=invalid_time) + assert str(e.value) == "'float' object has no attribute 'lower'" + + # Error if start_time was used with FULL_FIDELITY_FEED + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + start_time=round_time(), + mode=mode, + ) + assert str(e.value) == "'AllVersionsAndDeletes' mode is only supported if 'start_time' is 'Now'. Please use 'start_time=\"Now\"' or 'continuation' instead." + + # Error if too many positional arguments + with pytest.raises(TypeError) as e: + created_collection.query_items_change_feed( + "partition_key_range_id", + False, + "continuation", + 10, + "extra_argument", + ) + assert str(e.value) == "'query_items_change_feed()' takes 4 positional arguments but 5 were given." + + # Error if arguments are in both positional and keyword arguments list + with pytest.raises(TypeError) as e: + created_collection.query_items_change_feed( + "partition_key_range_id", + False, + "continuation", + 10, + continuation="123", + ) + assert str(e.value) == "'query_items_change_feed()' got multiple values for argument 'continuation'." + + # Error if continuation is missing 'mode' + with pytest.raises(ValueError) as e: + continuation_json = { + "containerRid": "", + "startFrom": {'Type': 'Now'}, + "continuation": {'Range': {'isMaxInclusive': False, 'isMinInclusive': True, 'max': 'FF', 'min': ''}, 'continuation': [{'range': {'isMaxInclusive': False, 'isMinInclusive': True, 'max': '1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF', 'min': ''}, 'token': '"1"'}, {'range': {'isMaxInclusive': False, 'isMinInclusive': True, 'max': 'FF', 'min': '1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF'}, 'token': '"1"'}], 'rid': 'ksMfAMrReEg=', 'v': 'v2'}, + } + ChangeFeedStateV2.from_continuation( + container_link="", + container_rid="", + continuation_json=continuation_json, + ) + assert str(e.value) == "Invalid continuation: [Missing mode]" + if __name__ == "__main__": unittest.main() diff --git a/sdk/cosmos/azure-cosmos/test/test_change_feed_async.py b/sdk/cosmos/azure-cosmos/test/test_change_feed_async.py index 3e138f64e230..f690ca36aa1e 100644 --- a/sdk/cosmos/azure-cosmos/test/test_change_feed_async.py +++ b/sdk/cosmos/azure-cosmos/test/test_change_feed_async.py @@ -15,6 +15,14 @@ from azure.cosmos.aio import CosmosClient from azure.cosmos.partition_key import PartitionKey +ID = 'id' +CURRENT = 'current' +PREVIOUS = 'previous' +METADATA = 'metadata' +OPERATION_TYPE = 'operationType' +CREATE = 'create' +DELETE = 'delete' +E_TAG = 'etag' @pytest_asyncio.fixture() async def setup(): @@ -27,12 +35,39 @@ async def setup(): test_client = CosmosClient(config.host, config.masterKey) created_db = await test_client.create_database_if_not_exists(config.TEST_DATABASE_ID) created_db_data = { - "created_db": created_db + "created_db": created_db, + "is_emulator": config.is_emulator } yield created_db_data await test_client.close() +def round_time(): + utc_now = datetime.now(timezone.utc) + return utc_now - timedelta(microseconds=utc_now.microsecond) + +async def assert_change_feed(expected, actual): + if len(actual) == 0: + assert len(expected) == len(actual) + return + + #TODO: remove this if we can add flag to get 'previous' always + for item in actual: + if METADATA in item and item[METADATA][OPERATION_TYPE] == DELETE: + if ID in item[METADATA]: + item[PREVIOUS] = {ID: item[METADATA][ID]} + + # Sort actual by operation_type and id + actual = sorted(actual, key=lambda k: (k[METADATA][OPERATION_TYPE], k[CURRENT][ID]) if k[METADATA][OPERATION_TYPE] == CREATE else (k[METADATA][OPERATION_TYPE], k[PREVIOUS][ID])) + + for expected_change_feed, actual_change_feed in zip(expected, actual): + for expected_type, expected_data in expected_change_feed.items(): + assert expected_type in actual_change_feed + actual_data = actual_change_feed[expected_type] + for key, value in expected_data.items(): + assert key in actual_data + assert expected_data[key] == actual_data[key] + @pytest.mark.cosmosEmulator @pytest.mark.asyncio @pytest.mark.usefixtures("setup") @@ -188,10 +223,6 @@ async def test_query_change_feed_with_start_time(self, setup): PartitionKey(path="/pk")) batchSize = 50 - def round_time(): - utc_now = datetime.now(timezone.utc) - return utc_now - timedelta(microseconds=utc_now.microsecond) - async def create_random_items(container, batch_size): for _ in range(batch_size): # Generate a Random partition key @@ -242,14 +273,6 @@ async def create_random_items(container, batch_size): # Should equal batch size assert totalCount == batchSize - # test an invalid value, Attribute error will be raised for passing non datetime object - invalid_time = "Invalid value" - try: - change_feed_iter = [i async for i in created_collection.query_items_change_feed(start_time=invalid_time)] - fail("Cannot format date on a non datetime object.") - except ValueError as e: - assert ("Invalid start_time 'Invalid value'" == e.args[0]) - await setup["created_db"].delete_container(created_collection.id) async def test_query_change_feed_with_multi_partition_async(self, setup): @@ -276,5 +299,129 @@ async def test_query_change_feed_with_multi_partition_async(self, setup): assert actual_ids == expected_ids + async def test_query_change_feed_with_all_versions_and_deletes(self, setup): + partition_key = 'pk' + # 'retentionDuration' was required to enable `ALL_VERSIONS_AND_DELETES` for Emulator testing + change_feed_policy = {"retentionDuration": 10} if setup["is_emulator"] else None + created_collection = await setup["created_db"].create_container("change_feed_test_" + str(uuid.uuid4()), + PartitionKey(path=f"/{partition_key}"), + change_feed_policy=change_feed_policy) + + mode = 'AllVersionsAndDeletes' + + ## Test Change Feed with empty collection(Save the continuation token) + query_iterable = created_collection.query_items_change_feed( + mode=mode, + ) + expected_change_feeds = [] + actual_change_feeds = [item async for item in query_iterable] + cont_token1 = created_collection.client_connection.last_response_headers[E_TAG] + await assert_change_feed(expected_change_feeds, actual_change_feeds) + + ## Test change_feed for created items from cont_token1 (Save the new continuation token) + new_documents = [{partition_key: f'pk{i}', ID: f'doc{i}'} for i in range(4)] + created_items = [] + for document in new_documents: + created_item = await created_collection.create_item(body=document) + created_items.append(created_item) + query_iterable = created_collection.query_items_change_feed( + continuation=cont_token1, + mode=mode, + ) + + expected_change_feeds = [{CURRENT: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: CREATE}} for i in range(4)] + actual_change_feeds = [item async for item in query_iterable] + cont_token2 = created_collection.client_connection.last_response_headers['etag'] + await assert_change_feed(expected_change_feeds, actual_change_feeds) + + ## Test change_feed for deleted items + for item in created_items: + await created_collection.delete_item(item=item, partition_key=item['pk']) + query_iterable = created_collection.query_items_change_feed( + continuation=cont_token2, + mode=mode, + ) + + expected_change_feeds = [{CURRENT: {}, PREVIOUS: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: DELETE}} for i in + range(4)] + actual_change_feeds = [item async for item in query_iterable] + await assert_change_feed(expected_change_feeds, actual_change_feeds) + + ## Test change_feed for created/deleted items + query_iterable = created_collection.query_items_change_feed( + continuation=cont_token1, + mode=mode + ) + + expected_change_feeds = [{CURRENT: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: CREATE}} for i in range(4)] \ + + [{CURRENT: {}, PREVIOUS: {ID: f'doc{i}'}, METADATA: {OPERATION_TYPE: DELETE}} for i in + range(4)] + actual_change_feeds = [item async for item in query_iterable] + await assert_change_feed(expected_change_feeds, actual_change_feeds) + + async def test_query_change_feed_with_errors(self, setup): + created_collection = await setup["created_db"].create_container("change_feed_test_" + str(uuid.uuid4()), + PartitionKey(path="/pk")) + mode = 'AllVersionsAndDeletes' + + # Error if invalid mode was used + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + mode="test_invalid_mode", + ) + assert str(e.value) == "Invalid mode was used: 'test_invalid_mode'. Supported modes are ['LatestVersion', 'AllVersionsAndDeletes']." + + # Error if partition_key_range_id was used with FULL_FIDELITY_FEED + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + partition_key_range_id="TestPartitionKeyRangeId", + mode=mode, + ) + assert str(e.value) == "'AllVersionsAndDeletes' mode is not supported if 'partition_key_range_id' was used. Please use 'feed_range' instead." + + # Error if is_start_from_beginning was in invalid type + with pytest.raises(TypeError) as e: + created_collection.query_items_change_feed( + is_start_from_beginning="Now", + ) + assert str(e.value) == "'is_start_from_beginning' must be 'bool' type, but given 'str'." + + # Error if is_start_from_beginning was used with FULL_FIDELITY_FEED + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + is_start_from_beginning="Now", + mode=mode, + ) + assert str(e.value) == "'AllVersionsAndDeletes' mode is only supported if 'is_start_from_beginning' is 'False'. Please use 'is_start_from_beginning=False' or 'continuation' instead." + + # Error if 'is_start_from_beginning' was used with 'start_time' + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + is_start_from_beginning=True, + start_time="Now", + ) + assert str(e.value) == "'is_start_from_beginning' and 'start_time' are exclusive, please only set one of them." + + # Error if 'start_time' was invalid value + invalid_time = "Invalid value" + # TODO: previously it is throwing AttributeError, now has changed into ValueError, is it breaking change? + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed(start_time=invalid_time) + assert str(e.value) == "'start_time' must be either 'Now' or 'Beginning', but given 'Invalid value'." + + # Error if 'start_time' was invalid type + invalid_time = 1.2 + with pytest.raises(AttributeError) as e: + created_collection.query_items_change_feed(start_time=invalid_time) + assert str(e.value) == "'float' object has no attribute 'lower'" + + # Error if start_time was used with FULL_FIDELITY_FEED + with pytest.raises(ValueError) as e: + created_collection.query_items_change_feed( + start_time=round_time(), + mode=mode, + ) + assert str(e.value) == "'AllVersionsAndDeletes' mode is only supported if 'start_time' is 'Now'. Please use 'start_time=\"Now\"' or 'continuation' instead." + if __name__ == '__main__': unittest.main()