class LRUDict(OrderedDict):
    """
    Cache implementation based on ordered dict with a maximum number of items. The least recently used item is
    evicted first. Currently used by idempotency utility.

    Both reads (``d[key]`` / ``d.get(key)``) and writes (``d[key] = value``) mark a key as most recently used.
    """

    def __init__(self, max_items=1024, *args, **kwargs):
        """
        Parameters
        ----------
        max_items: int, optional
            Maximum number of entries kept before the least recently used one is evicted, by default 1024
        args, kwargs
            Forwarded unchanged to OrderedDict to seed the initial content
        """
        # Must be set before the parent constructor runs: seeding initial content may go through __setitem__
        self.max_items = max_items
        super().__init__(*args, **kwargs)

    def __getitem__(self, key):
        """Return the value for key and mark the key as most recently used. Raises KeyError if absent."""
        value = super().__getitem__(key)
        self.move_to_end(key)
        return value

    def __setitem__(self, key, value):
        """Store value under key, mark it as most recently used and evict the oldest entry when over capacity."""
        if key in self:
            self.move_to_end(key)
        super().__setitem__(key, value)
        if len(self) > self.max_items:
            # Iteration order is oldest-first, so the first key is the least recently used one
            oldest = next(iter(self))
            del self[oldest]

    def get(self, key, *args, **kwargs):
        """
        Return the value for key (or the supplied default) and mark the key as most recently used when present.

        Presence is checked with ``key in self`` rather than the truthiness of the returned value, so that
        falsy cached values (0, "", {}) are still refreshed and a truthy default for a missing key does not
        trigger a KeyError in move_to_end.
        """
        item = super().get(key, *args, **kwargs)
        if key in self:
            self.move_to_end(key)
        return item
