Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(data-classes): replace AttributeValue in DynamoDBStreamEvent with deserialized Python values #1619

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions aws_lambda_powertools/utilities/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,10 @@ def lambda_handler(event, context: LambdaContext):
@tracer.capture_method
def record_handler(record: DynamoDBRecord):
logger.info(record.dynamodb.new_image)
payload: dict = json.loads(record.dynamodb.new_image.get("item").s_value)
payload: dict = json.loads(record.dynamodb.new_image.get("item"))
# alternatively:
# changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image # noqa: E800
# payload = change.get("Message").raw_event -> {"S": "<payload>"}
# changes: Dict[str, Any] = record.dynamodb.new_image # noqa: E800
# payload = change.get("Message") -> "<payload>"
...

@logger.inject_lambda_context
Expand Down
262 changes: 107 additions & 155 deletions aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py
Original file line number Diff line number Diff line change
@@ -1,169 +1,100 @@
from decimal import Clamped, Context, Decimal, Inexact, Overflow, Rounded, Underflow
from enum import Enum
from typing import Any, Dict, Iterator, List, Optional, Union
from typing import Any, Callable, Dict, Iterator, Optional, Sequence, Set

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper

# NOTE: DynamoDB supports up to 38 digits precision
# Therefore, this ensures our Decimal follows what's stored in the table
DYNAMODB_CONTEXT = Context(
Emin=-128,
Emax=126,
prec=38,
traps=[Clamped, Overflow, Inexact, Rounded, Underflow],
)

class AttributeValueType(Enum):
Binary = "B"
BinarySet = "BS"
Boolean = "BOOL"
List = "L"
Map = "M"
Number = "N"
NumberSet = "NS"
Null = "NULL"
String = "S"
StringSet = "SS"

