Skip to content

Commit

Permalink
feat: Idempotency helper utility (#245)
Browse files Browse the repository at this point in the history
* feat: initial commit for idempotency utility

* fix: ensure region is configured in botocore for tests

* chore: ignore security warning for md5 usage

* chore: add debug logging

* feat: add local caching for idempotency lookups

* feat: replace simple dict cache with LRU

* feat: remove idempotent exception handling

* feat: remove unused logic to create ddb table - will handle in documentation instead

* fix: remove redundant code from table creation logic

* chore: move tests to own dir

* chore: remove redundant code for exception handling

* feat: add payload validation logic and functionality to use different hash functions from hashlib

* feat: optimization to reduce number of database calls, reorganize persistence layer modules

* chore: type corrections

* chore: add more logging statements

* fix: Use variable for ddb attribute name

* chore: clarify docstring for abstract method

* feat: Refactor to cover corner cases where state changes between calls to db

* chore: correct stubbed ddb responses for test case

* docs: add first of a few seq diagrams to support documentation

* feat: use boto3 session for constructing clients to allow customization of credentials

* chore: move cache dict implementation to shared dir

* chore: refactor with improvements for readability, variable names, and exception handling

* chore: remove dead code, rename variable for clarity, change args to kwargs in function call

* chore: improve test coverage, refactor fixtures

* chore: skip tests using pytest-mock's spy for python < 3.8 due to issues with lib

* chore: update test fixtures to use jmespath

* docs: first draft of docs for idempotency util

* fix: Allow event_key_jmespath to be left empty to use entire event as payload

* docs: add section for compatibility with other utils

* chore: improvements to func tests

* chore: add unit tests for lru cache

* feat: add support for decimals in json serializer

* chore: Add docstring for LRU cache dict

* chore: Remove unused status constants

* chore: Rename method for clarity

* chore: Correct example in docstring

* fix: make data attribute of data record optional in get_record so we don't throw the wrong error for INPROGRESS

* docs: clarify behaviour for concurrent executions and DDB behaviour for large items

* Update aws_lambda_powertools/shared/cache_dict.py

Co-authored-by: Michael Brewer <[email protected]>

* Update aws_lambda_powertools/shared/cache_dict.py

Co-authored-by: Michael Brewer <[email protected]>

* Update aws_lambda_powertools/utilities/idempotency/persistence/base.py

Co-authored-by: Michael Brewer <[email protected]>

* chore: add test for invalid status on data record

* Update aws_lambda_powertools/utilities/idempotency/persistence/base.py

Co-authored-by: Michael Brewer <[email protected]>

Co-authored-by: Michael Brewer <[email protected]>
  • Loading branch information
Tom McCarthy and Michael Brewer authored Feb 19, 2021
1 parent b5ada70 commit 03f7dcd
Show file tree
Hide file tree
Showing 21 changed files with 2,183 additions and 1 deletion.
31 changes: 31 additions & 0 deletions aws_lambda_powertools/shared/cache_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from collections import OrderedDict


class LRUDict(OrderedDict):
    """
    Cache implementation based on ordered dict with a maximum number of items. The least recently used item is
    evicted first. Currently used by idempotency utility.

    Parameters
    ----------
    max_items: int
        Maximum number of items to keep; inserting beyond this evicts the least recently used entry
    *args, **kwargs
        Passed through unchanged to OrderedDict (e.g. initial contents)
    """

    def __init__(self, max_items=1024, *args, **kwargs):
        # Set the limit before delegating: seeding initial items may go through __setitem__, which reads it
        self.max_items = max_items
        super().__init__(*args, **kwargs)

    def __getitem__(self, key):
        """Return the value for key and mark it as most recently used. Raises KeyError when key is absent."""
        value = super().__getitem__(key)
        self.move_to_end(key)
        return value

    def __setitem__(self, key, value):
        """Store value under key as the most recently used entry, evicting the oldest entry when over capacity."""
        if key in self:
            self.move_to_end(key)
        super().__setitem__(key, value)
        if len(self) > self.max_items:
            # Iteration order is oldest-first, so the first key is the least recently used one
            oldest = next(iter(self))
            del self[oldest]

    def get(self, key, *args, **kwargs):
        """
        Return the value for key (or the supplied default) and mark key as most recently used when it is present.
        """
        item = super().get(key, *args, **kwargs)
        # Decide on presence rather than truthiness of the result: falsy cached values (0, "", {}, None) must still
        # be refreshed, and a truthy default returned for a missing key must not reach move_to_end (KeyError).
        if key in self:
            self.move_to_end(key)
        return item
16 changes: 16 additions & 0 deletions aws_lambda_powertools/shared/json_encoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import decimal
import json
import math


class Encoder(json.JSONEncoder):
    """
    JSON encoder that additionally knows how to serialize decimal.Decimal values, similar to the serializer used by
    Lambda internally.
    """

    def default(self, obj):
        """
        Convert objects the stock encoder cannot handle.

        Decimal NaN values are returned as float NaN, any other Decimal is returned as its string representation,
        and every other type is delegated to json.JSONEncoder.default.
        """
        if not isinstance(obj, decimal.Decimal):
            return super().default(obj)
        return math.nan if obj.is_nan() else str(obj)
10 changes: 10 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
Utility for adding idempotency to lambda functions
"""

from aws_lambda_powertools.utilities.idempotency.persistence.base import BasePersistenceLayer
from aws_lambda_powertools.utilities.idempotency.persistence.dynamodb import DynamoDBPersistenceLayer

from .idempotency import idempotent

__all__ = ("DynamoDBPersistenceLayer", "BasePersistenceLayer", "idempotent")
45 changes: 45 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""
Idempotency errors
"""


class IdempotencyItemAlreadyExistsError(Exception):
    """The item being inserted into the persistence store already exists there and has not expired."""


class IdempotencyItemNotFoundError(Exception):
    """The requested item is not present in the persistence store."""


class IdempotencyAlreadyInProgressError(Exception):
    """An execution using the same idempotency key is already in progress."""


class IdempotencyInvalidStatusError(Exception):
    """The status that was provided is not a valid one."""


class IdempotencyValidationError(Exception):
    """The payload does not match the one held in the stored idempotency record."""


class IdempotencyInconsistentStateError(Exception):
    """Separate requests to the persistence store reported states that are inconsistent with each other."""


class IdempotencyPersistenceLayerError(Exception):
    """The data store reported an error that cannot be recovered from."""
221 changes: 221 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/idempotency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
"""
Primary interface for idempotent Lambda functions utility
"""
import logging
from typing import Any, Callable, Dict, Optional

from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyAlreadyInProgressError,
IdempotencyInconsistentStateError,
IdempotencyItemAlreadyExistsError,
IdempotencyItemNotFoundError,
IdempotencyPersistenceLayerError,
IdempotencyValidationError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
STATUS_CONSTANTS,
BasePersistenceLayer,
DataRecord,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = logging.getLogger(__name__)


@lambda_handler_decorator
def idempotent(
    handler: Callable[[Any, LambdaContext], Any],
    event: Dict[str, Any],
    context: LambdaContext,
    persistence_store: BasePersistenceLayer,
) -> Any:
    """
    Middleware that makes a Lambda handler idempotent

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: LambdaContext
        Lambda's Context
    persistence_store: BasePersistenceLayer
        Instance of BasePersistenceLayer to store data

    Returns
    -------
    Any
        Whatever IdempotencyHandler.handle returns for this event

    Raises
    ------
    IdempotencyInconsistentStateError
        When the persistence store keeps reporting inconsistent state after all retries are used up

    Examples
    --------
    **Processes Lambda's event in an idempotent manner**

        >>> from aws_lambda_powertools.utilities.idempotency import idempotent, DynamoDBPersistenceLayer
        >>>
        >>> persistence_store = DynamoDBPersistenceLayer(event_key_jmespath="body", table_name="idempotency_store")
        >>>
        >>> @idempotent(persistence_store=persistence_store)
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}
    """

    idempotency_handler = IdempotencyHandler(handler, event, context, persistence_store)

    # Persistent state can legitimately change in the short window between the put and get requests, which surfaces
    # as IdempotencyInconsistentStateError. Retrying usually succeeds, so allow 2 retries (3 attempts in total).
    max_handler_retries = 2
    retries_used = 0
    while True:
        try:
            return idempotency_handler.handle()
        except IdempotencyInconsistentStateError:
            if retries_used >= max_handler_retries:
                # Out of retries: let the exception bubble up to the caller
                raise
            retries_used += 1


class IdempotencyHandler:
    """
    Orchestrates the calls to the persistence layer for one invocation and decides whether the lambda handler needs
    to run or a previously stored response is returned instead.
    """

    def __init__(
        self,
        lambda_handler: Callable[[Any, LambdaContext], Any],
        event: Dict[str, Any],
        context: LambdaContext,
        persistence_store: BasePersistenceLayer,
    ):
        """
        Initialize the IdempotencyHandler

        Parameters
        ----------
        lambda_handler : Callable[[Any, LambdaContext], Any]
            Lambda function handler
        event : Dict[str, Any]
            Event payload lambda handler will be called with
        context : LambdaContext
            Context object which will be passed to lambda handler
        persistence_store : BasePersistenceLayer
            Instance of persistence layer to store idempotency records
        """
        self.lambda_handler = lambda_handler
        self.event = event
        self.context = context
        self.persistence_store = persistence_store
        self.max_handler_retries = 2

    def handle(self) -> Any:
        """
        Main entry point for handling idempotent execution of lambda handler.

        Returns
        -------
        Any
            The lambda handler response, or the response taken from an existing idempotency record
        """
        try:
            # Optimistically write the in-progress record first: in the most common case no record exists yet, and
            # a successful write means get_record never has to be called.
            self.persistence_store.save_inprogress(event=self.event)
        except IdempotencyItemAlreadyExistsError:
            # A record is already there, so fetch it and act on its status
            existing_record = self._get_idempotency_record()
            return self._handle_for_status(existing_record)
        else:
            return self._call_lambda_handler()

    def _get_idempotency_record(self) -> DataRecord:
        """
        Retrieve the idempotency record from the persistence layer.

        Returns
        -------
        DataRecord
            The record returned by the persistence layer for this event

        Raises
        ------
        IdempotencyInconsistentStateError
            The record could not be found even though save_inprogress reported that it exists
        IdempotencyValidationError
            Raised by the persistence layer and passed through untouched
        IdempotencyPersistenceLayerError
            Any other exception raised by the persistence layer, wrapped
        """
        try:
            return self.persistence_store.get_record(self.event)
        except IdempotencyItemNotFoundError:
            # Only reachable when the record is removed between save_inprogress and get_record
            logger.debug(
                "An existing idempotency record was deleted before we could retrieve it. Proceeding with lambda "
                "handler"
            )
            raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")
        except IdempotencyValidationError:
            # Validation failures are surfaced to the caller as they are
            raise
        except Exception as exc:
            # Everything else is wrapped in one exception type to ease exception handling for clients
            raise IdempotencyPersistenceLayerError("Failed to get record from idempotency store") from exc

    def _handle_for_status(self, event_record: DataRecord) -> Optional[Dict[Any, Any]]:
        """
        Take appropriate action based on event_record's status

        Parameters
        ----------
        event_record: DataRecord
            Record retrieved from the persistence layer

        Returns
        -------
        Optional[Dict[Any, Any]]
            Lambda response previously used for this idempotency key, if it has successfully executed already.

        Raises
        ------
        IdempotencyAlreadyInProgressError
            A lambda execution is already in progress
        IdempotencyInconsistentStateError
            The persistence store reports inconsistent states across different requests. Retryable.
        """
        # Only reachable when the record becomes expired between the save_inprogress call and this point
        if event_record.status == STATUS_CONSTANTS["EXPIRED"]:
            raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")

        if event_record.status == STATUS_CONSTANTS["INPROGRESS"]:
            key_name = self.persistence_store.event_key_jmespath
            raise IdempotencyAlreadyInProgressError(
                f"Execution already in progress with idempotency key: {key_name}={event_record.idempotency_key}"
            )

        return event_record.response_json_as_dict()

    def _call_lambda_handler(self) -> Any:
        """
        Call the lambda handler function and update the persistence store according to the outcome

        Returns
        -------
        Any
            lambda handler response

        Raises
        ------
        IdempotencyPersistenceLayerError
            Deleting the record after a handler failure, or saving the successful result, failed
        Exception
            Whatever the lambda handler raised, re-raised once the record has been deleted
        """
        try:
            response = self.lambda_handler(self.event, self.context)
        except Exception as handler_exception:
            # The nested try keeps the lambda handler exception around in case the persistence store operation
            # raises as well
            try:
                self.persistence_store.delete_record(event=self.event, exception=handler_exception)
            except Exception as delete_exception:
                raise IdempotencyPersistenceLayerError(
                    "Failed to delete record from idempotency store"
                ) from delete_exception
            raise

        # Only reached when the handler returned normally; the except branch above always raises
        try:
            self.persistence_store.save_success(event=self.event, result=response)
        except Exception as save_exception:
            raise IdempotencyPersistenceLayerError(
                "Failed to update record state to success in idempotency store"
            ) from save_exception

        return response
Empty file.
Loading

0 comments on commit 03f7dcd

Please sign in to comment.