+ """ + + def default(self, obj): + if isinstance(obj, decimal.Decimal): + if obj.is_nan(): + return math.nan + return str(obj) + return super().default(obj) diff --git a/aws_lambda_powertools/utilities/idempotency/__init__.py b/aws_lambda_powertools/utilities/idempotency/__init__.py new file mode 100644 index 00000000000..98e2be15415 --- /dev/null +++ b/aws_lambda_powertools/utilities/idempotency/__init__.py @@ -0,0 +1,10 @@ +""" +Utility for adding idempotency to lambda functions +""" + +from aws_lambda_powertools.utilities.idempotency.persistence.base import BasePersistenceLayer +from aws_lambda_powertools.utilities.idempotency.persistence.dynamodb import DynamoDBPersistenceLayer + +from .idempotency import idempotent + +__all__ = ("DynamoDBPersistenceLayer", "BasePersistenceLayer", "idempotent") diff --git a/aws_lambda_powertools/utilities/idempotency/exceptions.py b/aws_lambda_powertools/utilities/idempotency/exceptions.py new file mode 100644 index 00000000000..1d7a8acab1f --- /dev/null +++ b/aws_lambda_powertools/utilities/idempotency/exceptions.py @@ -0,0 +1,45 @@ +""" +Idempotency errors +""" + + +class IdempotencyItemAlreadyExistsError(Exception): + """ + Item attempting to be inserted into persistence store already exists and is not expired + """ + + +class IdempotencyItemNotFoundError(Exception): + """ + Item does not exist in persistence store + """ + + +class IdempotencyAlreadyInProgressError(Exception): + """ + Execution with idempotency key is already in progress + """ + + +class IdempotencyInvalidStatusError(Exception): + """ + An invalid status was provided + """ + + +class IdempotencyValidationError(Exception): + """ + Payload does not match stored idempotency record + """ + + +class IdempotencyInconsistentStateError(Exception): + """ + State is inconsistent across multiple requests to persistence store + """ + + +class IdempotencyPersistenceLayerError(Exception): + """ + Unrecoverable error from the data store + """ diff --git 
@lambda_handler_decorator
def idempotent(
    handler: Callable[[Any, LambdaContext], Any],
    event: Dict[str, Any],
    context: LambdaContext,
    persistence_store: BasePersistenceLayer,
) -> Any:
    """
    Middleware that runs a Lambda handler idempotently

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: LambdaContext
        Lambda's Context
    persistence_store: BasePersistenceLayer
        Instance of BasePersistenceLayer to store data

    Returns
    -------
    Any
        Whatever IdempotencyHandler.handle() returns for this event

    Raises
    ------
    IdempotencyInconsistentStateError
        When the persistence store still reports inconsistent state after the last retry

    Examples
    --------
    **Processes Lambda's event in an idempotent manner**
        >>> from aws_lambda_powertools.utilities.idempotency import idempotent, DynamoDBPersistenceLayer
        >>>
        >>> persistence_store = DynamoDBPersistenceLayer(event_key_jmespath="body", table_name="idempotency_store")
        >>>
        >>> @idempotent(persistence_store=persistence_store)
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}
    """

    orchestrator = IdempotencyHandler(handler, event, context, persistence_store)

    # IdempotencyInconsistentStateError is rare but expected: persistent state may change in the small window
    # between the put and get requests. One initial attempt plus two retries usually gets past it.
    retries_left = 2
    while True:
        try:
            return orchestrator.handle()
        except IdempotencyInconsistentStateError:
            if not retries_left:
                # Retries exhausted: let the exception bubble up to the caller
                raise
            retries_left -= 1
+ self.persistence_store.save_inprogress(event=self.event) + except IdempotencyItemAlreadyExistsError: + # Now we know the item already exists, we can retrieve it + record = self._get_idempotency_record() + return self._handle_for_status(record) + + return self._call_lambda_handler() + + def _get_idempotency_record(self) -> DataRecord: + """ + Retrieve the idempotency record from the persistence layer. + + Raises + ---------- + IdempotencyInconsistentStateError + + """ + try: + event_record = self.persistence_store.get_record(self.event) + except IdempotencyItemNotFoundError: + # This code path will only be triggered if the record is removed between save_inprogress and get_record. + logger.debug( + "An existing idempotency record was deleted before we could retrieve it. Proceeding with lambda " + "handler" + ) + raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.") + + # Allow this exception to bubble up + except IdempotencyValidationError: + raise + + # Wrap remaining unhandled exceptions with IdempotencyPersistenceLayerError to ease exception handling for + # clients + except Exception as exc: + raise IdempotencyPersistenceLayerError("Failed to get record from idempotency store") from exc + + return event_record + + def _handle_for_status(self, event_record: DataRecord) -> Optional[Dict[Any, Any]]: + """ + Take appropriate action based on event_record's status + + Parameters + ---------- + event_record: DataRecord + + Returns + ------- + Optional[Dict[Any, Any] + Lambda response previously used for this idempotency key, if it has successfully executed already. + + Raises + ------ + AlreadyInProgressError + A lambda execution is already in progress + IdempotencyInconsistentStateError + The persistence store reports inconsistent states across different requests. Retryable. 
+ """ + # This code path will only be triggered if the record becomes expired between the save_inprogress call and here + if event_record.status == STATUS_CONSTANTS["EXPIRED"]: + raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.") + + if event_record.status == STATUS_CONSTANTS["INPROGRESS"]: + raise IdempotencyAlreadyInProgressError( + f"Execution already in progress with idempotency key: " + f"{self.persistence_store.event_key_jmespath}={event_record.idempotency_key}" + ) + + return event_record.response_json_as_dict() + + def _call_lambda_handler(self) -> Any: + """ + Call the lambda handler function and update the persistence store appropriate depending on the output + + Returns + ------- + Any + lambda handler response + + """ + try: + handler_response = self.lambda_handler(self.event, self.context) + except Exception as handler_exception: + # We need these nested blocks to preserve lambda handler exception in case the persistence store operation + # also raises an exception + try: + self.persistence_store.delete_record(event=self.event, exception=handler_exception) + except Exception as delete_exception: + raise IdempotencyPersistenceLayerError( + "Failed to delete record from idempotency store" + ) from delete_exception + raise + + else: + try: + self.persistence_store.save_success(event=self.event, result=handler_response) + except Exception as save_exception: + raise IdempotencyPersistenceLayerError( + "Failed to update record state to success in idempotency store" + ) from save_exception + + return handler_response diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/__init__.py b/aws_lambda_powertools/utilities/idempotency/persistence/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/aws_lambda_powertools/utilities/idempotency/persistence/base.py b/aws_lambda_powertools/utilities/idempotency/persistence/base.py new file mode 100644 index 00000000000..c9751b0ca12 
class DataRecord:
    """
    Plain container describing one idempotency record.
    """

    def __init__(
        self,
        idempotency_key,
        status: str = "",
        expiry_timestamp: int = None,
        response_data: str = "",
        payload_hash: str = None,
    ) -> None:
        """

        Parameters
        ----------
        idempotency_key: str
            hashed representation of the idempotent data
        status: str, optional
            raw status of the idempotent record, exposed (after validation) through the status property
        expiry_timestamp: int, optional
            unix timestamp, in seconds, after which the record counts as expired
        response_data: str, optional
            JSON string holding the response of a previous execution using the record
        payload_hash: str, optional
            hashed representation of payload
        """
        self.idempotency_key = idempotency_key
        self._status = status
        self.expiry_timestamp = expiry_timestamp
        self.response_data = response_data
        self.payload_hash = payload_hash

    @property
    def is_expired(self) -> bool:
        """
        Tell whether the record's expiry timestamp lies in the past

        Returns
        -------
        bool
            False when no expiry timestamp is set, otherwise whether the current time is past it
        """
        if not self.expiry_timestamp:
            return False
        current_epoch = int(datetime.datetime.now().timestamp())
        return bool(current_epoch > self.expiry_timestamp)

    @property
    def status(self) -> str:
        """
        Effective status of the record

        Returns
        -------
        str
            "EXPIRED" for an expired record, otherwise the stored status

        Raises
        ------
        IdempotencyInvalidStatusError
            The stored status is not one of the values in STATUS_CONSTANTS
        """
        # Expiry wins over whatever status was stored
        if self.is_expired:
            return STATUS_CONSTANTS["EXPIRED"]

        if self._status not in STATUS_CONSTANTS.values():
            raise IdempotencyInvalidStatusError(self._status)
        return self._status

    def response_json_as_dict(self) -> dict:
        """
        Deserialize the stored response

        Returns
        -------
        dict
            result of json.loads on response_data
        """
        return json.loads(self.response_data)
+ """ + self.event_key_jmespath = event_key_jmespath + if self.event_key_jmespath: + self.event_key_compiled_jmespath = jmespath.compile(event_key_jmespath) + self.expires_after_seconds = expires_after_seconds + self.use_local_cache = use_local_cache + if self.use_local_cache: + self._cache = LRUDict(max_items=local_cache_max_items) + self.payload_validation_enabled = False + if payload_validation_jmespath: + self.validation_key_jmespath = jmespath.compile(payload_validation_jmespath) + self.payload_validation_enabled = True + self.hash_function = getattr(hashlib, hash_function) + + def _get_hashed_idempotency_key(self, lambda_event: Dict[str, Any]) -> str: + """ + Extract data from lambda event using event key jmespath, and return a hashed representation + + Parameters + ---------- + lambda_event: Dict[str, Any] + Lambda event + + Returns + ------- + str + Hashed representation of the data extracted by the jmespath expression + + """ + data = lambda_event + if self.event_key_jmespath: + data = self.event_key_compiled_jmespath.search(lambda_event) + return self._generate_hash(data) + + def _get_hashed_payload(self, lambda_event: Dict[str, Any]) -> str: + """ + Extract data from lambda event using validation key jmespath, and return a hashed representation + + Parameters + ---------- + lambda_event: Dict[str, Any] + Lambda event + + Returns + ------- + str + Hashed representation of the data extracted by the jmespath expression + + """ + if not self.payload_validation_enabled: + return "" + data = self.validation_key_jmespath.search(lambda_event) + return self._generate_hash(data) + + def _generate_hash(self, data: Any) -> str: + """ + Generate a hash value from the provided data + + Parameters + ---------- + data: Any + The data to hash + + Returns + ------- + str + Hashed representation of the provided data + + """ + hashed_data = self.hash_function(json.dumps(data, cls=Encoder).encode()) + return hashed_data.hexdigest() + + def _validate_payload(self, 
lambda_event: Dict[str, Any], data_record: DataRecord) -> None: + """ + Validate that the hashed payload matches in the lambda event and stored data record + + Parameters + ---------- + lambda_event: Dict[str, Any] + Lambda event + data_record: DataRecord + DataRecord instance + + Raises + ______ + IdempotencyValidationError + Event payload doesn't match the stored record for the given idempotency key + + """ + if self.payload_validation_enabled: + lambda_payload_hash = self._get_hashed_payload(lambda_event) + if not data_record.payload_hash == lambda_payload_hash: + raise IdempotencyValidationError("Payload does not match stored record for this event key") + + def _get_expiry_timestamp(self) -> int: + """ + + Returns + ------- + int + unix timestamp of expiry date for idempotency record + + """ + now = datetime.datetime.now() + period = datetime.timedelta(seconds=self.expires_after_seconds) + return int((now + period).timestamp()) + + def _save_to_cache(self, data_record: DataRecord): + self._cache[data_record.idempotency_key] = data_record + + def _retrieve_from_cache(self, idempotency_key: str): + cached_record = self._cache.get(idempotency_key) + if cached_record: + if not cached_record.is_expired: + return cached_record + logger.debug(f"Removing expired local cache record for idempotency key: {idempotency_key}") + self._delete_from_cache(idempotency_key) + + def _delete_from_cache(self, idempotency_key: str): + del self._cache[idempotency_key] + + def save_success(self, event: Dict[str, Any], result: dict) -> None: + """ + Save record of function's execution completing succesfully + + Parameters + ---------- + event: Dict[str, Any] + Lambda event + result: dict + The response from lambda handler + """ + response_data = json.dumps(result, cls=Encoder) + + data_record = DataRecord( + idempotency_key=self._get_hashed_idempotency_key(event), + status=STATUS_CONSTANTS["COMPLETED"], + expiry_timestamp=self._get_expiry_timestamp(), + response_data=response_data, + 
payload_hash=self._get_hashed_payload(event), + ) + logger.debug( + f"Lambda successfully executed. Saving record to persistence store with " + f"idempotency key: {data_record.idempotency_key}" + ) + self._update_record(data_record=data_record) + + if self.use_local_cache: + self._save_to_cache(data_record) + + def save_inprogress(self, event: Dict[str, Any]) -> None: + """ + Save record of function's execution being in progress + + Parameters + ---------- + event: Dict[str, Any] + Lambda event + """ + data_record = DataRecord( + idempotency_key=self._get_hashed_idempotency_key(event), + status=STATUS_CONSTANTS["INPROGRESS"], + expiry_timestamp=self._get_expiry_timestamp(), + payload_hash=self._get_hashed_payload(event), + ) + + logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}") + + if self.use_local_cache: + cached_record = self._retrieve_from_cache(idempotency_key=data_record.idempotency_key) + if cached_record: + raise IdempotencyItemAlreadyExistsError + + self._put_record(data_record) + + # This has to come after _put_record. If _put_record call raises ItemAlreadyExists we shouldn't populate the + # cache with an "INPROGRESS" record as we don't know the status in the data store at this point. + if self.use_local_cache: + self._save_to_cache(data_record) + + def delete_record(self, event: Dict[str, Any], exception: Exception): + """ + Delete record from the persistence store + + Parameters + ---------- + event: Dict[str, Any] + Lambda event + exception + The exception raised by the lambda handler + """ + data_record = DataRecord(idempotency_key=self._get_hashed_idempotency_key(event)) + + logger.debug( + f"Lambda raised an exception ({type(exception).__name__}). 
Clearing in progress record in persistence " + f"store for idempotency key: {data_record.idempotency_key}" + ) + self._delete_record(data_record) + + if self.use_local_cache: + self._delete_from_cache(data_record.idempotency_key) + + def get_record(self, event: Dict[str, Any]) -> DataRecord: + """ + Calculate idempotency key for lambda_event, then retrieve item from persistence store using idempotency key + and return it as a DataRecord instance.and return it as a DataRecord instance. + + Parameters + ---------- + event: Dict[str, Any] + + Returns + ------- + DataRecord + DataRecord representation of existing record found in persistence store + + Raises + ------ + IdempotencyItemNotFoundError + Exception raised if no record exists in persistence store with the idempotency key + IdempotencyValidationError + Event payload doesn't match the stored record for the given idempotency key + """ + + idempotency_key = self._get_hashed_idempotency_key(event) + + if self.use_local_cache: + cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key) + if cached_record: + logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}") + self._validate_payload(event, cached_record) + return cached_record + + record = self._get_record(idempotency_key) + + if self.use_local_cache: + self._save_to_cache(data_record=record) + + self._validate_payload(event, record) + return record + + @abstractmethod + def _get_record(self, idempotency_key) -> DataRecord: + """ + Retrieve item from persistence store using idempotency key and return it as a DataRecord instance. 
class DynamoDBPersistenceLayer(BasePersistenceLayer):
    """
    Idempotency persistence layer that keeps its records in a DynamoDB table.
    """

    def __init__(
        self,
        table_name: str,
        key_attr: str = "id",
        expiry_attr: str = "expiration",
        status_attr: str = "status",
        data_attr: str = "data",
        validation_key_attr: str = "validation",
        boto_config: Optional[Config] = None,
        boto3_session: Optional[boto3.session.Session] = None,
        *args,
        **kwargs,
    ):
        """
        Initialize the DynamoDB client

        Parameters
        ----------
        table_name: str
            Name of the table to use for storing execution records
        key_attr: str, optional
            DynamoDB attribute name for key, by default "id"
        expiry_attr: str, optional
            DynamoDB attribute name for expiry timestamp, by default "expiration"
        status_attr: str, optional
            DynamoDB attribute name for status, by default "status"
        data_attr: str, optional
            DynamoDB attribute name for response data, by default "data"
        validation_key_attr: str, optional
            DynamoDB attribute name for the hashed validation payload, by default "validation"
        boto_config: botocore.config.Config, optional
            Botocore configuration to pass during client initialization
        boto3_session : boto3.session.Session, optional
            Boto3 session to use for AWS API communication

        args
        kwargs
            Forwarded unchanged to BasePersistenceLayer

        Examples
        --------
        **Create a DynamoDB persistence layer with custom settings**
            >>> from aws_lambda_powertools.utilities.idempotency import idempotent, DynamoDBPersistenceLayer
            >>>
            >>> persistence_store = DynamoDBPersistenceLayer(event_key_jmespath="body", table_name="idempotency_store")
            >>>
            >>> @idempotent(persistence_store=persistence_store)
            >>> def handler(event, context):
            >>>     return {"StatusCode": 200}
        """

        boto_config = boto_config or Config()
        session = boto3_session or boto3.session.Session()
        self._ddb_resource = session.resource("dynamodb", config=boto_config)
        self.table_name = table_name
        self.table = self._ddb_resource.Table(self.table_name)
        self.key_attr = key_attr
        self.expiry_attr = expiry_attr
        self.status_attr = status_attr
        self.data_attr = data_attr
        self.validation_key_attr = validation_key_attr
        super().__init__(*args, **kwargs)

    def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
        """
        Translate raw item records from DynamoDB to DataRecord

        Parameters
        ----------
        item: Dict[str, Union[str, int]]
            Item format from dynamodb response

        Returns
        -------
        DataRecord
            representation of item

        """
        return DataRecord(
            idempotency_key=item[self.key_attr],
            status=item[self.status_attr],
            expiry_timestamp=item[self.expiry_attr],
            # Response data and validation hash are optional attributes of the stored item
            response_data=item.get(self.data_attr),
            payload_hash=item.get(self.validation_key_attr),
        )

    def _get_record(self, idempotency_key) -> DataRecord:
        """
        Read the item for idempotency_key with a consistent read and return it as a DataRecord

        Raises
        ------
        IdempotencyItemNotFoundError
            The get_item response carries no "Item"
        """
        response = self.table.get_item(Key={self.key_attr: idempotency_key}, ConsistentRead=True)

        try:
            item = response["Item"]
        except KeyError:
            raise IdempotencyItemNotFoundError
        return self._item_to_data_record(item)

    def _put_record(self, data_record: DataRecord) -> None:
        """
        Write data_record unless a non-expired item with the same key already exists

        Raises
        ------
        IdempotencyItemAlreadyExistsError
            The conditional put was rejected by DynamoDB
        """
        item = {
            self.key_attr: data_record.idempotency_key,
            self.expiry_attr: data_record.expiry_timestamp,
            self.status_attr: data_record.status,
        }

        if self.payload_validation_enabled:
            item[self.validation_key_attr] = data_record.payload_hash

        now = datetime.datetime.now()
        try:
            logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}")
            self.table.put_item(
                Item=item,
                # Attribute names go through placeholders, as in _update_record, so that user-chosen names
                # that collide with DynamoDB reserved words (e.g. "ttl") do not break the expression
                ConditionExpression="attribute_not_exists(#id) OR #expiry < :now",
                ExpressionAttributeNames={"#id": self.key_attr, "#expiry": self.expiry_attr},
                ExpressionAttributeValues={":now": int(now.timestamp())},
            )
        except self._ddb_resource.meta.client.exceptions.ConditionalCheckFailedException:
            logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
            raise IdempotencyItemAlreadyExistsError

    def _update_record(self, data_record: DataRecord):
        """
        Update response data, expiry and status (plus the validation hash when payload validation is enabled)
        of the item identified by data_record's idempotency key
        """
        logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
        update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status"
        expression_attr_values = {
            ":expiry": data_record.expiry_timestamp,
            ":response_data": data_record.response_data,
            ":status": data_record.status,
        }
        expression_attr_names = {
            "#response_data": self.data_attr,
            "#expiry": self.expiry_attr,
            "#status": self.status_attr,
        }

        if self.payload_validation_enabled:
            update_expression += ", #validation_key = :validation_key"
            expression_attr_values[":validation_key"] = data_record.payload_hash
            expression_attr_names["#validation_key"] = self.validation_key_attr

        kwargs = {
            "Key": {self.key_attr: data_record.idempotency_key},
            "UpdateExpression": update_expression,
            "ExpressionAttributeValues": expression_attr_values,
            "ExpressionAttributeNames": expression_attr_names,
        }

        self.table.update_item(**kwargs)

    def _delete_record(self, data_record: DataRecord) -> None:
        """
        Delete the item identified by data_record's idempotency key
        """
        logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}")
        self.table.delete_item(Key={self.key_attr: data_record.idempotency_key})