class TypeDeserializer:
"""
Deserializes DynamoDB types to Python types.

class AttributeValue(DictWrapper):
"""Represents the data for an attribute
It's based on boto3's [DynamoDB TypeDeserializer](https://boto3.amazonaws.com/v1/documentation/api/latest/_modules/boto3/dynamodb/types.html). # noqa: E501

Documentation:
--------------
- https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_AttributeValue.html
- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html
The only notable difference is that for Binary (`B`, `BS`) values we return Python Bytes directly,
since we don't support Python 2.
"""

def __init__(self, data: Dict[str, Any]):
"""AttributeValue constructor
def deserialize(self, value: Dict) -> Any:
"""Deserialize DynamoDB data types into Python types.

Parameters
----------
data: Dict[str, Any]
Raw lambda event dict
"""
super().__init__(data)
self.dynamodb_type = list(data.keys())[0]
value: Any
DynamoDB value to be deserialized to a python type

@property
def b_value(self) -> Optional[str]:
"""An attribute of type Base64-encoded binary data object

Example:
>>> {"B": "dGhpcyB0ZXh0IGlzIGJhc2U2NC1lbmNvZGVk"}
"""
return self.get("B")
Here are the various conversions:

@property
def bs_value(self) -> Optional[List[str]]:
"""An attribute of type Array of Base64-encoded binary data objects

Example:
>>> {"BS": ["U3Vubnk=", "UmFpbnk=", "U25vd3k="]}
"""
return self.get("BS")

@property
def bool_value(self) -> Optional[bool]:
"""An attribute of type Boolean

Example:
>>> {"BOOL": True}
"""
item = self.get("BOOL")
return None if item is None else bool(item)
DynamoDB Python
-------- ------
{'NULL': True} None
{'BOOL': True/False} True/False
{'N': str(value)} str(value)
{'S': string} string
{'B': bytes} bytes
{'NS': [str(value)]} set([str(value)])
{'SS': [string]} set([string])
{'BS': [bytes]} set([bytes])
{'L': list} list
{'M': dict} dict

@property
def list_value(self) -> Optional[List["AttributeValue"]]:
"""An attribute of type Array of AttributeValue objects

Example:
>>> {"L": [ {"S": "Cookies"} , {"S": "Coffee"}, {"N": "3.14159"}]}
"""
item = self.get("L")
return None if item is None else [AttributeValue(v) for v in item]

@property
def map_value(self) -> Optional[Dict[str, "AttributeValue"]]:
"""An attribute of type String to AttributeValue object map

Example:
>>> {"M": {"Name": {"S": "Joe"}, "Age": {"N": "35"}}}
"""
return _attribute_value_dict(self._data, "M")

@property
def n_value(self) -> Optional[str]:
"""An attribute of type Number

Numbers are sent across the network to DynamoDB as strings, to maximize compatibility across languages
and libraries. However, DynamoDB treats them as number type attributes for mathematical operations.
Parameters
----------
value: Any
DynamoDB value to be deserialized to a python type

Example:
>>> {"N": "123.45"}
Returns
--------
any
Python native type converted from DynamoDB type
"""
return self.get("N")

@property
def ns_value(self) -> Optional[List[str]]:
"""An attribute of type Number Set

Example:
>>> {"NS": ["42.2", "-19", "7.5", "3.14"]}
"""
return self.get("NS")
dynamodb_type = list(value.keys())[0]
deserializer: Optional[Callable] = getattr(self, f"_deserialize_{dynamodb_type}".lower(), None)
if deserializer is None:
raise TypeError(f"Dynamodb type {dynamodb_type} is not supported")

@property
def null_value(self) -> None:
"""An attribute of type Null.
return deserializer(value[dynamodb_type])

Example:
>>> {"NULL": True}
"""
def _deserialize_null(self, value: bool) -> bool:
return None

@property
def s_value(self) -> Optional[str]:
"""An attribute of type String
def _deserialize_bool(self, value: bool) -> bool:
return value

Example:
>>> {"S": "Hello"}
"""
return self.get("S")
def _deserialize_n(self, value: str) -> Decimal:
return DYNAMODB_CONTEXT.create_decimal(value)

@property
def ss_value(self) -> Optional[List[str]]:
"""An attribute of type Array of strings
def _deserialize_s(self, value: str) -> str:
return value

Example:
>>> {"SS": ["Giraffe", "Hippo" ,"Zebra"]}
"""
return self.get("SS")
def _deserialize_b(self, value: bytes) -> bytes:
return value

@property
def get_type(self) -> AttributeValueType:
"""Get the attribute value type based on the contained data"""
return AttributeValueType(self.dynamodb_type)
def _deserialize_ns(self, value: Sequence[str]) -> Set[Decimal]:
return set(map(self._deserialize_n, value))

@property
def l_value(self) -> Optional[List["AttributeValue"]]:
"""Alias of list_value"""
return self.list_value
def _deserialize_ss(self, value: Sequence[str]) -> Set[str]:
return set(map(self._deserialize_s, value))

@property
def m_value(self) -> Optional[Dict[str, "AttributeValue"]]:
"""Alias of map_value"""
return self.map_value
def _deserialize_bs(self, value: Sequence[bytes]) -> Set[bytes]:
return set(map(self._deserialize_b, value))

@property
def get_value(self) -> Union[Optional[bool], Optional[str], Optional[List], Optional[Dict]]:
"""Get the attribute value"""
try:
return getattr(self, f"{self.dynamodb_type.lower()}_value")
except AttributeError:
raise TypeError(f"Dynamodb type {self.dynamodb_type} is not supported")
def _deserialize_l(self, value: Sequence[Dict]) -> Sequence[Any]:
return [self.deserialize(v) for v in value]


def _attribute_value_dict(attr_values: Dict[str, dict], key: str) -> Optional[Dict[str, AttributeValue]]:
"""A dict of type String to AttributeValue object map

Example:
>>> {"NewImage": {"Id": {"S": "xxx-xxx"}, "Value": {"N": "35"}}}
"""
attr_values_dict = attr_values.get(key)
return None if attr_values_dict is None else {k: AttributeValue(v) for k, v in attr_values_dict.items()}
def _deserialize_m(self, value: Dict) -> Dict:
return {k: self.deserialize(v) for k, v in value.items()}


class StreamViewType(Enum):
Expand All @@ -176,28 +107,57 @@ class StreamViewType(Enum):


class StreamRecord(DictWrapper):
_deserializer = TypeDeserializer()

def __init__(self, data: Dict[str, Any]):
"""StreamRecord constructor
Parameters
----------
data: Dict[str, Any]
Represents the dynamodb dict inside DynamoDBStreamEvent's records
"""
super().__init__(data)
self._deserializer = TypeDeserializer()

def _deserialize_dynamodb_dict(self, key: str) -> Optional[Dict[str, Any]]:
"""Deserialize DynamoDB records available in `Keys`, `NewImage`, and `OldImage`

Parameters
----------
key : str
DynamoDB key (e.g., Keys, NewImage, or OldImage)

Returns
-------
Optional[Dict[str, Any]]
Deserialized records in Python native types
"""
dynamodb_dict = self._data.get(key)
if dynamodb_dict is None:
return None

return {k: self._deserializer.deserialize(v) for k, v in dynamodb_dict.items()}

@property
def approximate_creation_date_time(self) -> Optional[int]:
"""The approximate date and time when the stream record was created, in UNIX epoch time format."""
item = self.get("ApproximateCreationDateTime")
return None if item is None else int(item)

# NOTE: This override breaks the Mapping protocol of DictWrapper, it's left here for backwards compatibility with
# a 'type: ignore' comment. See #1516 for discussion
@property
def keys(self) -> Optional[Dict[str, AttributeValue]]: # type: ignore[override]
def keys(self) -> Optional[Dict[str, Any]]: # type: ignore[override]
"""The primary key attribute(s) for the DynamoDB item that was modified."""
return _attribute_value_dict(self._data, "Keys")
return self._deserialize_dynamodb_dict("Keys")

@property
def new_image(self) -> Optional[Dict[str, AttributeValue]]:
def new_image(self) -> Optional[Dict[str, Any]]:
"""The item in the DynamoDB table as it appeared after it was modified."""
return _attribute_value_dict(self._data, "NewImage")
return self._deserialize_dynamodb_dict("NewImage")

@property
def old_image(self) -> Optional[Dict[str, AttributeValue]]:
def old_image(self) -> Optional[Dict[str, Any]]:
"""The item in the DynamoDB table as it appeared before it was modified."""
return _attribute_value_dict(self._data, "OldImage")
return self._deserialize_dynamodb_dict("OldImage")

@property
def sequence_number(self) -> Optional[str]:
Expand Down Expand Up @@ -233,7 +193,7 @@ def aws_region(self) -> Optional[str]:

@property
def dynamodb(self) -> Optional[StreamRecord]:
"""The main body of the stream record, containing all the DynamoDB-specific fields."""
"""The main body of the stream record, containing all the DynamoDB-specific dicts."""
stream_record = self.get("dynamodb")
return None if stream_record is None else StreamRecord(stream_record)

Expand Down Expand Up @@ -278,26 +238,18 @@ class DynamoDBStreamEvent(DictWrapper):

Example
-------
**Process dynamodb stream events and use get_type and get_value for handling conversions**
**Process dynamodb stream events. DynamoDB types are automatically converted to their equivalent Python values.**

from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
AttributeValueType,
AttributeValue,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
for record in event.records:
key: AttributeValue = record.dynamodb.keys["id"]
if key == AttributeValueType.Number:
assert key.get_value == key.n_value
print(key.get_value)
elif key == AttributeValueType.Map:
assert key.get_value == key.map_value
print(key.get_value)
# {"N": "123.45"} => Decimal("123.45")
key: str = record.dynamodb.keys["id"]
print(key)
"""

@property
Expand Down
Loading