diff --git a/aws_lambda_powertools/utilities/idempotency/base.py b/aws_lambda_powertools/utilities/idempotency/base.py index 41fbd232ad3..ddd054daa14 100644 --- a/aws_lambda_powertools/utilities/idempotency/base.py +++ b/aws_lambda_powertools/utilities/idempotency/base.py @@ -1,3 +1,4 @@ +import datetime import logging from copy import deepcopy from typing import Any, Callable, Dict, Optional, Tuple @@ -73,6 +74,7 @@ def __init__( self.data = deepcopy(_prepare_data(function_payload)) self.fn_args = function_args self.fn_kwargs = function_kwargs + self.config = config persistence_store.configure(config, self.function.__name__) self.persistence_store = persistence_store @@ -101,7 +103,9 @@ def _process_idempotency(self): try: # We call save_inprogress first as an optimization for the most common case where no idempotent record # already exists. If it succeeds, there's no need to call get_record. - self.persistence_store.save_inprogress(data=self.data) + self.persistence_store.save_inprogress( + data=self.data, remaining_time_in_millis=self._get_remaining_time_in_millis() + ) except IdempotencyKeyError: raise except IdempotencyItemAlreadyExistsError: @@ -113,6 +117,25 @@ def _process_idempotency(self): return self._get_function_response() + def _get_remaining_time_in_millis(self) -> Optional[int]: + """ + Tries to determine the remaining time available for the current lambda invocation. + + This only works if the idempotent handler decorator is used, since we need to access the lambda context. + However, this could be improved if we start storing the lambda context globally during the invocation. One + way to do this is to register the lambda context when configuring the IdempotencyConfig object. + + Returns + ------- + Optional[int] + Remaining time in millis, or None if the remaining time cannot be determined. + """ + + if self.config.lambda_context is not None: + return self.config.lambda_context.get_remaining_time_in_millis() + + return None + def _get_idempotency_record(self) -> DataRecord: """ Retrieve the idempotency record from the persistence layer. @@ -167,6 +190,13 @@ def _handle_for_status(self, data_record: DataRecord) -> Optional[Dict[Any, Any] raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.") if data_record.status == STATUS_CONSTANTS["INPROGRESS"]: + if data_record.in_progress_expiry_timestamp is not None and data_record.in_progress_expiry_timestamp < int( + datetime.datetime.now().timestamp() * 1000 + ): + raise IdempotencyInconsistentStateError( + "item should have been expired in-progress because it already time-outed." + ) + raise IdempotencyAlreadyInProgressError( f"Execution already in progress with idempotency key: " f"{self.persistence_store.event_key_jmespath}={data_record.idempotency_key}" diff --git a/aws_lambda_powertools/utilities/idempotency/config.py b/aws_lambda_powertools/utilities/idempotency/config.py index 06468cc74a7..e78f339fdc9 100644 --- a/aws_lambda_powertools/utilities/idempotency/config.py +++ b/aws_lambda_powertools/utilities/idempotency/config.py @@ -1,5 +1,7 @@ from typing import Dict, Optional +from aws_lambda_powertools.utilities.typing import LambdaContext + class IdempotencyConfig: def __init__( @@ -12,6 +14,7 @@ def __init__( use_local_cache: bool = False, local_cache_max_items: int = 256, hash_function: str = "md5", + lambda_context: Optional[LambdaContext] = None, ): """ Initialize the base persistence layer @@ -32,6 +35,8 @@ def __init__( Max number of items to store in local cache, by default 1024 hash_function: str, optional Function to use for calculating hashes, by default md5. + lambda_context: LambdaContext, optional + Lambda Context containing information about the invocation, function and execution environment. """ self.event_key_jmespath = event_key_jmespath self.payload_validation_jmespath = payload_validation_jmespath @@ -41,3 +46,8 @@ def __init__( self.use_local_cache = use_local_cache self.local_cache_max_items = local_cache_max_items self.hash_function = hash_function + self.lambda_context: Optional[LambdaContext] = lambda_context + + def register_lambda_context(self, lambda_context: LambdaContext): + """Captures the Lambda context, to calculate the remaining time before the invocation times out""" + self.lambda_context = lambda_context diff --git a/aws_lambda_powertools/utilities/idempotency/idempotency.py b/aws_lambda_powertools/utilities/idempotency/idempotency.py index 4a7d8e71e1d..646fd68558f 100644 --- a/aws_lambda_powertools/utilities/idempotency/idempotency.py +++ b/aws_lambda_powertools/utilities/idempotency/idempotency.py @@ -62,6 +62,8 @@ def idempotent( return handler(event, context) config = config or IdempotencyConfig() + config.register_lambda_context(context) + args = event, context idempotency_handler = IdempotencyHandler( function=handler, diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/base.py b/aws_lambda_powertools/utilities/idempotency/persistence/base.py index e6ffea10de8..a87980d7fe0 100644 --- a/aws_lambda_powertools/utilities/idempotency/persistence/base.py +++ b/aws_lambda_powertools/utilities/idempotency/persistence/base.py @@ -40,6 +40,7 @@ def __init__( idempotency_key, status: str = "", expiry_timestamp: Optional[int] = None, + in_progress_expiry_timestamp: Optional[int] = None, response_data: Optional[str] = "", payload_hash: Optional[str] = None, ) -> None: @@ -53,6 +54,8 @@ def __init__( status of the idempotent record expiry_timestamp: int, optional time before the record should expire, in seconds + in_progress_expiry_timestamp: int, optional + time before the record should expire while in the INPROGRESS state, in seconds payload_hash: str, optional hashed representation of payload response_data: str, optional @@ -61,6 +64,7 @@ def __init__( self.idempotency_key = idempotency_key self.payload_hash = payload_hash self.expiry_timestamp = expiry_timestamp + self.in_progress_expiry_timestamp = in_progress_expiry_timestamp self._status = status self.response_data = response_data @@ -328,7 +332,7 @@ def save_success(self, data: Dict[str, Any], result: dict) -> None: self._save_to_cache(data_record=data_record) - def save_inprogress(self, data: Dict[str, Any]) -> None: + def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None: """ Save record of function's execution being in progress @@ -336,6 +340,8 @@ def save_inprogress(self, data: Dict[str, Any]) -> None: ---------- data: Dict[str, Any] Payload + remaining_time_in_millis: Optional[int] + If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis """ data_record = DataRecord( idempotency_key=self._get_hashed_idempotency_key(data=data), @@ -344,6 +350,18 @@ def save_inprogress(self, data: Dict[str, Any]) -> None: payload_hash=self._get_hashed_payload(data=data), ) + if remaining_time_in_millis: + now = datetime.datetime.now() + period = datetime.timedelta(milliseconds=remaining_time_in_millis) + timestamp = (now + period).timestamp() + + data_record.in_progress_expiry_timestamp = int(timestamp * 1000) + else: + warnings.warn( + "Couldn't determine the remaining time left. " + "Did you call register_lambda_context on IdempotencyConfig?" + ) + logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}") if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key): diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py b/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py index 88955738ecc..90cbd853e8a 100644 --- a/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py +++ b/aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py @@ -12,7 +12,7 @@ IdempotencyItemAlreadyExistsError, IdempotencyItemNotFoundError, ) -from aws_lambda_powertools.utilities.idempotency.persistence.base import DataRecord +from aws_lambda_powertools.utilities.idempotency.persistence.base import STATUS_CONSTANTS, DataRecord logger = logging.getLogger(__name__) @@ -25,6 +25,7 @@ def __init__( static_pk_value: Optional[str] = None, sort_key_attr: Optional[str] = None, expiry_attr: str = "expiration", + in_progress_expiry_attr: str = "in_progress_expiration", status_attr: str = "status", data_attr: str = "data", validation_key_attr: str = "validation", @@ -47,6 +48,8 @@ def __init__( DynamoDB attribute name for the sort key expiry_attr: str, optional DynamoDB attribute name for expiry timestamp, by default "expiration" + in_progress_expiry_attr: str, optional + DynamoDB attribute name for in-progress expiry timestamp, by default "in_progress_expiration" status_attr: str, optional DynamoDB attribute name for status, by default "status" data_attr: str, optional @@ -85,6 +88,7 @@ def __init__( self.static_pk_value = static_pk_value self.sort_key_attr = sort_key_attr self.expiry_attr = expiry_attr + self.in_progress_expiry_attr = in_progress_expiry_attr self.status_attr = status_attr self.data_attr = data_attr self.validation_key_attr = validation_key_attr @@ -133,6 +137,7 @@ def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord: idempotency_key=item[self.key_attr], status=item[self.status_attr], expiry_timestamp=item[self.expiry_attr], + in_progress_expiry_timestamp=item.get(self.in_progress_expiry_attr), response_data=item.get(self.data_attr), payload_hash=item.get(self.validation_key_attr), ) @@ -153,17 +158,59 @@ def _put_record(self, data_record: DataRecord) -> None: self.status_attr: data_record.status, } + if data_record.in_progress_expiry_timestamp is not None: + item[self.in_progress_expiry_attr] = data_record.in_progress_expiry_timestamp + if self.payload_validation_enabled: item[self.validation_key_attr] = data_record.payload_hash now = datetime.datetime.now() try: logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}") + + # | LOCKED | RETRY if status = "INPROGRESS" | RETRY + # |----------------|-------------------------------------------------------|-------------> .... (time) + # | Lambda Idempotency Record + # | Timeout Timeout + # | (in_progress_expiry) (expiry) + + # Conditions to successfully save a record: + + # The idempotency key does not exist: + # - first time that this invocation key is used + # - previous invocation with the same key was deleted due to TTL + idempotency_key_not_exist = "attribute_not_exists(#id)" + + # The idempotency record exists but it's expired: + idempotency_expiry_expired = "#expiry < :now" + + # The status of the record is "INPROGRESS", there is an in-progress expiry timestamp, but it's expired + inprogress_expiry_expired = " AND ".join( + [ + "#status = :inprogress", + "attribute_exists(#in_progress_expiry)", + "#in_progress_expiry < :now_in_millis", + ] + ) + + condition_expression = ( + f"{idempotency_key_not_exist} OR {idempotency_expiry_expired} OR ({inprogress_expiry_expired})" + ) + self.table.put_item( Item=item, - ConditionExpression="attribute_not_exists(#id) OR #now < :now", - ExpressionAttributeNames={"#id": self.key_attr, "#now": self.expiry_attr}, - ExpressionAttributeValues={":now": int(now.timestamp())}, + ConditionExpression=condition_expression, + ExpressionAttributeNames={ + "#id": self.key_attr, + "#expiry": self.expiry_attr, + "#in_progress_expiry": self.in_progress_expiry_attr, + "#status": self.status_attr, + }, + ExpressionAttributeValues={ + ":now": int(now.timestamp()), + ":now_in_millis": int(now.timestamp() * 1000), + ":inprogress": STATUS_CONSTANTS["INPROGRESS"], + }, ) except self.table.meta.client.exceptions.ConditionalCheckFailedException: logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}") @@ -171,15 +218,15 @@ def _put_record(self, data_record: DataRecord) -> None: def _update_record(self, data_record: DataRecord): logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}") - update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status" + update_expression = "SET #response_data = :response_data, #expiry = :expiry, " "#status = :status" expression_attr_values = { ":expiry": data_record.expiry_timestamp, ":response_data": data_record.response_data, ":status": data_record.status, } expression_attr_names = { - "#response_data": self.data_attr, "#expiry": self.expiry_attr, + "#response_data": self.data_attr, "#status": self.status_attr, } diff --git a/docs/media/idempotent_sequence.png b/docs/media/idempotent_sequence.png deleted file mode 100644 index 92593184abb..00000000000 Binary files a/docs/media/idempotent_sequence.png and /dev/null differ diff --git a/docs/media/idempotent_sequence_exception.png b/docs/media/idempotent_sequence_exception.png deleted file mode 100644 index 4cf065993dd..00000000000 Binary files a/docs/media/idempotent_sequence_exception.png and /dev/null differ diff --git a/docs/utilities/idempotency.md b/docs/utilities/idempotency.md index a5ed14b9150..7ba61fd3062 100644 --- a/docs/utilities/idempotency.md +++ b/docs/utilities/idempotency.md @@ -22,6 +22,7 @@ times with the same parameters**. This makes idempotent operations safe to retry * Ensure Lambda handler returns the same result when called with the same payload * Select a subset of the event as the idempotency key using JMESPath expressions * Set a time window in which records with the same payload should be considered duplicates +* Expires in-progress executions if the Lambda function times out halfway through ## Getting started @@ -35,10 +36,10 @@ As of now, Amazon DynamoDB is the only supported persistent storage layer, so yo If you're not [changing the default configuration for the DynamoDB persistence layer](#dynamodbpersistencelayer), this is the expected default configuration: -Configuration | Value | Notes -------------------------------------------------- | ------------------------------------------------- | ------------------------------------------------- -Partition key | `id` | -TTL attribute name | `expiration` | This can only be configured after your table is created if you're using AWS Console +| Configuration | Value | Notes | +| ------------------ | ------------ | ----------------------------------------------------------------------------------- | +| Partition key | `id` | +| TTL attribute name | `expiration` | This can only be configured after your table is created if you're using AWS Console | ???+ tip "Tip: You can share a single state table for all functions" You can reuse the same DynamoDB table to store idempotency state. We add your `function_name` in addition to the idempotency key as a hash key. @@ -161,6 +162,7 @@ When using `idempotent_function`, you must tell us which keyword parameter in yo def lambda_handler(event, context): # `data` parameter must be called as a keyword argument to work dummy("hello", "universe", data="test") + config.register_lambda_context(context) # see Lambda timeouts section return processor.response() ``` @@ -197,7 +199,7 @@ When using `idempotent_function`, you must tell us which keyword parameter in yo === "dataclass_sample.py" - ```python hl_lines="3-4 23 32" + ```python hl_lines="3-4 23 33" from dataclasses import dataclass from aws_lambda_powertools.utilities.idempotency import ( @@ -224,17 +226,18 @@ When using `idempotent_function`, you must tell us which keyword parameter in yo def process_order(order: Order): return f"processed order {order.order_id}" + def lambda_handler(event, context): + config.register_lambda_context(context) # see Lambda timeouts section + order_item = OrderItem(sku="fake", description="sample") + order = Order(item=order_item, order_id="fake-id") - order_item = OrderItem(sku="fake", description="sample") - order = Order(item=order_item, order_id="fake-id") - - # `order` parameter must be called as a keyword argument to work - process_order(order=order) + # `order` parameter must be called as a keyword argument to work + process_order(order=order) ``` === "parser_pydantic_sample.py" - ```python hl_lines="1-2 22 31" + ```python hl_lines="1-2 22 32" from aws_lambda_powertools.utilities.idempotency import ( DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function) from aws_lambda_powertools.utilities.parser import BaseModel @@ -260,12 +263,13 @@ When using `idempotent_function`, you must tell us which keyword parameter in yo def process_order(order: Order): return f"processed order {order.order_id}" + def lambda_handler(event, context): + config.register_lambda_context(context) # see Lambda timeouts section + order_item = OrderItem(sku="fake", description="sample") + order = Order(item=order_item, order_id="fake-id") - order_item = OrderItem(sku="fake", description="sample") - order = Order(item=order_item, order_id="fake-id") - - # `order` parameter must be called as a keyword argument to work - process_order(order=order) + # `order` parameter must be called as a keyword argument to work + process_order(order=order) ``` ### Choosing a payload subset for idempotency @@ -354,19 +358,136 @@ Imagine the function executes successfully, but the client never receives the re This sequence diagram shows an example flow of what happens in the payment scenario: -![Idempotent sequence](../media/idempotent_sequence.png) +
+```mermaid +sequenceDiagram + participant Client + participant Lambda + participant Persistence Layer + alt initial request + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set (id=event.search(payload)) + activate Persistence Layer + Note right of Persistence Layer: Locked to prevent concurrent
invocations with
the same payload. + Lambda-->>Lambda: Call handler (event) + Lambda->>Persistence Layer: Update record with result + deactivate Persistence Layer + Persistence Layer-->>Persistence Layer: Update record with result + Lambda-->>Client: Response sent to client + else retried request + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set (id=event.search(payload)) + Persistence Layer-->>Lambda: Already exists in persistence layer. Return result + Lambda-->>Client: Response sent to client + end +``` +Idempotent sequence +
The client was successful in receiving the result after the retry. Since the Lambda handler was only executed once, our customer hasn't been charged twice. ???+ note Bear in mind that the entire Lambda handler is treated as a single idempotent operation. If your Lambda handler can cause multiple side effects, consider splitting it into separate functions. +#### Lambda timeouts + +???+ note + This is automatically done when you decorate your Lambda handler with [@idempotent decorator](#idempotent-decorator). + +To prevent against extended failed retries when a [Lambda function times out](https://aws.amazon.com/premiumsupport/knowledge-center/lambda-verify-invocation-timeouts/), Powertools calculates and includes the remaining invocation available time as part of the idempotency record. + +???+ example + If a second invocation happens **after** this timestamp, and the record is marked as `INPROGRESS`, we will execute the invocation again as if it was in the `EXPIRED` state (e.g, `expire_seconds` field elapsed). + + This means that if an invocation expired during execution, it will be quickly executed again on the next retry. + +???+ important + If you are only using the [@idempotent_function decorator](#idempotentfunction-decorator) to guard isolated parts of your code, you must use `register_lambda_context` available in the [idempotency config object](#customizing-the-default-behavior) to benefit from this protection. + +Here is an example on how you register the Lambda context in your handler: + +```python hl_lines="8 16" title="Registering the Lambda context" +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.idempotency import ( + IdempotencyConfig, idempotent_function +) + +persistence_layer = DynamoDBPersistenceLayer(table_name="...") + +config = IdempotencyConfig() + +@idempotent_function(data_keyword_argument="record", persistence_store=persistence_layer, config=config) +def record_handler(record: SQSRecord): + return {"message": record["body"]} + + +def lambda_handler(event, context): + config.register_lambda_context(context) + + return record_handler(event) +``` + +#### Lambda timeout sequence diagram + +This sequence diagram shows an example flow of what happens if a Lambda function times out: + +
+```mermaid +sequenceDiagram + participant Client + participant Lambda + participant Persistence Layer + alt initial request + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set (id=event.search(payload)) + activate Persistence Layer + Note right of Persistence Layer: Locked to prevent concurrent
invocations with
the same payload. + Note over Lambda: Time out + Lambda--xLambda: Call handler (event) + Lambda-->>Client: Return error response + deactivate Persistence Layer + else concurrent request before timeout + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set (id=event.search(payload)) + Persistence Layer-->>Lambda: Request already INPROGRESS + Lambda--xClient: Return IdempotencyAlreadyInProgressError + else retry after Lambda timeout + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set (id=event.search(payload)) + activate Persistence Layer + Note right of Persistence Layer: Locked to prevent concurrent
invocations with
the same payload. + Lambda-->>Lambda: Call handler (event) + Lambda->>Persistence Layer: Update record with result + deactivate Persistence Layer + Persistence Layer-->>Persistence Layer: Update record with result + Lambda-->>Client: Response sent to client + end +``` +Idempotent sequence for Lambda timeouts +
+ ### Handling exceptions If you are using the `idempotent` decorator on your Lambda handler, any unhandled exceptions that are raised during the code execution will cause **the record in the persistence layer to be deleted**. This means that new invocations will execute your code again despite having the same payload. If you don't want the record to be deleted, you need to catch exceptions within the idempotent function and return a successful response. -![Idempotent sequence exception](../media/idempotent_sequence_exception.png) +
+```mermaid +sequenceDiagram + participant Client + participant Lambda + participant Persistence Layer + Client->>Lambda: Invoke (event) + Lambda->>Persistence Layer: Get or set (id=event.search(payload)) + activate Persistence Layer + Note right of Persistence Layer: Locked during this time. Prevents multiple
Lambda invocations with the same
payload running concurrently. + Lambda--xLambda: Call handler (event).
Raises exception + Lambda->>Persistence Layer: Delete record (id=event.search(payload)) + deactivate Persistence Layer + Lambda-->>Client: Return error response +``` +Idempotent sequence exception +
If you are using `idempotent_function`, any unhandled exceptions that are raised _inside_ the decorated function will cause the record in the persistence layer to be deleted, and allow the function to be executed again if retried. @@ -402,13 +523,14 @@ def call_external_service(data: dict, **kwargs): This persistence layer is built-in, and you can either use an existing DynamoDB table or create a new one dedicated for idempotency state (recommended). -```python hl_lines="5-9" title="Customizing DynamoDBPersistenceLayer to suit your table structure" +```python hl_lines="5-10" title="Customizing DynamoDBPersistenceLayer to suit your table structure" from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer persistence_layer = DynamoDBPersistenceLayer( table_name="IdempotencyTable", key_attr="idempotency_key", expiry_attr="expires_at", + in_progress_expiry_attr="in_progress_expires_at", status_attr="current_status", data_attr="result_data", validation_key_attr="validation_key", @@ -417,16 +539,17 @@ persistence_layer = DynamoDBPersistenceLayer( When using DynamoDB as a persistence layer, you can alter the attribute names by passing these parameters when initializing the persistence layer: -Parameter | Required | Default | Description -------------------------------------------------- | ------------------------------------------------- | ------------------------------------------------- | --------------------------------------------------------------------------------- -**table_name** | :heavy_check_mark: | | Table name to store state -**key_attr** | | `id` | Partition key of the table. Hashed representation of the payload (unless **sort_key_attr** is specified) -**expiry_attr** | | `expiration` | Unix timestamp of when record expires -**status_attr** | | `status` | Stores status of the lambda execution during and after invocation -**data_attr** | | `data` | Stores results of successfully executed Lambda handlers -**validation_key_attr** | | `validation` | Hashed representation of the parts of the event used for validation -**sort_key_attr** | | | Sort key of the table (if table is configured with a sort key). -**static_pk_value** | | `idempotency#{LAMBDA_FUNCTION_NAME}` | Static value to use as the partition key. Only used when **sort_key_attr** is set. +| Parameter | Required | Default | Description | +| --------------------------- | ------------------ | ------------------------------------ | -------------------------------------------------------------------------------------------------------- | +| **table_name** | :heavy_check_mark: | | Table name to store state | +| **key_attr** | | `id` | Partition key of the table. Hashed representation of the payload (unless **sort_key_attr** is specified) | +| **expiry_attr** | | `expiration` | Unix timestamp of when record expires | +| **in_progress_expiry_attr** | | `in_progress_expiration` | Unix timestamp of when record expires while in progress (in case of the invocation times out) | +| **status_attr** | | `status` | Stores status of the lambda execution during and after invocation | +| **data_attr** | | `data` | Stores results of successfully executed Lambda handlers | +| **validation_key_attr** | | `validation` | Hashed representation of the parts of the event used for validation | +| **sort_key_attr** | | | Sort key of the table (if table is configured with a sort key). | +| **static_pk_value** | | `idempotency#{LAMBDA_FUNCTION_NAME}` | Static value to use as the partition key. Only used when **sort_key_attr** is set. | ## Advanced @@ -434,15 +557,15 @@ Parameter | Required | Default | Description Idempotent decorator can be further configured with **`IdempotencyConfig`** as seen in the previous example. These are the available options for further configuration -Parameter | Default | Description -------------------------------------------------- | ------------------------------------------------- | --------------------------------------------------------------------------------- -**event_key_jmespath** | `""` | JMESPath expression to extract the idempotency key from the event record using [built-in functions](/utilities/jmespath_functions) -**payload_validation_jmespath** | `""` | JMESPath expression to validate whether certain parameters have changed in the event while the event payload -**raise_on_no_idempotency_key** | `False` | Raise exception if no idempotency key was found in the request -**expires_after_seconds** | 3600 | The number of seconds to wait before a record is expired -**use_local_cache** | `False` | Whether to locally cache idempotency results -**local_cache_max_items** | 256 | Max number of items to store in local cache -**hash_function** | `md5` | Function to use for calculating hashes, as provided by [hashlib](https://docs.python.org/3/library/hashlib.html) in the standard library. +| Parameter | Default | Description | +| ------------------------------- | ------- | ----------------------------------------------------------------------------------------------------------------------------------------- | +| **event_key_jmespath** | `""` | JMESPath expression to extract the idempotency key from the event record using [built-in functions](/utilities/jmespath_functions) | +| **payload_validation_jmespath** | `""` | JMESPath expression to validate whether certain parameters have changed in the event while the event payload | +| **raise_on_no_idempotency_key** | `False` | Raise exception if no idempotency key was found in the request | +| **expires_after_seconds** | 3600 | The number of seconds to wait before a record is expired | +| **use_local_cache** | `False` | Whether to locally cache idempotency results | +| **local_cache_max_items** | 256 | Max number of items to store in local cache | +| **hash_function** | `md5` | Function to use for calculating hashes, as provided by [hashlib](https://docs.python.org/3/library/hashlib.html) in the standard library. | ### Handling concurrent executions with the same payload @@ -706,11 +829,11 @@ def handler(event, context): The example function above would cause data to be stored in DynamoDB like this: -| id | sort_key | expiration | status | data | -|------------------------------|----------------------------------|------------|-------------|-------------------------------------| -| idempotency#MyLambdaFunction | 1e956ef7da78d0cb890be999aecc0c9e | 1636549553 | COMPLETED | {"id": 12391, "message": "success"} | -| idempotency#MyLambdaFunction | 2b2cdb5f86361e97b4383087c1ffdf27 | 1636549571 | COMPLETED | {"id": 527212, "message": "success"}| -| idempotency#MyLambdaFunction | f091d2527ad1c78f05d54cc3f363be80 | 1636549585 | IN_PROGRESS | | +| id | sort_key | expiration | status | data | +| ---------------------------- | -------------------------------- | ---------- | ----------- | ------------------------------------ | +| idempotency#MyLambdaFunction | 1e956ef7da78d0cb890be999aecc0c9e | 1636549553 | COMPLETED | {"id": 12391, "message": "success"} | +| idempotency#MyLambdaFunction | 2b2cdb5f86361e97b4383087c1ffdf27 | 1636549571 | COMPLETED | {"id": 527212, "message": "success"} | +| idempotency#MyLambdaFunction | f091d2527ad1c78f05d54cc3f363be80 | 1636549585 | IN_PROGRESS | | ### Bring your own persistent store diff --git a/tests/functional/event_handler/test_api_gateway.py b/tests/functional/event_handler/test_api_gateway.py index f1fb6a1f942..4b1d7c1ee32 100644 --- a/tests/functional/event_handler/test_api_gateway.py +++ b/tests/functional/event_handler/test_api_gateway.py @@ -283,7 +283,7 @@ def test_base64_encode(): @app.get("/my/path", compress=True) def read_image() -> Response: - return Response(200, "image/png", read_media("idempotent_sequence_exception.png")) + return Response(200, "image/png", read_media("tracer_utility_showcase.png")) # WHEN calling the event handler result = app(mock_event, None) diff --git a/tests/functional/idempotency/conftest.py b/tests/functional/idempotency/conftest.py index 74deecef123..b5cf79727b1 100644 --- a/tests/functional/idempotency/conftest.py +++ b/tests/functional/idempotency/conftest.py @@ -1,6 +1,5 @@ import datetime import json -from collections import namedtuple from decimal import Decimal from unittest import mock @@ -32,14 +31,17 @@ def lambda_apigw_event(): @pytest.fixture def lambda_context(): - lambda_context = { - "function_name": "test-func", - "memory_limit_in_mb": 128, - "invoked_function_arn": "arn:aws:lambda:eu-west-1:809313241234:function:test-func", - "aws_request_id": "52fdfc07-2182-154f-163f-5f0f9a621d72", - } + class LambdaContext: + def __init__(self): + self.function_name = "test-func" + self.memory_limit_in_mb = 128 + self.invoked_function_arn = "arn:aws:lambda:eu-west-1:809313241234:function:test-func" + self.aws_request_id = "52fdfc07-2182-154f-163f-5f0f9a621d72" + + def get_remaining_time_in_millis(self) -> int: + return 1000 - return namedtuple("LambdaContext", lambda_context.keys())(*lambda_context.values()) + return LambdaContext() @pytest.fixture @@ -77,7 +79,11 @@ def default_jmespath(): @pytest.fixture def expected_params_update_item(serialized_lambda_response, hashed_idempotency_key): return { - "ExpressionAttributeNames": {"#expiry": "expiration", "#response_data": "data", "#status": "status"}, + "ExpressionAttributeNames": { + "#expiry": "expiration", + "#response_data": "data", + "#status": "status", + }, "ExpressionAttributeValues": { ":expiry": stub.ANY, ":response_data": serialized_lambda_response, @@ -108,19 +114,34 @@ def expected_params_update_item_with_validation( }, "Key": {"id": hashed_idempotency_key}, "TableName": "TEST_TABLE", - "UpdateExpression": "SET #response_data = :response_data, " - "#expiry = :expiry, #status = :status, " - "#validation_key = :validation_key", + "UpdateExpression": ( + "SET #response_data = :response_data, " + "#expiry = :expiry, #status = :status, " + "#validation_key = :validation_key" + ), } @pytest.fixture def expected_params_put_item(hashed_idempotency_key): return { - "ConditionExpression": "attribute_not_exists(#id) OR #now < :now", - "ExpressionAttributeNames": {"#id": "id", "#now": "expiration"}, - "ExpressionAttributeValues": {":now": stub.ANY}, - "Item": {"expiration": stub.ANY, "id": hashed_idempotency_key, "status": "INPROGRESS"}, + "ConditionExpression": ( + "attribute_not_exists(#id) OR #expiry < :now OR " + "(#status = :inprogress AND attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_in_millis)" + ), + "ExpressionAttributeNames": { + "#id": "id", + "#expiry": "expiration", + "#status": "status", + "#in_progress_expiry": "in_progress_expiration", + }, + "ExpressionAttributeValues": {":now": stub.ANY, ":now_in_millis": stub.ANY, ":inprogress": "INPROGRESS"}, + "Item": { + "expiration": stub.ANY, + "id": hashed_idempotency_key, + "status": "INPROGRESS", + "in_progress_expiration": stub.ANY, + }, "TableName": "TEST_TABLE", } @@ -128,11 +149,20 @@ def expected_params_put_item(hashed_idempotency_key): @pytest.fixture def expected_params_put_item_with_validation(hashed_idempotency_key, hashed_validation_key): return { - "ConditionExpression": "attribute_not_exists(#id) OR #now < :now", - "ExpressionAttributeNames": {"#id": "id", "#now": "expiration"}, - "ExpressionAttributeValues": {":now": stub.ANY}, + "ConditionExpression": ( + "attribute_not_exists(#id) OR #expiry < :now OR " + "(#status = :inprogress AND attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_in_millis)" + ), + "ExpressionAttributeNames": { + "#id": "id", + "#expiry": "expiration", + "#status": "status", + "#in_progress_expiry": "in_progress_expiration", + }, + "ExpressionAttributeValues": {":now": stub.ANY, ":now_in_millis": stub.ANY, ":inprogress": "INPROGRESS"}, "Item": { "expiration": stub.ANY, + "in_progress_expiration": stub.ANY, "id": hashed_idempotency_key, "status": "INPROGRESS", "validation": hashed_validation_key, @@ -176,6 +206,7 @@ def idempotency_config(config, request, default_jmespath): return IdempotencyConfig( event_key_jmespath=request.param.get("event_key_jmespath") or default_jmespath, use_local_cache=request.param["use_local_cache"], + payload_validation_jmespath=request.param.get("payload_validation_jmespath") or "", ) @@ -184,15 +215,6 @@ def config_without_jmespath(config, request): return IdempotencyConfig(use_local_cache=request.param["use_local_cache"]) -@pytest.fixture -def config_with_validation(config, request, default_jmespath): - return IdempotencyConfig( - event_key_jmespath=default_jmespath, - use_local_cache=request.param, - payload_validation_jmespath="requestContext", - ) - - @pytest.fixture def config_with_jmespath_options(config, request): class CustomFunctions(functions.Functions): diff --git a/tests/functional/idempotency/test_idempotency.py b/tests/functional/idempotency/test_idempotency.py index 40cee10e4f7..97a9166efa0 100644 --- a/tests/functional/idempotency/test_idempotency.py +++ b/tests/functional/idempotency/test_idempotency.py @@ -1,5 +1,7 @@ import copy +import datetime import sys +import warnings from hashlib import md5 from unittest.mock import MagicMock @@ -10,7 +12,7 @@ from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEventV2, event_source from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig -from aws_lambda_powertools.utilities.idempotency.base import _prepare_data +from aws_lambda_powertools.utilities.idempotency.base import MAX_RETRIES, IdempotencyHandler, _prepare_data from aws_lambda_powertools.utilities.idempotency.exceptions import ( IdempotencyAlreadyInProgressError, IdempotencyInconsistentStateError, @@ -208,9 +210,6 @@ def test_idempotent_lambda_first_execution( expected_params_update_item, expected_params_put_item, lambda_response, - serialized_lambda_response, - deserialized_lambda_response, - hashed_idempotency_key, lambda_context, ): """ @@ -295,7 +294,11 @@ def test_idempotent_lambda_first_execution_event_mutation( event = copy.deepcopy(lambda_apigw_event) stubber = stub.Stubber(persistence_store.table.meta.client) ddb_response = {} - stubber.add_response("put_item", ddb_response, build_idempotency_put_item_stub(data=event["body"])) + stubber.add_response( + "put_item", + ddb_response, + build_idempotency_put_item_stub(data=event["body"]), + ) stubber.add_response( "update_item", ddb_response, @@ -319,15 +322,13 @@ def test_idempotent_lambda_expired( idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_apigw_event, - timestamp_expired, lambda_response, expected_params_update_item, expected_params_put_item, - hashed_idempotency_key, lambda_context, ): """ - Test idempotent decorator when lambda is called with an event it succesfully handled already, but outside of the + Test idempotent decorator when lambda is called with an event it successfully handled already, but outside of the expiry window """ @@ -354,8 +355,6 @@ def test_idempotent_lambda_exception( idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_apigw_event, - timestamp_future, - lambda_response, hashed_idempotency_key, expected_params_put_item, lambda_context, @@ -389,10 +388,15 @@ def lambda_handler(event, context): @pytest.mark.parametrize( - "config_with_validation", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True + "idempotency_config", + [ + {"use_local_cache": False, "payload_validation_jmespath": "requestContext"}, + {"use_local_cache": True, "payload_validation_jmespath": "requestContext"}, + ], + indirect=True, ) def test_idempotent_lambda_already_completed_with_validation_bad_payload( - config_with_validation: IdempotencyConfig, + idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_apigw_event, timestamp_future, @@ -422,7 +426,7 @@ def test_idempotent_lambda_already_completed_with_validation_bad_payload( stubber.add_response("get_item", ddb_response, expected_params) stubber.activate() - @idempotent(config=config_with_validation, persistence_store=persistence_store) + @idempotent(config=idempotency_config, persistence_store=persistence_store) def lambda_handler(event, context): return lambda_response @@ -445,7 +449,7 @@ def test_idempotent_lambda_expired_during_request( lambda_context, ): """ - Test idempotent decorator when lambda is called with an event it succesfully handled already. Persistence store + Test idempotent decorator when lambda is called with an event it successfully handled already. Persistence store returns inconsistent/rapidly changing result between put_item and get_item calls. """ @@ -495,9 +499,6 @@ def test_idempotent_persistence_exception_deleting( idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_apigw_event, - timestamp_future, - lambda_response, - hashed_idempotency_key, expected_params_put_item, lambda_context, ): @@ -530,9 +531,6 @@ def test_idempotent_persistence_exception_updating( idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_apigw_event, - timestamp_future, - lambda_response, - hashed_idempotency_key, expected_params_put_item, lambda_context, ): @@ -565,10 +563,6 @@ def test_idempotent_persistence_exception_getting( idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_apigw_event, - timestamp_future, - lambda_response, - hashed_idempotency_key, - expected_params_put_item, lambda_context, ): """ @@ -594,17 +588,20 @@ def lambda_handler(event, context): @pytest.mark.parametrize( - "config_with_validation", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True + "idempotency_config", + [ + {"use_local_cache": False, "payload_validation_jmespath": "requestContext"}, + {"use_local_cache": True, "payload_validation_jmespath": "requestContext"}, + ], + indirect=True, ) def test_idempotent_lambda_first_execution_with_validation( - config_with_validation: IdempotencyConfig, + idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_apigw_event, expected_params_update_item_with_validation, expected_params_put_item_with_validation, lambda_response, - hashed_idempotency_key, - hashed_validation_key, lambda_context, ): """ @@ -617,7 +614,7 @@ def test_idempotent_lambda_first_execution_with_validation( stubber.add_response("update_item", ddb_response, expected_params_update_item_with_validation) stubber.activate() - @idempotent(config=config_with_validation, persistence_store=persistence_store) + @idempotent(config=idempotency_config, persistence_store=persistence_store) def lambda_handler(event, context): return lambda_response @@ -679,6 +676,118 @@ def lambda_handler(event, context): stubber.deactivate() +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) +def test_idempotent_lambda_expires_in_progress_before_expire( + idempotency_config: IdempotencyConfig, + persistence_store: DynamoDBPersistenceLayer, + lambda_apigw_event, + timestamp_future, + lambda_response, + hashed_idempotency_key, + lambda_context, +): + stubber = stub.Stubber(persistence_store.table.meta.client) + + stubber.add_client_error("put_item", "ConditionalCheckFailedException") + + now = datetime.datetime.now() + period = datetime.timedelta(seconds=5) + timestamp_expires_in_progress = int((now + period).timestamp() * 1000) + + expected_params_get_item = { + "TableName": TABLE_NAME, + "Key": {"id": hashed_idempotency_key}, + "ConsistentRead": True, + } + ddb_response_get_item = { + "Item": { + "id": {"S": hashed_idempotency_key}, + "expiration": {"N": timestamp_future}, + "in_progress_expiration": {"N": str(timestamp_expires_in_progress)}, + "data": {"S": '{"message": "test", "statusCode": 200'}, + "status": {"S": "INPROGRESS"}, + } + } + stubber.add_response("get_item", ddb_response_get_item, expected_params_get_item) + + stubber.activate() + + @idempotent(config=idempotency_config, persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + with pytest.raises(IdempotencyAlreadyInProgressError): + lambda_handler(lambda_apigw_event, lambda_context) + + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) +def test_idempotent_lambda_expires_in_progress_after_expire( + idempotency_config: IdempotencyConfig, + persistence_store: DynamoDBPersistenceLayer, + lambda_apigw_event, + timestamp_future, + lambda_response, + hashed_idempotency_key, + lambda_context, +): + stubber = stub.Stubber(persistence_store.table.meta.client) + + for _ in range(MAX_RETRIES + 1): + stubber.add_client_error("put_item", "ConditionalCheckFailedException") + + one_second_ago = datetime.datetime.now() - datetime.timedelta(seconds=1) + expected_params_get_item = { + "TableName": TABLE_NAME, + "Key": {"id": hashed_idempotency_key}, + "ConsistentRead": True, + } + ddb_response_get_item = { + "Item": { + "id": {"S": hashed_idempotency_key}, + "expiration": {"N": timestamp_future}, + "in_progress_expiration": {"N": str(int(one_second_ago.timestamp() * 1000))}, + "data": {"S": '{"message": "test", "statusCode": 200'}, + "status": {"S": "INPROGRESS"}, + } + } + stubber.add_response("get_item", ddb_response_get_item, expected_params_get_item) + + stubber.activate() + + @idempotent(config=idempotency_config, persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + with pytest.raises(IdempotencyInconsistentStateError): + lambda_handler(lambda_apigw_event, lambda_context) + + stubber.assert_no_pending_responses() + stubber.deactivate() + + +def test_idempotent_lambda_expires_in_progress_unavailable_remaining_time(): + mock_event = {"data": "value"} + idempotency_key = "test-func.function#" + hash_idempotency_key(mock_event) + persistence_layer = MockPersistenceLayer(expected_idempotency_key=idempotency_key) + expected_result = {"message": "Foo"} + + @idempotent_function(persistence_store=persistence_layer, data_keyword_argument="record") + def function(record): + return expected_result + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("default") + function(record=mock_event) + assert len(w) == 1 + assert ( + str(w[-1].message) + == "Couldn't determine the remaining time left. Did you call register_lambda_context on IdempotencyConfig?" + ) + + def test_data_record_invalid_status_value(): data_record = DataRecord("key", status="UNSUPPORTED_STATUS") with pytest.raises(IdempotencyInvalidStatusError) as e: @@ -710,6 +819,62 @@ def test_data_record_json_to_dict_mapping_when_response_data_none(): assert response_data is None +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": True}], indirect=True) +def test_handler_for_status_expired_data_record( + idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer +): + idempotency_handler = IdempotencyHandler( + function=lambda a: a, + function_payload={}, + config=idempotency_config, + persistence_store=persistence_store, + ) + data_record = DataRecord("key", status="EXPIRED", response_data=None) + + with pytest.raises(IdempotencyInconsistentStateError): + idempotency_handler._handle_for_status(data_record) + + +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": True}], indirect=True) +def test_handler_for_status_inprogress_data_record_inconsistent( + idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer +): + idempotency_handler = IdempotencyHandler( + function=lambda a: a, + function_payload={}, + config=idempotency_config, + persistence_store=persistence_store, + ) + + now = datetime.datetime.now() + period = datetime.timedelta(milliseconds=100) + timestamp = int((now - period).timestamp() * 1000) + data_record = DataRecord("key", in_progress_expiry_timestamp=timestamp, status="INPROGRESS", response_data=None) + + with pytest.raises(IdempotencyInconsistentStateError): + idempotency_handler._handle_for_status(data_record) + + +@pytest.mark.parametrize("idempotency_config", [{"use_local_cache": True}], indirect=True) +def test_handler_for_status_inprogress_data_record_consistent( + idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer +): + idempotency_handler = IdempotencyHandler( + function=lambda a: a, + function_payload={}, + config=idempotency_config, + persistence_store=persistence_store, + ) + + now = datetime.datetime.now() + period = datetime.timedelta(milliseconds=100) + timestamp = int((now + period).timestamp() * 1000) + data_record = DataRecord("key", in_progress_expiry_timestamp=timestamp, status="INPROGRESS", response_data=None) + + with pytest.raises(IdempotencyAlreadyInProgressError): + idempotency_handler._handle_for_status(data_record) + + @pytest.mark.parametrize("idempotency_config", [{"use_local_cache": True}], indirect=True) def test_in_progress_never_saved_to_cache( idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer @@ -800,7 +965,7 @@ def test_is_missing_idempotency_key(): "idempotency_config", [{"use_local_cache": False, "event_key_jmespath": "body"}], indirect=True ) def test_default_no_raise_on_missing_idempotency_key( - idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_context + idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer ): # GIVEN a persistence_store with use_local_cache = False and event_key_jmespath = "body" function_name = "foo" @@ -817,10 +982,14 @@ def test_default_no_raise_on_missing_idempotency_key( @pytest.mark.parametrize( - "idempotency_config", [{"use_local_cache": False, "event_key_jmespath": "[body, x]"}], indirect=True + "idempotency_config", + [ + {"use_local_cache": False, "event_key_jmespath": "[body, x]"}, + ], + indirect=True, ) def test_raise_on_no_idempotency_key( - idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_context + idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer ): # GIVEN a persistence_store with raise_on_no_idempotency_key and no idempotency key in the request persistence_store.configure(idempotency_config) @@ -842,12 +1011,12 @@ def test_raise_on_no_idempotency_key( { "use_local_cache": False, "event_key_jmespath": "[requestContext.authorizer.claims.sub, powertools_json(body).id]", - } + }, ], indirect=True, ) def test_jmespath_with_powertools_json( - idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_context + idempotency_config: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer ): # GIVEN an event_key_jmespath with powertools_json custom function persistence_store.configure(idempotency_config, "handler") @@ -868,7 +1037,7 @@ def test_jmespath_with_powertools_json( @pytest.mark.parametrize("config_with_jmespath_options", ["powertools_json(data).payload"], indirect=True) def test_custom_jmespath_function_overrides_builtin_functions( - config_with_jmespath_options: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer, lambda_context + config_with_jmespath_options: IdempotencyConfig, persistence_store: DynamoDBPersistenceLayer ): # GIVEN a persistence store with a custom jmespath_options # AND use a builtin powertools custom function diff --git a/tests/functional/idempotency/utils.py b/tests/functional/idempotency/utils.py index ca3862a2d8c..797b696aba4 100644 --- a/tests/functional/idempotency/utils.py +++ b/tests/functional/idempotency/utils.py @@ -12,14 +12,29 @@ def hash_idempotency_key(data: Any): def build_idempotency_put_item_stub( - data: Dict, function_name: str = "test-func", handler_name: str = "lambda_handler" + data: Dict, + function_name: str = "test-func", + handler_name: str = "lambda_handler", ) -> Dict: idempotency_key_hash = f"{function_name}.{handler_name}#{hash_idempotency_key(data)}" return { - "ConditionExpression": "attribute_not_exists(#id) OR #now < :now", - "ExpressionAttributeNames": {"#id": "id", "#now": "expiration"}, - "ExpressionAttributeValues": {":now": stub.ANY}, - "Item": {"expiration": stub.ANY, "id": idempotency_key_hash, "status": "INPROGRESS"}, + "ConditionExpression": ( + "attribute_not_exists(#id) OR #expiry < :now OR " + "(#status = :inprogress AND attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_in_millis)" + ), + "ExpressionAttributeNames": { + "#id": "id", + "#expiry": "expiration", + "#status": "status", + "#in_progress_expiry": "in_progress_expiration", + }, + "ExpressionAttributeValues": {":now": stub.ANY, ":now_in_millis": stub.ANY, ":inprogress": "INPROGRESS"}, + "Item": { + "expiration": stub.ANY, + "id": idempotency_key_hash, + "status": "INPROGRESS", + "in_progress_expiration": stub.ANY, + }, "TableName": "TEST_TABLE", } @@ -33,7 +48,11 @@ def build_idempotency_update_item_stub( idempotency_key_hash = f"{function_name}.{handler_name}#{hash_idempotency_key(data)}" serialized_lambda_response = json_serialize(handler_response) return { - "ExpressionAttributeNames": {"#expiry": "expiration", "#response_data": "data", "#status": "status"}, + "ExpressionAttributeNames": { + "#expiry": "expiration", + "#response_data": "data", + "#status": "status", + }, "ExpressionAttributeValues": { ":expiry": stub.ANY, ":response_data": serialized_lambda_response,