Skip to content

Commit

Permalink
feat(idempotency): Clean up on lambda timeout
Browse files Browse the repository at this point in the history
Changes:
- Initial draft on an option to clean up on function timeout

close aws-powertools#1038
  • Loading branch information
michaelbrewer committed May 1, 2022
1 parent 8ca082f commit a67dd59
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 21 deletions.
10 changes: 9 additions & 1 deletion aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ def _process_idempotency(self):
try:
# We call save_inprogress first as an optimization for the most common case where no idempotent record
# already exists. If it succeeds, there's no need to call get_record.
self.persistence_store.save_inprogress(data=self.data)
self.persistence_store.save_inprogress(
data=self.data,
function_timeout=self._get_remaining_time_in_seconds(),
)
except IdempotencyKeyError:
raise
except IdempotencyItemAlreadyExistsError:
Expand All @@ -113,6 +116,11 @@ def _process_idempotency(self):

return self._get_function_response()

def _get_remaining_time_in_seconds(self) -> Optional[int]:
if self.fn_args and len(self.fn_args) == 2 and getattr(self.fn_args[1], "get_remaining_time_in_millis", None):
return self.fn_args[1].get_remaining_time_in_millis() / 1000
return None

def _get_idempotency_record(self) -> DataRecord:
"""
Retrieve the idempotency record from the persistence layer.
Expand Down
4 changes: 4 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ def __init__(
jmespath_options: Optional[Dict] = None,
raise_on_no_idempotency_key: bool = False,
expires_after_seconds: int = 60 * 60, # 1 hour default
function_timeout_clean_up: bool = False,
use_local_cache: bool = False,
local_cache_max_items: int = 256,
hash_function: str = "md5",
Expand All @@ -26,6 +27,8 @@ def __init__(
Raise exception if no idempotency key was found in the request, by default False
expires_after_seconds: int
The number of seconds to wait before a record is expired
function_timeout_clean_up: bool
Whether to clean up in progress record after a function timeouts
use_local_cache: bool, optional
Whether to locally cache idempotency results, by default False
local_cache_max_items: int, optional
Expand All @@ -38,6 +41,7 @@ def __init__(
self.jmespath_options = jmespath_options
self.raise_on_no_idempotency_key = raise_on_no_idempotency_key
self.expires_after_seconds = expires_after_seconds
self.function_timeout_clean_up = function_timeout_clean_up
self.use_local_cache = use_local_cache
self.local_cache_max_items = local_cache_max_items
self.hash_function = hash_function
28 changes: 26 additions & 2 deletions aws_lambda_powertools/utilities/idempotency/persistence/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(
idempotency_key,
status: str = "",
expiry_timestamp: Optional[int] = None,
function_timeout: Optional[int] = None,
response_data: Optional[str] = "",
payload_hash: Optional[str] = None,
) -> None:
Expand All @@ -61,6 +62,7 @@ def __init__(
self.idempotency_key = idempotency_key
self.payload_hash = payload_hash
self.expiry_timestamp = expiry_timestamp
self.function_timeout = function_timeout
self._status = status
self.response_data = response_data

Expand Down Expand Up @@ -120,6 +122,7 @@ def __init__(self):
self.validation_key_jmespath = None
self.raise_on_no_idempotency_key = False
self.expires_after_seconds: int = 60 * 60 # 1 hour default
self.function_timeout_clean_up = False
self.use_local_cache = False
self.hash_function = None

Expand Down Expand Up @@ -152,6 +155,7 @@ def configure(self, config: IdempotencyConfig, function_name: Optional[str] = No
self.payload_validation_enabled = True
self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key
self.expires_after_seconds = config.expires_after_seconds
self.function_timeout_clean_up = config.function_timeout_clean_up
self.use_local_cache = config.use_local_cache
if self.use_local_cache:
self._cache = LRUDict(max_items=config.local_cache_max_items)
Expand Down Expand Up @@ -257,9 +261,21 @@ def _get_expiry_timestamp(self) -> int:
int
unix timestamp of expiry date for idempotency record
"""
return self._get_timestamp_after_seconds(self.expires_after_seconds)

@staticmethod
def _get_timestamp_after_seconds(seconds: int) -> int:
"""
Returns
-------
int
unix timestamp after the specified seconds
"""
now = datetime.datetime.now()
period = datetime.timedelta(seconds=self.expires_after_seconds)
period = datetime.timedelta(seconds=seconds)
return int((now + period).timestamp())

def _save_to_cache(self, data_record: DataRecord):
Expand Down Expand Up @@ -317,6 +333,7 @@ def save_success(self, data: Dict[str, Any], result: dict) -> None:
idempotency_key=self._get_hashed_idempotency_key(data=data),
status=STATUS_CONSTANTS["COMPLETED"],
expiry_timestamp=self._get_expiry_timestamp(),
function_timeout=None,
response_data=response_data,
payload_hash=self._get_hashed_payload(data=data),
)
Expand All @@ -328,19 +345,26 @@ def save_success(self, data: Dict[str, Any], result: dict) -> None:

self._save_to_cache(data_record=data_record)

