diff --git a/aws_lambda_powertools/utilities/data_classes/active_mq_event.py b/aws_lambda_powertools/utilities/data_classes/active_mq_event.py new file mode 100644 index 00000000000..058a6a6ecf4 --- /dev/null +++ b/aws_lambda_powertools/utilities/data_classes/active_mq_event.py @@ -0,0 +1,125 @@ +import base64 +import json +from typing import Any, Iterator, Optional + +from aws_lambda_powertools.utilities.data_classes.common import DictWrapper + + +class ActiveMQMessage(DictWrapper): + @property + def message_id(self) -> str: + """Unique identifier for the message""" + return self["messageID"] + + @property + def message_type(self) -> str: + return self["messageType"] + + @property + def data(self) -> str: + return self["data"] + + @property + def decoded_data(self) -> str: + """Decodes the data as a str""" + return base64.b64decode(self.data.encode()).decode() + + @property + def json_data(self) -> Any: + """Parses the data as json""" + return json.loads(self.decoded_data) + + @property + def connection_id(self) -> str: + return self["connectionId"] + + @property + def redelivered(self) -> bool: + """true if the message is being resent to the consumer""" + return self["redelivered"] + + @property + def timestamp(self) -> int: + """Time in milliseconds.""" + return self["timestamp"] + + @property + def broker_in_time(self) -> int: + """Time stamp (in milliseconds) for when the message arrived at the broker.""" + return self["brokerInTime"] + + @property + def broker_out_time(self) -> int: + """Time stamp (in milliseconds) for when the message left the broker.""" + return self["brokerOutTime"] + + @property + def destination_physicalname(self) -> str: + return self["destination"]["physicalname"] + + @property + def delivery_mode(self) -> Optional[int]: + """persistent or non-persistent delivery""" + return self.get("deliveryMode") + + @property + def correlation_id(self) -> Optional[str]: + """User defined correlation id""" + return self.get("correlationID") + + @property + def reply_to(self) -> Optional[str]: + """User defined reply to""" + return self.get("replyTo") + + @property + def get_type(self) -> Optional[str]: + """User defined message type""" + return self.get("type") + + @property + def expiration(self) -> Optional[int]: + """Expiration attribute whose value is given in milliseconds""" + return self.get("expiration") + + @property + def priority(self) -> Optional[int]: + """ + JMS defines a ten-level priority value, with 0 as the lowest priority and 9 + as the highest. In addition, clients should consider priorities 0-4 as + gradations of normal priority and priorities 5-9 as gradations of expedited + priority. + + JMS does not require that a provider strictly implement priority ordering + of messages; however, it should do its best to deliver expedited messages + ahead of normal messages. + """ + return self.get("priority") + + +class ActiveMQEvent(DictWrapper): + """Represents an Active MQ event sent to Lambda + + Documentation: + -------------- + - https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html + - https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/ + """ + + @property + def event_source(self) -> str: + return self["eventSource"] + + @property + def event_source_arn(self) -> str: + """The Amazon Resource Name (ARN) of the event source""" + return self["eventSourceArn"] + + @property + def messages(self) -> Iterator[ActiveMQMessage]: + for record in self["messages"]: + yield ActiveMQMessage(record) + + @property + def message(self) -> ActiveMQMessage: + return next(self.messages) diff --git a/aws_lambda_powertools/utilities/data_classes/rabbit_mq_event.py b/aws_lambda_powertools/utilities/data_classes/rabbit_mq_event.py new file mode 100644 index 00000000000..7676e6ff9b5 --- /dev/null +++ b/aws_lambda_powertools/utilities/data_classes/rabbit_mq_event.py @@ -0,0 +1,121 @@ +import base64 +import json +from typing import Any, Dict, List + +from aws_lambda_powertools.utilities.data_classes.common import DictWrapper + + +class BasicProperties(DictWrapper): + @property + def content_type(self) -> str: + return self["contentType"] + + @property + def content_encoding(self) -> str: + return self["contentEncoding"] + + @property + def headers(self) -> Dict[str, Any]: + return self["headers"] + + @property + def delivery_mode(self) -> int: + return self["deliveryMode"] + + @property + def priority(self) -> int: + return self["priority"] + + @property + def correlation_id(self) -> str: + return self["correlationId"] + + @property + def reply_to(self) -> str: + return self["replyTo"] + + @property + def expiration(self) -> str: + return self["expiration"] + + @property + def message_id(self) -> str: + return self["messageId"] + + @property + def timestamp(self) -> str: + return self["timestamp"] + + @property + def get_type(self) -> str: + return self["type"] + + @property + def user_id(self) -> str: + return self["userId"] + + @property + def app_id(self) -> str: + return self["appId"] + + @property + def cluster_id(self) -> str: + return self["clusterId"] + + @property + def body_size(self) -> int: + return self["bodySize"] + + +class RabbitMessage(DictWrapper): + @property + def basic_properties(self) -> BasicProperties: + return BasicProperties(self["basicProperties"]) + + @property + def redelivered(self) -> bool: + return self["redelivered"] + + @property + def data(self) -> str: + return self["data"] + + @property + def decoded_data(self) -> str: + """Decodes the data as a str""" + return base64.b64decode(self.data.encode()).decode() + + @property + def json_data(self) -> Any: + """Parses the data as json""" + return json.loads(self.decoded_data) + + +class RabbitMQEvent(DictWrapper): + """Represents a Rabbit MQ event sent to Lambda + + Documentation: + -------------- + - https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html + - https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/ + """ + + def __init__(self, data: Dict[str, Any]): + super().__init__(data) + self._rmq_messages_by_queue = { + key: [RabbitMessage(message) for message in messages] + for key, messages in self["rmqMessagesByQueue"].items() + } + + @property + def event_source(self) -> str: + return self["eventSource"] + + @property + def event_source_arn(self) -> str: + """The Amazon Resource Name (ARN) of the event source""" + return self["eventSourceArn"] + + @property + def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]: + return self._rmq_messages_by_queue diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index e05193c7702..cbe874d4b94 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -58,6 +58,7 @@ Same example as above, but using the `event_source` decorator Event Source | Data_class ------------------------------------------------- | --------------------------------------------------------------------------------- +[Active MQ](#active-mq) | `ActiveMQEvent` [API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent` [API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2` [API Gateway Proxy](#api-gateway-proxy) | `APIGatewayProxyEvent` @@ -72,6 +73,7 @@ Event Source | Data_class [DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName` [EventBridge](#eventbridge) | `EventBridgeEvent` [Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent` +[Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` [S3](#s3) | `S3Event` [S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent` [SES](#ses) | `SESEvent` @@ -82,6 +84,31 @@ Event Source | Data_class The examples provided below are far from exhaustive - the data classes themselves are designed to provide a form of documentation inherently (via autocompletion, types and docstrings). +### Active MQ + +It is used for [Active MQ payloads](https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html){target="_blank"}, also see +the [AWS blog post](https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/){target="_blank"} +for more details. + +=== "app.py" + + ```python hl_lines="4-5 9-10" + from typing import Dict + + from aws_lambda_powertools import Logger + from aws_lambda_powertools.utilities.data_classes import event_source + from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent + + logger = Logger() + + @event_source(data_class=ActiveMQEvent) + def lambda_handler(event: ActiveMQEvent, context): + for message in event.messages: + logger.debug(f"MessageID: {message.message_id}") + data: Dict = message.json_data + logger.debug("Process json in base64 encoded data str", data) + ``` + ### API Gateway Authorizer > New in 1.20.0 @@ -810,6 +837,33 @@ or plain text, depending on the original payload. do_something_with(data) ``` +### Rabbit MQ + +It is used for [Rabbit MQ payloads](https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html){target="_blank"}, also see +the [blog post](https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/){target="_blank"} +for more details. + +=== "app.py" + + ```python hl_lines="4-5 9-10" + from typing import Dict + + from aws_lambda_powertools import Logger + from aws_lambda_powertools.utilities.data_classes import event_source + from aws_lambda_powertools.utilities.data_classes.rabbit_mq_event import RabbitMQEvent + + logger = Logger() + + @event_source(data_class=RabbitMQEvent) + def lambda_handler(event: RabbitMQEvent, context): + for queue_name, messages in event.rmq_messages_by_queue.items(): + logger.debug(f"Messages for queue: {queue_name}") + for message in messages: + logger.debug(f"MessageID: {message.basic_properties.message_id}") + data: Dict = message.json_data + logger.debug("Process json in base64 encoded data str", data) + ``` + ### S3 === "app.py" diff --git a/tests/events/activeMQEvent.json b/tests/events/activeMQEvent.json new file mode 100644 index 00000000000..290ada184c9 --- /dev/null +++ b/tests/events/activeMQEvent.json @@ -0,0 +1,45 @@ +{ + "eventSource": "aws:amq", + "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", + "messages": [ + { + "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", + "messageType": "jms/text-message", + "data": "QUJDOkFBQUE=", + "connectionId": "myJMSCoID", + "redelivered": false, + "destination": { + "physicalname": "testQueue" + }, + "timestamp": 1598827811958, + "brokerInTime": 1598827811958, + "brokerOutTime": 1598827811959 + }, + { + "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", + "messageType": "jms/text-message", + "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==", + "connectionId": "myJMSCoID2", + "redelivered": false, + "destination": { + "physicalname": "testQueue" + }, + "timestamp": 1598827811958, + "brokerInTime": 1598827811958, + "brokerOutTime": 1598827811959 + }, + { + "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", + "messageType": "jms/bytes-message", + "data": "3DTOOW7crj51prgVLQaGQ82S48k=", + "connectionId": "myJMSCoID1", + "persistent": false, + "destination": { + "physicalname": "testQueue" + }, + "timestamp": 1598827811958, + "brokerInTime": 1598827811958, + "brokerOutTime": 1598827811959 + } + ] +} diff --git a/tests/events/rabbitMQEvent.json b/tests/events/rabbitMQEvent.json new file mode 100644 index 00000000000..e4259555a8b --- /dev/null +++ b/tests/events/rabbitMQEvent.json @@ -0,0 +1,51 @@ +{ + "eventSource": "aws:rmq", + "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8", + "rmqMessagesByQueue": { + "pizzaQueue::/": [ + { + "basicProperties": { + "contentType": "text/plain", + "contentEncoding": null, + "headers": { + "header1": { + "bytes": [ + 118, + 97, + 108, + 117, + 101, + 49 + ] + }, + "header2": { + "bytes": [ + 118, + 97, + 108, + 117, + 101, + 50 + ] + }, + "numberInHeader": 10 + }, + "deliveryMode": 1, + "priority": 34, + "correlationId": null, + "replyTo": null, + "expiration": "60000", + "messageId": null, + "timestamp": "Jan 1, 1970, 12:33:41 AM", + "type": null, + "userId": "AIDACKCEVSQ6C2EXAMPLE", + "appId": null, + "clusterId": null, + "bodySize": 80 + }, + "redelivered": false, + "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" + } + ] + } +} diff --git a/tests/functional/data_classes/test_amazon_mq.py b/tests/functional/data_classes/test_amazon_mq.py new file mode 100644 index 00000000000..0f4f5079565 --- /dev/null +++ b/tests/functional/data_classes/test_amazon_mq.py @@ -0,0 +1,69 @@ +from typing import Dict + +from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent, ActiveMQMessage +from aws_lambda_powertools.utilities.data_classes.rabbit_mq_event import BasicProperties, RabbitMessage, RabbitMQEvent +from tests.functional.utils import load_event + + +def test_active_mq_event(): + event = ActiveMQEvent(load_event("activeMQEvent.json")) + + assert event.event_source == "aws:amq" + assert event.event_source_arn is not None + assert len(list(event.messages)) == 3 + + message = event.message + assert isinstance(message, ActiveMQMessage) + assert message.message_id is not None + assert message.message_type is not None + assert message.data is not None + assert message.decoded_data is not None + assert message.connection_id is not None + assert message.redelivered is False + assert message.timestamp is not None + assert message.broker_in_time is not None + assert message.broker_out_time is not None + assert message.destination_physicalname is not None + assert message.delivery_mode is None + assert message.correlation_id is None + assert message.reply_to is None + assert message.get_type is None + assert message.expiration is None + assert message.priority is None + + messages = list(event.messages) + message = messages[1] + assert message.json_data["timeout"] == 0 + + +def test_rabbit_mq_event(): + event = RabbitMQEvent(load_event("rabbitMQEvent.json")) + + assert event.event_source == "aws:rmq" + assert event.event_source_arn is not None + + message = event.rmq_messages_by_queue["pizzaQueue::/"][0] + assert message.redelivered is False + assert message.data is not None + assert message.decoded_data is not None + assert message.json_data["timeout"] == 0 + + assert isinstance(message, RabbitMessage) + properties = message.basic_properties + assert isinstance(properties, BasicProperties) + assert properties.content_type == "text/plain" + assert properties.content_encoding is None + assert isinstance(properties.headers, Dict) + assert properties.headers["header1"] is not None + assert properties.delivery_mode == 1 + assert properties.priority == 34 + assert properties.correlation_id is None + assert properties.reply_to is None + assert properties.expiration == "60000" + assert properties.message_id is None + assert properties.timestamp is not None + assert properties.get_type is None + assert properties.user_id is not None + assert properties.app_id is None + assert properties.cluster_id is None + assert properties.body_size == 80