+Lambda-->Lambda:Run Lambda handler (event) +Lambda->"Persistence layer":Update record with Lambda handler result¹ +deactivate "Persistence layer" +"Persistence layer"-->"Persistence layer": Update record with result¹ +Client x<--Lambda:Response not received by client +end + +group retried request + +Client->Lambda: Invoke (event) +Lambda->"Persistence layer":Get or set (id=event.search(payload)) +Lambda<--"Persistence layer":Already exists in persistence layer. Return result¹ +Client<--Lambda:Response sent to client +end + +@enduml diff --git a/docs/diagram_src/idempotent_sequence_exception.puml b/docs/diagram_src/idempotent_sequence_exception.puml new file mode 100644 index 00000000000..7470cdd1c4e --- /dev/null +++ b/docs/diagram_src/idempotent_sequence_exception.puml @@ -0,0 +1,18 @@ +@startuml +'https://plantuml.com/sequence-diagram + +participant Client +participant Lambda +participant "Persistence layer" + + +Client->Lambda:Invoke (event) +Lambda->"Persistence layer":Get or set (id=event.search(payload)) +activate "Persistence layer" +note right of "Persistence layer":Locked during this time. Prevents \nmultiple Lambda invocations with the \nsame payload running concurrently. +Lambda-->x Lambda:Run Lambda handler (event). Raises Exception. 
+Lambda->"Persistence layer":Delete record (id=event.search(payload)) +deactivate "Persistence layer" +Client<--Lambda:Return error response + +@enduml diff --git a/docs/index.md b/docs/index.md index 2415a668ec1..cddf3182695 100644 --- a/docs/index.md +++ b/docs/index.md @@ -152,6 +152,7 @@ aws serverlessrepo list-application-versions \ | [Validation](./utilities/validation) | JSON Schema validator for inbound events and responses | [Event source data classes](./utilities/data_classes) | Data classes describing the schema of common Lambda event triggers | [Parser](./utilities/parser) | Data parsing and deep validation using Pydantic +| [Idempotency](./utilities/idempotency) | Idempotent Lambda handler ## Environment variables diff --git a/docs/media/idempotent_sequence.png b/docs/media/idempotent_sequence.png new file mode 100644 index 00000000000..92593184abb Binary files /dev/null and b/docs/media/idempotent_sequence.png differ diff --git a/docs/media/idempotent_sequence_exception.png b/docs/media/idempotent_sequence_exception.png new file mode 100644 index 00000000000..4cf065993dd Binary files /dev/null and b/docs/media/idempotent_sequence_exception.png differ diff --git a/docs/utilities/idempotency.md b/docs/utilities/idempotency.md new file mode 100644 index 00000000000..1c6555088d9 --- /dev/null +++ b/docs/utilities/idempotency.md @@ -0,0 +1,362 @@ +--- +title: Idempotency +description: Utility +--- + +This utility provides a simple solution to convert your Lambda functions into idempotent operations which are safe to +retry. + +## Terminology + +The property of idempotency means that an operation does not cause additional side effects if it is called more than +once with the same input parameters. Idempotent operations will return the same result when they are called multiple +times with the same parameters. This makes idempotent operations safe to retry. 
+ + +## Key features + +* Prevent Lambda handler code executing more than once on the same event payload during a time window +* Ensure Lambda handler returns the same result when called with the same payload +* Select a subset of the event as the idempotency key using JMESpath expressions +* Set a time window in which records with the same payload should be considered duplicates + +## Getting started + +### Required resources + +Before getting started, you need to create a persistent storage layer where the idempotency utility can store its +state. Your lambda functions will need read and write access to it. DynamoDB is currently the only supported persistent +storage layer, so you'll need to create a table first. + +> Example using AWS Serverless Application Model (SAM) + +=== "template.yml" +```yaml +Resources: + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Runtime: python3.8 + ... + Policies: + - DynamoDBCrudPolicy: + TableName: !Ref IdempotencyTable + IdempotencyTable: + Type: AWS::DynamoDB::Table + Properties: + AttributeDefinitions: + - AttributeName: id + AttributeType: S + BillingMode: PAY_PER_REQUEST + KeySchema: + - AttributeName: id + KeyType: HASH + TableName: "IdempotencyTable" + TimeToLiveSpecification: + AttributeName: expiration + Enabled: true +``` + +!!! warning + When using this utility with DynamoDB, your lambda responses must always be smaller than 400kb. Larger items cannot + be written to DynamoDB and will cause exceptions. + +!!! info + Each function invocation will generally make 2 requests to DynamoDB. If the + result returned by your Lambda is less than 1kb, you can expect 2 WCUs per invocation. For retried invocations, you will + see 1WCU and 1RCU. Review the [DynamoDB pricing documentation](https://aws.amazon.com/dynamodb/pricing/) to + estimate the cost. 
+ +### Lambda handler + +You can quickly start by initializing the `DynamoDBPersistenceLayer` class outside the Lambda handler, and using it +with the `idempotent` decorator on your lambda handler. The only required parameter is `table_name`, but you likely +want to specify `event_key_jmespath` as well. + +`event_key_jmespath`: A JMESpath expression which will be used to extract the payload from the event your Lambda handler +is called with. This payload will be used as the key to decide if future invocations are duplicates. If you don't pass +this parameter, the entire event will be used as the key. + +=== "app.py" + + ```python hl_lines="2 6-9 11" + import json + from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent + + # Treat everything under the "body" key in + # the event json object as our payload + persistence_layer = DynamoDBPersistenceLayer( + event_key_jmespath="body", + table_name="IdempotencyTable" + ) + + @idempotent(persistence_store=persistence_layer) + def handler(event, context): + body = json.loads(event['body']) + payment = create_subscription_payment( + user=body['username'], + product=body['product_id'] + ) + ... 
+ return {"message": "success", "statusCode": 200, "payment_id": payment.id} + ``` +=== "Example event" + + ```json + { + "version":"2.0", + "routeKey":"ANY /createpayment", + "rawPath":"/createpayment", + "rawQueryString":"", + "headers": { + "Header1": "value1", + "Header2": "value2" + }, + "requestContext":{ + "accountId":"123456789012", + "apiId":"api-id", + "domainName":"id.execute-api.us-east-1.amazonaws.com", + "domainPrefix":"id", + "http":{ + "method":"POST", + "path":"/createpayment", + "protocol":"HTTP/1.1", + "sourceIp":"ip", + "userAgent":"agent" + }, + "requestId":"id", + "routeKey":"ANY /createpayment", + "stage":"$default", + "time":"10/Feb/2021:13:40:43 +0000", + "timeEpoch":1612964443723 + }, + "body":"{\"username\":\"xyz\",\"product_id\":\"123456789\"}", + "isBase64Encoded":false + } + ``` + +In this example, we have a Lambda handler that creates a payment for a user subscribing to a product. We want to ensure +that we don't accidentally charge our customer by subscribing them more than once. Imagine the function executes +successfully, but the client never receives the response. When we're using the idempotent decorator, we can safely +retry. This sequence diagram shows an example flow of what happens in this case: + +![Idempotent sequence](../media/idempotent_sequence.png) + + +The client was successful in receiving the result after the retry. Since the Lambda handler was only executed once, our +customer hasn't been charged twice. + +!!! note + Bear in mind that the entire Lambda handler is treated as a single idempotent operation. If your Lambda handler can + cause multiple side effects, consider splitting it into separate functions. + +### Handling exceptions + +If your Lambda handler raises an unhandled exception, the record in the persistence layer will be deleted. This means +that if the client retries, your Lambda handler will be free to execute again. 
If you don't want the record to be +deleted, you need to catch Exceptions within the handler and return a successful response. + + +![Idempotent sequence exception](../media/idempotent_sequence_exception.png) + +!!! warning + If any of the calls to the persistence layer unexpectedly fail, `IdempotencyPersistenceLayerError` will be raised. + As this happens outside the scope of your Lambda handler, you are not able to catch it. + +### Setting a time window +In most cases, it is not desirable to store the idempotency records forever. Rather, you want to guarantee that the +same payload won't be executed within a period of time. By default, the period is set to 1 hour (3600 seconds). You can +change this window with the `expires_after_seconds` parameter: + +```python hl_lines="4" +DynamoDBPersistenceLayer( + event_key_jmespath="body", + table_name="IdempotencyTable", + expires_after_seconds=5*60 # 5 minutes + ) + +``` +This will mark any records older than 5 minutes as expired, and the lambda handler will be executed as normal if it is +invoked with a matching payload. If you have set the TTL field in DynamoDB like in the SAM example above, the record +will be automatically deleted from the table after a period of time. + + +### Handling concurrent executions +If you invoke a Lambda function with a given payload, then try to invoke it again with the same payload before the +first invocation has finished, we'll raise an `IdempotencyAlreadyInProgressError` exception. This is the utility's +locking mechanism at work. Since we don't know the result from the first invocation yet, we can't safely allow another +concurrent execution. If you receive this error, you can safely retry the operation. + + +### Using local cache +To reduce the number of lookups to the persistence storage layer, you can enable in-memory caching with the +`use_local_cache` parameter, which is disabled by default. This cache is local to each Lambda execution environment. 
+This means it will be effective in cases where your function's concurrency is low in comparison to the number of +"retry" invocations with the same payload. When enabled, the default is to cache a maximum of 256 records in each Lambda +execution environment. You can change this with the `local_cache_max_items` parameter. + +```python hl_lines="4 5" +DynamoDBPersistenceLayer( + event_key_jmespath="body", + table_name="IdempotencyTable", + use_local_cache=True, + local_cache_max_items=1000 + ) +``` + + +## Advanced + +### Payload validation +What happens if lambda is invoked with a payload that it has seen before, but some parameters which are not part of the +payload have changed? By default, lambda will return the same result as it returned before, which may be misleading. +Payload validation provides a solution to that. You can provide another JMESpath expression to the persistence store +with the `payload_validation_jmespath` to specify which part of the event body should be validated against previous +idempotent invocations. + +=== "app.py" + ```python hl_lines="6" + from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent + + persistence_layer = DynamoDBPersistenceLayer( + event_key_jmespath="[userDetail, productId]", + table_name="IdempotencyTable", + payload_validation_jmespath="amount" + ) + + @idempotent(persistence_store=persistence_layer) + def handler(event, context): + # Creating a subscription payment is a side + # effect of calling this function! + payment = create_subscription_payment( + user=event['userDetail']['username'], + product=event['productId'], + amount=event['amount'] + ) + ... 
+ return {"message": "success", "statusCode": 200, + "payment_id": payment.id, "amount": payment.amount} + ``` +=== "Event" + ```json + { + "userDetail": { + "username": "User1", + "user_email": "user@example.com" + }, + "productId": 1500, + "charge_type": "subscription", + "amount": 500 + } + ``` + +In this example, the "userDetail" and "productId" keys are used as the payload to generate the idempotency key. If +we try to send the same request but with a different amount, Lambda will raise `IdempotencyValidationError`. Without +payload validation, we would have returned the same result as we did for the initial request. Since we're also +returning an amount in the response, this could be quite confusing for the client. By using payload validation on the +amount field, we prevent this potentially confusing behaviour and instead raise an Exception. + +### Changing dynamoDB attribute names +If you want to use an existing DynamoDB table, or wish to change the name of the attributes used to store items in the +table, you can do so when you construct the `DynamoDBPersistenceLayer` instance. + + +Parameter | Default value | Description +------------------- |--------------- | ------------ +key_attr | "id" | Primary key of the table. 
Hashed representation of the payload +expiry_attr | "expiration" | Unix timestamp of when record expires +status_attr | "status" | Stores status of the lambda execution during and after invocation +data_attr | "data" | Stores results of successfully executed Lambda handlers +validation_key_attr | "validation" | Hashed representation of the parts of the event used for validation + +This example demonstrates changing the attribute names to custom values: + +=== "app.py" + ```python hl_lines="5-10" + persistence_layer = DynamoDBPersistenceLayer( + event_key_jmespath="[userDetail, productId]", + table_name="IdempotencyTable", + key_attr="idempotency_key", + expiry_attr="expires_at", + status_attr="current_status", + data_attr="result_data", + validation_key_attr="validation_key" + ) + ``` + +### Customizing boto configuration +You can provide custom boto configuration or even bring your own boto3 session if required by using the `boto_config` +or `boto3_session` parameters when constructing the persistence store. + +=== "Custom session" + ```python hl_lines="1 4 8" + import boto3 + from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent + + boto3_session = boto3.session.Session() + persistence_layer = DynamoDBPersistenceLayer( + event_key_jmespath="body", + table_name="IdempotencyTable", + boto3_session=boto3_session + ) + + @idempotent(persistence_store=persistence_layer) + def handler(event, context): + ... + ``` +=== "Custom config" + ```python hl_lines="1 4 8" + from botocore.config import Config + from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent + + boto_config = Config() + persistence_layer = DynamoDBPersistenceLayer( + event_key_jmespath="body", + table_name="IdempotencyTable", + boto_config=boto_config + ) + + @idempotent(persistence_store=persistence_layer) + def handler(event, context): + ... 
+ ``` + +### Bring your own persistent store + +The utility provides an abstract base class which can be used to implement your choice of persistent storage layers. +You can inherit from the `BasePersistenceLayer` class and implement the abstract methods `_get_record`, `_put_record`, +`_update_record` and `_delete_record`. Pay attention to the documentation for each - you may need to perform additional +checks inside these methods to ensure the idempotency guarantees remain intact. For example, the `_put_record` method +needs to raise an exception if a non-expired record already exists in the data store with a matching key. + +## Compatibility with other utilities + +### Validation utility + +The idempotency utility can be used with the `validator` decorator. Ensure that idempotency is the innermost decorator. + +!!! warning + If you use an envelope with the validator, the event received by the idempotency utility will be the unwrapped + event - not the "raw" event Lambda was invoked with. You will need to account for this if you set the + `event_key_jmespath`. + +=== "app.py" + ```python hl_lines="9 10" + from aws_lambda_powertools.utilities.validation import validator, envelopes + from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent + + persistence_layer = DynamoDBPersistenceLayer( + event_key_jmespath="[message, username]", + table_name="IdempotencyTable", + ) + + @validator(envelope=envelopes.API_GATEWAY_HTTP) + @idempotent(persistence_store=persistence_layer) + def lambda_handler(event, context): + cause_some_side_effects(event['username']) + return {"message": event['message'], "statusCode": 200} + ``` + +## Extra resources +If you're interested in a deep dive on how Amazon uses idempotency when building our APIs, check out +[this article](https://aws.amazon.com/builders-library/making-retries-safe-with-idempotent-APIs/). 
diff --git a/mkdocs.yml b/mkdocs.yml index b3030c6b429..c8d1beda6a7 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -15,6 +15,7 @@ nav: - utilities/validation.md - utilities/data_classes.md - utilities/parser.md + - utilities/idempotency.md theme: name: material diff --git a/tests/events/apiGatewayProxyV2Event.json b/tests/events/apiGatewayProxyV2Event.json index 9c310e6d52f..4d0cfdf5703 100644 --- a/tests/events/apiGatewayProxyV2Event.json +++ b/tests/events/apiGatewayProxyV2Event.json @@ -45,7 +45,7 @@ "time": "12/Mar/2020:19:03:58 +0000", "timeEpoch": 1583348638390 }, - "body": "Hello from Lambda", + "body": "{\"message\": \"hello world\", \"username\": \"tom\"}", "pathParameters": { "parameter1": "value1" }, diff --git a/tests/functional/idempotency/__init__.py b/tests/functional/idempotency/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/functional/idempotency/conftest.py b/tests/functional/idempotency/conftest.py new file mode 100644 index 00000000000..918eac9a507 --- /dev/null +++ b/tests/functional/idempotency/conftest.py @@ -0,0 +1,185 @@ +import datetime +import hashlib +import json +import os +from decimal import Decimal +from unittest import mock + +import jmespath +import pytest +from botocore import stub +from botocore.config import Config + +from aws_lambda_powertools.shared.json_encoder import Encoder +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer +from aws_lambda_powertools.utilities.validation import envelopes +from aws_lambda_powertools.utilities.validation.base import unwrap_event_from_envelope + +TABLE_NAME = "TEST_TABLE" + + +@pytest.fixture(scope="module") +def config() -> Config: + return Config(region_name="us-east-1") + + +@pytest.fixture(scope="module") +def lambda_apigw_event(): + full_file_name = os.path.dirname(os.path.realpath(__file__)) + "/../../events/" + "apiGatewayProxyV2Event.json" + with open(full_file_name) as fp: + event = json.load(fp) + + return event + + 
+@pytest.fixture +def timestamp_future(): + return str(int((datetime.datetime.now() + datetime.timedelta(seconds=3600)).timestamp())) + + +@pytest.fixture +def timestamp_expired(): + now = datetime.datetime.now() + period = datetime.timedelta(seconds=6400) + return str(int((now - period).timestamp())) + + +@pytest.fixture(scope="module") +def lambda_response(): + return {"message": "test", "statusCode": 200, "decimal_val": Decimal("2.5"), "decimal_NaN": Decimal("NaN")} + + +@pytest.fixture(scope="module") +def serialized_lambda_response(lambda_response): + return json.dumps(lambda_response, cls=Encoder) + + +@pytest.fixture(scope="module") +def deserialized_lambda_response(lambda_response): + return json.loads(json.dumps(lambda_response, cls=Encoder)) + + +@pytest.fixture +def default_jmespath(): + return "[body, queryStringParameters]" + + +@pytest.fixture +def expected_params_update_item(serialized_lambda_response, hashed_idempotency_key): + return { + "ExpressionAttributeNames": {"#expiry": "expiration", "#response_data": "data", "#status": "status"}, + "ExpressionAttributeValues": { + ":expiry": stub.ANY, + ":response_data": serialized_lambda_response, + ":status": "COMPLETED", + }, + "Key": {"id": hashed_idempotency_key}, + "TableName": "TEST_TABLE", + "UpdateExpression": "SET #response_data = :response_data, " "#expiry = :expiry, #status = :status", + } + + +@pytest.fixture +def expected_params_update_item_with_validation( + serialized_lambda_response, hashed_idempotency_key, hashed_validation_key +): + return { + "ExpressionAttributeNames": { + "#expiry": "expiration", + "#response_data": "data", + "#status": "status", + "#validation_key": "validation", + }, + "ExpressionAttributeValues": { + ":expiry": stub.ANY, + ":response_data": serialized_lambda_response, + ":status": "COMPLETED", + ":validation_key": hashed_validation_key, + }, + "Key": {"id": hashed_idempotency_key}, + "TableName": "TEST_TABLE", + "UpdateExpression": "SET #response_data = 
:response_data, " + "#expiry = :expiry, #status = :status, " + "#validation_key = :validation_key", + } + + +@pytest.fixture +def expected_params_put_item(hashed_idempotency_key): + return { + "ConditionExpression": "attribute_not_exists(id) OR expiration < :now", + "ExpressionAttributeValues": {":now": stub.ANY}, + "Item": {"expiration": stub.ANY, "id": hashed_idempotency_key, "status": "INPROGRESS"}, + "TableName": "TEST_TABLE", + } + + +@pytest.fixture +def expected_params_put_item_with_validation(hashed_idempotency_key, hashed_validation_key): + return { + "ConditionExpression": "attribute_not_exists(id) OR expiration < :now", + "ExpressionAttributeValues": {":now": stub.ANY}, + "Item": { + "expiration": stub.ANY, + "id": hashed_idempotency_key, + "status": "INPROGRESS", + "validation": hashed_validation_key, + }, + "TableName": "TEST_TABLE", + } + + +@pytest.fixture +def hashed_idempotency_key(lambda_apigw_event, default_jmespath): + compiled_jmespath = jmespath.compile(default_jmespath) + data = compiled_jmespath.search(lambda_apigw_event) + return hashlib.md5(json.dumps(data).encode()).hexdigest() + + +@pytest.fixture +def hashed_idempotency_key_with_envelope(lambda_apigw_event): + event = unwrap_event_from_envelope( + data=lambda_apigw_event, envelope=envelopes.API_GATEWAY_HTTP, jmespath_options={} + ) + return hashlib.md5(json.dumps(event).encode()).hexdigest() + + +@pytest.fixture +def hashed_validation_key(lambda_apigw_event): + return hashlib.md5(json.dumps(lambda_apigw_event["requestContext"]).encode()).hexdigest() + + +@pytest.fixture +def persistence_store(config, request, default_jmespath): + persistence_store = DynamoDBPersistenceLayer( + event_key_jmespath=default_jmespath, + table_name=TABLE_NAME, + boto_config=config, + use_local_cache=request.param["use_local_cache"], + ) + return persistence_store + + +@pytest.fixture +def persistence_store_without_jmespath(config, request): + persistence_store = DynamoDBPersistenceLayer( + 
table_name=TABLE_NAME, boto_config=config, use_local_cache=request.param["use_local_cache"], + ) + return persistence_store + + +@pytest.fixture +def persistence_store_with_validation(config, request, default_jmespath): + persistence_store = DynamoDBPersistenceLayer( + event_key_jmespath=default_jmespath, + table_name=TABLE_NAME, + boto_config=config, + use_local_cache=request.param["use_local_cache"], # request.param is the parametrized dict, not a bool + payload_validation_jmespath="requestContext", + ) + return persistence_store + + +@pytest.fixture +def mock_function(): + return mock.MagicMock() diff --git a/tests/functional/idempotency/test_idempotency.py b/tests/functional/idempotency/test_idempotency.py new file mode 100644 index 00000000000..e6e64e3b38b --- /dev/null +++ b/tests/functional/idempotency/test_idempotency.py @@ -0,0 +1,596 @@ +import copy +import sys + +import pytest +from botocore import stub + +from aws_lambda_powertools.utilities.idempotency.exceptions import ( + IdempotencyAlreadyInProgressError, + IdempotencyInconsistentStateError, + IdempotencyInvalidStatusError, + IdempotencyPersistenceLayerError, + IdempotencyValidationError, +) +from aws_lambda_powertools.utilities.idempotency.idempotency import idempotent +from aws_lambda_powertools.utilities.idempotency.persistence.base import DataRecord +from aws_lambda_powertools.utilities.validation import envelopes, validator + +TABLE_NAME = "TEST_TABLE" + + +# Using parametrize to run test twice, with two separate instances of persistence store. One instance with caching +# enabled, and one without. 
+@pytest.mark.parametrize("persistence_store", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) +def test_idempotent_lambda_already_completed( + persistence_store, + lambda_apigw_event, + timestamp_future, + hashed_idempotency_key, + serialized_lambda_response, + deserialized_lambda_response, +): + """ + Test idempotent decorator where event with matching event key has already been succesfully processed + """ + + stubber = stub.Stubber(persistence_store.table.meta.client) + ddb_response = { + "Item": { + "id": {"S": hashed_idempotency_key}, + "expiration": {"N": timestamp_future}, + "data": {"S": serialized_lambda_response}, + "status": {"S": "COMPLETED"}, + } + } + + expected_params = { + "TableName": TABLE_NAME, + "Key": {"id": hashed_idempotency_key}, + "ConsistentRead": True, + } + stubber.add_client_error("put_item", "ConditionalCheckFailedException") + stubber.add_response("get_item", ddb_response, expected_params) + stubber.activate() + + @idempotent(persistence_store=persistence_store) + def lambda_handler(event, context): + raise Exception + + lambda_resp = lambda_handler(lambda_apigw_event, {}) + assert lambda_resp == deserialized_lambda_response + + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("persistence_store", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) +def test_idempotent_lambda_in_progress( + persistence_store, lambda_apigw_event, lambda_response, timestamp_future, hashed_idempotency_key +): + """ + Test idempotent decorator where lambda_handler is already processing an event with matching event key + """ + + stubber = stub.Stubber(persistence_store.table.meta.client) + + expected_params = { + "TableName": TABLE_NAME, + "Key": {"id": hashed_idempotency_key}, + "ConsistentRead": True, + } + ddb_response = { + "Item": { + "id": {"S": hashed_idempotency_key}, + "expiration": {"N": timestamp_future}, + "status": {"S": "INPROGRESS"}, + } + } + + 
stubber.add_client_error("put_item", "ConditionalCheckFailedException") + stubber.add_response("get_item", ddb_response, expected_params) + stubber.activate() + + @idempotent(persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + with pytest.raises(IdempotencyAlreadyInProgressError) as ex: + lambda_handler(lambda_apigw_event, {}) + assert ( + ex.value.args[0] == "Execution already in progress with idempotency key: " + "body=a3edd699125517bb49d562501179ecbd" + ) + + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.skipif(sys.version_info < (3, 8), reason="issue with pytest mock lib for < 3.8") +@pytest.mark.parametrize("persistence_store", [{"use_local_cache": True}], indirect=True) +def test_idempotent_lambda_in_progress_with_cache( + persistence_store, lambda_apigw_event, lambda_response, timestamp_future, hashed_idempotency_key, mocker +): + """ + Test idempotent decorator where lambda_handler is already processing an event with matching event key, cache + enabled. 
+ """ + save_to_cache_spy = mocker.spy(persistence_store, "_save_to_cache") + retrieve_from_cache_spy = mocker.spy(persistence_store, "_retrieve_from_cache") + stubber = stub.Stubber(persistence_store.table.meta.client) + + expected_params = { + "TableName": TABLE_NAME, + "Key": {"id": hashed_idempotency_key}, + "ConsistentRead": True, + } + ddb_response = { + "Item": { + "id": {"S": hashed_idempotency_key}, + "expiration": {"N": timestamp_future}, + "status": {"S": "INPROGRESS"}, + } + } + + stubber.add_client_error("put_item", "ConditionalCheckFailedException") + stubber.add_response("get_item", ddb_response, expected_params) + stubber.activate() + + @idempotent(persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + loops = 3 + for _ in range(loops): + with pytest.raises(IdempotencyAlreadyInProgressError) as ex: + lambda_handler(lambda_apigw_event, {}) + assert ( + ex.value.args[0] == "Execution already in progress with idempotency key: " + "body=a3edd699125517bb49d562501179ecbd" + ) + + assert retrieve_from_cache_spy.call_count == 2 * loops + retrieve_from_cache_spy.assert_called_with(idempotency_key=hashed_idempotency_key) + + assert save_to_cache_spy.call_count == 1 + first_call_args_data_record = save_to_cache_spy.call_args_list[0].kwargs["data_record"] + assert first_call_args_data_record.idempotency_key == hashed_idempotency_key + assert first_call_args_data_record.status == "INPROGRESS" + assert persistence_store._cache.get(hashed_idempotency_key) + + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("persistence_store", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) +def test_idempotent_lambda_first_execution( + persistence_store, + lambda_apigw_event, + expected_params_update_item, + expected_params_put_item, + lambda_response, + serialized_lambda_response, + deserialized_lambda_response, + hashed_idempotency_key, +): + """ + Test idempotent 
decorator when lambda is executed with an event with a previously unknown event key + """ + + stubber = stub.Stubber(persistence_store.table.meta.client) + ddb_response = {} + + stubber.add_response("put_item", ddb_response, expected_params_put_item) + stubber.add_response("update_item", ddb_response, expected_params_update_item) + stubber.activate() + + @idempotent(persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + lambda_handler(lambda_apigw_event, {}) + + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.skipif(sys.version_info < (3, 8), reason="issue with pytest mock lib for < 3.8") +@pytest.mark.parametrize("persistence_store", [{"use_local_cache": True}], indirect=True) +def test_idempotent_lambda_first_execution_cached( + persistence_store, + lambda_apigw_event, + expected_params_update_item, + expected_params_put_item, + lambda_response, + hashed_idempotency_key, + mocker, +): + """ + Test idempotent decorator when lambda is executed with an event with a previously unknown event key. Ensure + result is cached locally on the persistence store instance. 
+ """ + save_to_cache_spy = mocker.spy(persistence_store, "_save_to_cache") + retrieve_from_cache_spy = mocker.spy(persistence_store, "_retrieve_from_cache") + stubber = stub.Stubber(persistence_store.table.meta.client) + ddb_response = {} + + stubber.add_response("put_item", ddb_response, expected_params_put_item) + stubber.add_response("update_item", ddb_response, expected_params_update_item) + stubber.activate() + + @idempotent(persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + lambda_handler(lambda_apigw_event, {}) + + assert retrieve_from_cache_spy.call_count == 1 + assert save_to_cache_spy.call_count == 2 + first_call_args, second_call_args = save_to_cache_spy.call_args_list + assert first_call_args.args[0].status == "INPROGRESS" + assert second_call_args.args[0].status == "COMPLETED" + assert persistence_store._cache.get(hashed_idempotency_key) + + # This lambda call should not call AWS API + lambda_handler(lambda_apigw_event, {}) + assert retrieve_from_cache_spy.call_count == 3 + retrieve_from_cache_spy.assert_called_with(idempotency_key=hashed_idempotency_key) + + # This assertion fails if an AWS API operation was called more than once + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("persistence_store", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) +def test_idempotent_lambda_expired( + persistence_store, + lambda_apigw_event, + timestamp_expired, + lambda_response, + expected_params_update_item, + expected_params_put_item, + hashed_idempotency_key, +): + """ + Test idempotent decorator when lambda is called with an event it succesfully handled already, but outside of the + expiry window + """ + + stubber = stub.Stubber(persistence_store.table.meta.client) + + ddb_response = {} + + stubber.add_response("put_item", ddb_response, expected_params_put_item) + stubber.add_response("update_item", ddb_response, expected_params_update_item) + 
stubber.activate() + + @idempotent(persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + lambda_handler(lambda_apigw_event, {}) + + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("persistence_store", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) +def test_idempotent_lambda_exception( + persistence_store, + lambda_apigw_event, + timestamp_future, + lambda_response, + hashed_idempotency_key, + expected_params_put_item, +): + """ + Test idempotent decorator when lambda is executed with an event with a previously unknown event key, but + lambda_handler raises an exception which is retryable. + """ + + # Create a new provider + + # Stub the boto3 client + stubber = stub.Stubber(persistence_store.table.meta.client) + + ddb_response = {} + expected_params_delete_item = {"TableName": TABLE_NAME, "Key": {"id": hashed_idempotency_key}} + + stubber.add_response("put_item", ddb_response, expected_params_put_item) + stubber.add_response("delete_item", ddb_response, expected_params_delete_item) + stubber.activate() + + @idempotent(persistence_store=persistence_store) + def lambda_handler(event, context): + raise Exception("Something went wrong!") + + with pytest.raises(Exception): + lambda_handler(lambda_apigw_event, {}) + + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize( + "persistence_store_with_validation", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True +) +def test_idempotent_lambda_already_completed_with_validation_bad_payload( + persistence_store_with_validation, + lambda_apigw_event, + timestamp_future, + lambda_response, + hashed_idempotency_key, + hashed_validation_key, +): + """ + Test idempotent decorator where event with matching event key has already been succesfully processed + """ + + stubber = stub.Stubber(persistence_store_with_validation.table.meta.client) + ddb_response = { + 
"Item": { + "id": {"S": hashed_idempotency_key}, + "expiration": {"N": timestamp_future}, + "data": {"S": '{"message": "test", "statusCode": 200}'}, + "status": {"S": "COMPLETED"}, + "validation": {"S": hashed_validation_key}, + } + } + + expected_params = {"TableName": TABLE_NAME, "Key": {"id": hashed_idempotency_key}, "ConsistentRead": True} + + stubber.add_client_error("put_item", "ConditionalCheckFailedException") + stubber.add_response("get_item", ddb_response, expected_params) + stubber.activate() + + @idempotent(persistence_store=persistence_store_with_validation) + def lambda_handler(event, context): + return lambda_response + + with pytest.raises(IdempotencyValidationError): + lambda_apigw_event["requestContext"]["accountId"] += "1" # Alter the request payload + lambda_handler(lambda_apigw_event, {}) + + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("persistence_store", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) +def test_idempotent_lambda_expired_during_request( + persistence_store, + lambda_apigw_event, + timestamp_expired, + lambda_response, + expected_params_update_item, + hashed_idempotency_key, +): + """ + Test idempotent decorator when lambda is called with an event it succesfully handled already. Persistence store + returns inconsistent/rapidly changing result between put_item and get_item calls. 
+ """ + + stubber = stub.Stubber(persistence_store.table.meta.client) + + ddb_response_get_item = { + "Item": { + "id": {"S": hashed_idempotency_key}, + "expiration": {"N": timestamp_expired}, + "data": {"S": '{"message": "test", "statusCode": 200}'}, + "status": {"S": "INPROGRESS"}, + } + } + ddb_response_get_item_missing = {} + expected_params_get_item = { + "TableName": TABLE_NAME, + "Key": {"id": hashed_idempotency_key}, + "ConsistentRead": True, + } + + # Simulate record repeatedly changing state between put_item and get_item + stubber.add_client_error("put_item", "ConditionalCheckFailedException") + stubber.add_response("get_item", ddb_response_get_item, expected_params_get_item) + + stubber.add_client_error("put_item", "ConditionalCheckFailedException") + stubber.add_response("get_item", ddb_response_get_item_missing) + + stubber.add_client_error("put_item", "ConditionalCheckFailedException") + stubber.add_response("get_item", copy.deepcopy(ddb_response_get_item), copy.deepcopy(expected_params_get_item)) + + stubber.activate() + + @idempotent(persistence_store=persistence_store) + def lambda_handler(event, context): + return lambda_response + + # max retries exceeded before get_item and put_item agree on item state, so exception gets raised + with pytest.raises(IdempotencyInconsistentStateError): + lambda_handler(lambda_apigw_event, {}) + + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("persistence_store", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) +def test_idempotent_persistence_exception_deleting( + persistence_store, + lambda_apigw_event, + timestamp_future, + lambda_response, + hashed_idempotency_key, + expected_params_put_item, +): + """ + Test idempotent decorator when lambda_handler raises an exception and the subsequent delete_item call fails + with an unrecoverable error, which must surface as IdempotencyPersistenceLayerError.
+ """ + stubber = stub.Stubber(persistence_store.table.meta.client) + + ddb_response = {} + + stubber.add_response("put_item", ddb_response, expected_params_put_item) + stubber.add_client_error("delete_item", "UnrecoverableError") + stubber.activate() + + @idempotent(persistence_store=persistence_store) + def lambda_handler(event, context): + raise Exception("Something went wrong!") + + with pytest.raises(IdempotencyPersistenceLayerError) as exc: + lambda_handler(lambda_apigw_event, {}) + + assert exc.value.args[0] == "Failed to delete record from idempotency store" + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("persistence_store", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) +def test_idempotent_persistence_exception_updating( + persistence_store, + lambda_apigw_event, + timestamp_future, + lambda_response, + hashed_idempotency_key, + expected_params_put_item, +): + """ + Test idempotent decorator when lambda_handler succeeds but the subsequent update_item call fails with an + unrecoverable error, which must surface as IdempotencyPersistenceLayerError.
+ """ + stubber = stub.Stubber(persistence_store.table.meta.client) + + ddb_response = {} + + stubber.add_response("put_item", ddb_response, expected_params_put_item) + stubber.add_client_error("update_item", "UnrecoverableError") + stubber.activate() + + @idempotent(persistence_store=persistence_store) + def lambda_handler(event, context): + return {"message": "success!"} + + with pytest.raises(IdempotencyPersistenceLayerError) as exc: + lambda_handler(lambda_apigw_event, {}) + + assert exc.value.args[0] == "Failed to update record state to success in idempotency store" + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize("persistence_store", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True) +def test_idempotent_persistence_exception_getting( + persistence_store, + lambda_apigw_event, + timestamp_future, + lambda_response, + hashed_idempotency_key, + expected_params_put_item, +): + """ + Test idempotent decorator when put_item fails its conditional check and the subsequent get_item call fails + with an unexpected error, which must surface as IdempotencyPersistenceLayerError.
+ """ + stubber = stub.Stubber(persistence_store.table.meta.client) + + stubber.add_client_error("put_item", "ConditionalCheckFailedException") + stubber.add_client_error("get_item", "UnexpectedException") + stubber.activate() + + @idempotent(persistence_store=persistence_store) + def lambda_handler(event, context): + return {"message": "success!"} + + with pytest.raises(IdempotencyPersistenceLayerError) as exc: + lambda_handler(lambda_apigw_event, {}) + + assert exc.value.args[0] == "Failed to get record from idempotency store" + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize( + "persistence_store_with_validation", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True +) +def test_idempotent_lambda_first_execution_with_validation( + persistence_store_with_validation, + lambda_apigw_event, + expected_params_update_item_with_validation, + expected_params_put_item_with_validation, + lambda_response, + hashed_idempotency_key, + hashed_validation_key, +): + """ + Test idempotent decorator when lambda is executed with an event with a previously unknown event key + """ + stubber = stub.Stubber(persistence_store_with_validation.table.meta.client) + ddb_response = {} + + stubber.add_response("put_item", ddb_response, expected_params_put_item_with_validation) + stubber.add_response("update_item", ddb_response, expected_params_update_item_with_validation) + stubber.activate() + + @idempotent(persistence_store=persistence_store_with_validation) + def lambda_handler(lambda_apigw_event, context): + return lambda_response + + lambda_handler(lambda_apigw_event, {}) + + stubber.assert_no_pending_responses() + stubber.deactivate() + + +@pytest.mark.parametrize( + "persistence_store_without_jmespath", [{"use_local_cache": False}, {"use_local_cache": True}], indirect=True +) +def test_idempotent_lambda_with_validator_util( + persistence_store_without_jmespath, + lambda_apigw_event, + timestamp_future, + 
serialized_lambda_response, + deserialized_lambda_response, + hashed_idempotency_key_with_envelope, + mock_function, +): + """ + Test idempotent decorator where event with matching event key has already been successfully processed, using the + validator utility to unwrap the event + """ + + stubber = stub.Stubber(persistence_store_without_jmespath.table.meta.client) + ddb_response = { + "Item": { + "id": {"S": hashed_idempotency_key_with_envelope}, + "expiration": {"N": timestamp_future}, + "data": {"S": serialized_lambda_response}, + "status": {"S": "COMPLETED"}, + } + } + + expected_params = { + "TableName": TABLE_NAME, + "Key": {"id": hashed_idempotency_key_with_envelope}, + "ConsistentRead": True, + } + stubber.add_client_error("put_item", "ConditionalCheckFailedException") + stubber.add_response("get_item", ddb_response, expected_params) + stubber.activate() + + @validator(envelope=envelopes.API_GATEWAY_HTTP) + @idempotent(persistence_store=persistence_store_without_jmespath) + def lambda_handler(event, context): + mock_function() + return "shouldn't get here!"
+ + mock_function.assert_not_called() + lambda_resp = lambda_handler(lambda_apigw_event, {}) + assert lambda_resp == deserialized_lambda_response + + stubber.assert_no_pending_responses() + stubber.deactivate() + + +def test_data_record_invalid_status_value(): + data_record = DataRecord("key", status="UNSUPPORTED_STATUS") + with pytest.raises(IdempotencyInvalidStatusError) as e: + _ = data_record.status + + assert e.value.args[0] == "UNSUPPORTED_STATUS" diff --git a/tests/unit/test_json_encoder.py b/tests/unit/test_json_encoder.py new file mode 100644 index 00000000000..8d6a9f3944c --- /dev/null +++ b/tests/unit/test_json_encoder.py @@ -0,0 +1,14 @@ +import decimal +import json + +from aws_lambda_powertools.shared.json_encoder import Encoder + + +def test_jsonencode_decimal(): + result = json.dumps({"val": decimal.Decimal("8.5")}, cls=Encoder) + assert result == '{"val": "8.5"}' + + +def test_jsonencode_decimal_nan(): + result = json.dumps({"val": decimal.Decimal("NaN")}, cls=Encoder) + assert result == '{"val": NaN}' diff --git a/tests/unit/test_lru_cache.py b/tests/unit/test_lru_cache.py new file mode 100644 index 00000000000..170972432ce --- /dev/null +++ b/tests/unit/test_lru_cache.py @@ -0,0 +1,58 @@ +import random + +import pytest + +from aws_lambda_powertools.shared.cache_dict import LRUDict + +MAX_CACHE_ITEMS = 50 +PREFILL_CACHE_ITEMS = 50 + + +@pytest.fixture +def populated_cache(): + cache_dict = LRUDict(max_items=MAX_CACHE_ITEMS, **{f"key_{i}": f"val_{i}" for i in range(0, PREFILL_CACHE_ITEMS)}) + return cache_dict + + +def test_cache_order_init(populated_cache): + first_item = list(populated_cache)[0] + last_item = list(populated_cache)[-1] + + assert first_item == "key_0" + assert last_item == f"key_{MAX_CACHE_ITEMS - 1}" + + +def test_cache_order_getitem(populated_cache): + random_value = random.randrange(0, MAX_CACHE_ITEMS) + _ = populated_cache[f"key_{random_value}"] + + last_item = list(populated_cache)[-1] + + assert last_item == 
f"key_{random_value}" + + +def test_cache_order_get(populated_cache): + random_value = random.randrange(0, MAX_CACHE_ITEMS) + _ = populated_cache.get(f"key_{random_value}") + + last_item = list(populated_cache)[-1] + + assert last_item == f"key_{random_value}" + + +def test_cache_evict_over_max_items(populated_cache): + assert "key_0" in populated_cache + assert len(populated_cache) == MAX_CACHE_ITEMS + populated_cache["new_item"] = "new_value" + assert len(populated_cache) == MAX_CACHE_ITEMS + assert "key_0" not in populated_cache + assert "key_1" in populated_cache + + +def test_setitem_moves_to_end(populated_cache): + random_value = random.randrange(0, MAX_CACHE_ITEMS) + populated_cache[f"key_{random_value}"] = f"new_val_{random_value}" + last_item = list(populated_cache)[-1] + + assert last_item == f"key_{random_value}" + assert populated_cache[f"key_{random_value}"] == f"new_val_{random_value}"