def save_inprogress(self, data: Dict[str, Any]) -> None:
def save_inprogress(self, data: Dict[str, Any], function_timeout: Optional[int] = None) -> None:
"""
Save record of function's execution being in progress
Parameters
----------
data: Dict[str, Any]
Payload
function_timeout: int, optional
"""
function_timeout = (
self._get_timestamp_after_seconds(function_timeout)
if function_timeout and self.function_timeout_clean_up
else None
)
data_record = DataRecord(
idempotency_key=self._get_hashed_idempotency_key(data=data),
status=STATUS_CONSTANTS["INPROGRESS"],
expiry_timestamp=self._get_expiry_timestamp(),
function_timeout=function_timeout,
payload_hash=self._get_hashed_payload(data=data),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(
static_pk_value: Optional[str] = None,
sort_key_attr: Optional[str] = None,
expiry_attr: str = "expiration",
function_timeout_attr: str = "function_timeout",
status_attr: str = "status",
data_attr: str = "data",
validation_key_attr: str = "validation",
Expand Down Expand Up @@ -85,6 +86,7 @@ def __init__(
self.static_pk_value = static_pk_value
self.sort_key_attr = sort_key_attr
self.expiry_attr = expiry_attr
self.function_timeout_attr = function_timeout_attr
self.status_attr = status_attr
self.data_attr = data_attr
self.validation_key_attr = validation_key_attr
Expand Down Expand Up @@ -150,6 +152,7 @@ def _put_record(self, data_record: DataRecord) -> None:
item = {
**self._get_key(data_record.idempotency_key),
self.expiry_attr: data_record.expiry_timestamp,
self.function_timeout_attr: data_record.function_timeout,
self.status_attr: data_record.status,
}

Expand All @@ -161,8 +164,12 @@ def _put_record(self, data_record: DataRecord) -> None:
logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}")
self.table.put_item(
Item=item,
ConditionExpression="attribute_not_exists(#id) OR #now < :now",
ExpressionAttributeNames={"#id": self.key_attr, "#now": self.expiry_attr},
ConditionExpression="attribute_not_exists(#id) OR #now < :now OR #function_timeout < :now",
ExpressionAttributeNames={
"#id": self.key_attr,
"#now": self.expiry_attr,
"#function_timeout": self.function_timeout_attr,
},
ExpressionAttributeValues={":now": int(now.timestamp())},
)
except self.table.meta.client.exceptions.ConditionalCheckFailedException:
Expand Down
36 changes: 23 additions & 13 deletions tests/functional/idempotency/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import datetime
import json
from collections import namedtuple
from decimal import Decimal
from unittest import mock

Expand Down Expand Up @@ -32,14 +31,19 @@ def lambda_apigw_event():

@pytest.fixture
def lambda_context():
lambda_context = {
"function_name": "test-func",
"memory_limit_in_mb": 128,
"invoked_function_arn": "arn:aws:lambda:eu-west-1:809313241234:function:test-func",
"aws_request_id": "52fdfc07-2182-154f-163f-5f0f9a621d72",
}
class LambdaContext:
def __init__(self):
self.function_name = "test-func"
self.memory_limit_in_mb = 128
self.invoked_function_arn = "arn:aws:lambda:eu-west-1:809313241234:function:test-func"
self.aws_request_id = "52fdfc07-2182-154f-163f-5f0f9a621d72"

@staticmethod
def get_remaining_time_in_millis() -> int:
"""Returns the number of milliseconds left before the execution times out."""
return 0

return namedtuple("LambdaContext", lambda_context.keys())(*lambda_context.values())
return LambdaContext()


@pytest.fixture
Expand Down Expand Up @@ -117,25 +121,31 @@ def expected_params_update_item_with_validation(
@pytest.fixture
def expected_params_put_item(hashed_idempotency_key):
return {
"ConditionExpression": "attribute_not_exists(#id) OR #now < :now",
"ExpressionAttributeNames": {"#id": "id", "#now": "expiration"},
"ConditionExpression": "attribute_not_exists(#id) OR #now < :now OR #function_timeout < :now",
"ExpressionAttributeNames": {"#id": "id", "#now": "expiration", "#function_timeout": "function_timeout"},
"ExpressionAttributeValues": {":now": stub.ANY},
"Item": {"expiration": stub.ANY, "id": hashed_idempotency_key, "status": "INPROGRESS"},
"Item": {
"expiration": stub.ANY,
"id": hashed_idempotency_key,
"status": "INPROGRESS",
"function_timeout": None,
},
"TableName": "TEST_TABLE",
}


@pytest.fixture
def expected_params_put_item_with_validation(hashed_idempotency_key, hashed_validation_key):
return {
"ConditionExpression": "attribute_not_exists(#id) OR #now < :now",
"ExpressionAttributeNames": {"#id": "id", "#now": "expiration"},
"ConditionExpression": "attribute_not_exists(#id) OR #now < :now OR #function_timeout < :now",
"ExpressionAttributeNames": {"#id": "id", "#now": "expiration", "#function_timeout": "function_timeout"},
"ExpressionAttributeValues": {":now": stub.ANY},
"Item": {
"expiration": stub.ANY,
"id": hashed_idempotency_key,
"status": "INPROGRESS",
"validation": hashed_validation_key,
"function_timeout": None,
},
"TableName": "TEST_TABLE",
}
Expand Down
11 changes: 8 additions & 3 deletions tests/functional/idempotency/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ def build_idempotency_put_item_stub(
) -> Dict:
idempotency_key_hash = f"{function_name}.{handler_name}#{hash_idempotency_key(data)}"
return {
"ConditionExpression": "attribute_not_exists(#id) OR #now < :now",
"ExpressionAttributeNames": {"#id": "id", "#now": "expiration"},
"ConditionExpression": "attribute_not_exists(#id) OR #now < :now OR #function_timeout < :now",
"ExpressionAttributeNames": {"#id": "id", "#now": "expiration", "#function_timeout": "function_timeout"},
"ExpressionAttributeValues": {":now": stub.ANY},
"Item": {"expiration": stub.ANY, "id": idempotency_key_hash, "status": "INPROGRESS"},
"Item": {
"expiration": stub.ANY,
"id": idempotency_key_hash,
"status": "INPROGRESS",
"function_timeout": None,
},
"TableName": "TEST_TABLE",
}

Expand Down

0 comments on commit a67dd59

Please sign in to comment.