From b53877251aa1627252f01373bd41682c764ad6aa Mon Sep 17 00:00:00 2001 From: Heitor Lessa Date: Tue, 19 Jan 2021 15:01:42 +0100 Subject: [PATCH] feat: add support for SNS->SQS protocol (#272) * feat: add support for SNS->SQS protocol * docs: add SnsEnvelope, SnsSqsEnvelope; fix headers Signed-off-by: heitorlessa --- .../utilities/parser/envelopes/__init__.py | 3 +- .../utilities/parser/envelopes/sns.py | 37 ++++++++++++++++++- .../utilities/parser/models/sns.py | 12 +++++- docs/content/utilities/parser.mdx | 16 +++++--- tests/events/snsSqsEvent.json | 20 ++++++++++ tests/functional/parser/test_sns.py | 12 ++++++ 6 files changed, 90 insertions(+), 10 deletions(-) create mode 100644 tests/events/snsSqsEvent.json diff --git a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py index d9d820aede0..10c70272c7d 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/__init__.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/__init__.py @@ -3,7 +3,7 @@ from .dynamodb import DynamoDBStreamEnvelope from .event_bridge import EventBridgeEnvelope from .kinesis import KinesisDataStreamEnvelope -from .sns import SnsEnvelope +from .sns import SnsEnvelope, SnsSqsEnvelope from .sqs import SqsEnvelope __all__ = [ @@ -12,6 +12,7 @@ "EventBridgeEnvelope", "KinesisDataStreamEnvelope", "SnsEnvelope", + "SnsSqsEnvelope", "SqsEnvelope", "BaseEnvelope", ] diff --git a/aws_lambda_powertools/utilities/parser/envelopes/sns.py b/aws_lambda_powertools/utilities/parser/envelopes/sns.py index d4a78199d07..a194ea55895 100644 --- a/aws_lambda_powertools/utilities/parser/envelopes/sns.py +++ b/aws_lambda_powertools/utilities/parser/envelopes/sns.py @@ -1,7 +1,7 @@ import logging from typing import Any, Dict, List, Optional, Union -from ..models import SnsModel +from ..models import SnsModel, SnsNotificationModel, SqsModel from ..types import Model from .base import BaseEnvelope @@ -37,3 +37,38 @@ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> Lis parsed_envelope = SnsModel.parse_obj(data) logger.debug(f"Parsing SNS records in `body` with {model}") return [self._parse(data=record.Sns.Message, model=model) for record in parsed_envelope.Records] + + +class SnsSqsEnvelope(BaseEnvelope): + """SNS plus SQS Envelope to extract array of Records + + Published messages from SNS to SQS has a slightly different payload. + Since SNS payload is marshalled into `Record` key in SQS, we have to: + + 1. Parse SQS schema with incoming data + 2. Unmarshall SNS payload and parse against SNS Notification model not SNS/SNS Record + 3. Finally, parse provided model against payload extracted + """ + + def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> List[Optional[Model]]: + """Parses records found with model provided + + Parameters + ---------- + data : Dict + Lambda event to be parsed + model : Model + Data model provided to parse after extracting data using envelope + + Returns + ------- + List + List of records parsed with model provided + """ + logger.debug(f"Parsing incoming data with SQS model {SqsModel}") + parsed_envelope = SqsModel.parse_obj(data) + output = [] + for record in parsed_envelope.Records: + sns_notification = SnsNotificationModel.parse_raw(record.body) + output.append(self._parse(data=sns_notification.Message, model=model)) + return output diff --git a/aws_lambda_powertools/utilities/parser/models/sns.py b/aws_lambda_powertools/utilities/parser/models/sns.py index 4462bc4f130..4da8b1b0977 100644 --- a/aws_lambda_powertools/utilities/parser/models/sns.py +++ b/aws_lambda_powertools/utilities/parser/models/sns.py @@ -1,7 +1,7 @@ from datetime import datetime from typing import Dict, List, Optional -from pydantic import BaseModel +from pydantic import BaseModel, root_validator from pydantic.networks import HttpUrl from typing_extensions import Literal @@ -16,7 +16,7 @@ class SnsNotificationModel(BaseModel): TopicArn: str UnsubscribeUrl: HttpUrl Type: Literal["Notification"] - MessageAttributes: Dict[str, SnsMsgAttributeModel] + MessageAttributes: Optional[Dict[str, SnsMsgAttributeModel]] Message: str MessageId: str SigningCertUrl: HttpUrl @@ -24,6 +24,14 @@ class SnsNotificationModel(BaseModel): Timestamp: datetime SignatureVersion: str + @root_validator(pre=True) + def check_sqs_protocol(cls, values): + sqs_rewritten_keys = ("UnsubscribeURL", "SigningCertURL") + if any(key in sqs_rewritten_keys for key in values): + values["UnsubscribeUrl"] = values.pop("UnsubscribeURL") + values["SigningCertUrl"] = values.pop("SigningCertURL") + return values + class SnsRecordModel(BaseModel): EventSource: Literal["aws:sns"] diff --git a/docs/content/utilities/parser.mdx b/docs/content/utilities/parser.mdx index 5ccd91d52ec..b0164abb399 100644 --- a/docs/content/utilities/parser.mdx +++ b/docs/content/utilities/parser.mdx @@ -147,7 +147,7 @@ def my_function(): } ``` -## Extending built-in models +## Built-in models Parser comes with the following built-in models: @@ -163,6 +163,8 @@ Model name | Description **SesModel** | Lambda Event Source payload for Amazon Simple Email Service **SnsModel** | Lambda Event Source payload for Amazon Simple Notification Service +### extending built-in models + You can extend them to include your own models, and yet have all other known fields parsed along the way. **EventBridge example** @@ -289,7 +291,7 @@ def handler(event: UserModel, context: LambdaContext): 4. Parser then parsed the `detail` key using `UserModel` -### Built-in envelopes +### built-in envelopes Parser comes with the following built-in envelopes, where `Model` in the return section is your given model. @@ -300,8 +302,10 @@ Envelope name | Behaviour | Return **SqsEnvelope** | 1. Parses data using `SqsModel`.
2. Parses records in `body` key using your model and return them in a list. | `List[Model]` **CloudWatchLogsEnvelope** | 1. Parses data using `CloudwatchLogsModel` which will base64 decode and decompress it.
2. Parses records in `message` key using your model and return them in a list. | `List[Model]` **KinesisDataStreamEnvelope** | 1. Parses data using `KinesisDataStreamModel` which will base64 decode it.
2. Parses records in in `Records` key using your model and returns them in a list. | `List[Model]` +**SnsEnvelope** | 1. Parses data using `SqsModel`.
2. Parses records in `body` key using your model and return them in a list. | `List[Model]` +**SnsSqsEnvelope** | 1. Parses data using `SqsModel`.
2. Parses SNS records in `body` key using `SnsNotificationModel`.
3. Parses data in `Message` key using your model and return them in a list. | `List[Model]` -### Bringing your own envelope +### bringing your own envelope You can create your own Envelope model and logic by inheriting from `BaseEnvelope`, and implementing the `parse` method. @@ -366,7 +370,7 @@ class EventBridgeEnvelope(BaseEnvelope): # highlight-line 3. Then, we parsed the incoming data with our envelope to confirm it matches EventBridge's structure defined in `EventBridgeModel` 4. Lastly, we call `_parse` from `BaseEnvelope` to parse the data in our envelope (.detail) using the customer model -### Data model validation +## Data model validation This is radically different from the Validator utility which validates events against JSON Schema. @@ -384,7 +388,7 @@ Keep the following in mind regardless of which decorator you end up using it: * You must raise either `ValueError`, `TypeError`, or `AssertionError` when value is not compliant * You must return the value(s) itself if compliant -#### Validating fields +### validating fields Quick validation to verify whether the field `message` has the value of `hello world`. @@ -429,7 +433,7 @@ class HelloWorldModel(BaseModel): parse(model=HelloWorldModel, event={"message": "hello universe", "sender": "universe"}) ``` -#### Validating entire model +### validating entire model `root_validator` can help when you have a complex validation mechanism. For example finding whether data has been omitted, comparing field values, etc. diff --git a/tests/events/snsSqsEvent.json b/tests/events/snsSqsEvent.json new file mode 100644 index 00000000000..ee440fc2962 --- /dev/null +++ b/tests/events/snsSqsEvent.json @@ -0,0 +1,20 @@ +{ + "Records": [ + { + "messageId": "79406a00-bf15-46ca-978c-22c3613fcb30", + "receiptHandle": "AQEB3fkqlBqq239bMCAHIr5mZkxJYKtxsTTy1lMImmpY7zqpQdfcAE8zFiuRh7X5ciROy24taT2rRXfuJFN/yEUVcQ6d5CIOCEK4htmRJJOHIyGdZPAm2NUUG5nNn2aEzgfzVvrkPBsrCbr7XTzK5s6eUZNH/Nn9AJtHKHpzweRK34Bon9OU/mvyIT7EJbwHPsdhL14NrCp8pLWBiIhkaJkG2G6gPO89dwHtGVUARJL+zP70AuIu/f7QgmPtY2eeE4AVbcUT1qaIlSGHUXxoHq/VMHLd/c4zWl0EXQOo/90DbyCUMejTIKL7N15YfkHoQDHprvMiAr9S75cdMiNOduiHzZLg/qVcv4kxsksKLFMKjwlzmYuQYy2KslVGwoHMd4PD", + "body": "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"d88d4479-6ec0-54fe-b63f-1cf9df4bb16e\",\n \"TopicArn\" : \"arn:aws:sns:eu-west-1:231436140809:powertools265\",\n \"Message\" : \"{\\\"message\\\": \\\"hello world\\\", \\\"username\\\": \\\"lessa\\\"}\",\n \"Timestamp\" : \"2021-01-19T10:07:07.287Z\",\n \"SignatureVersion\" : \"1\",\n \"Signature\" : \"tEo2i6Lw6/Dr7Jdlulh0sXgnkF0idd3hqs8QZCorQpzkIWVOuu583NT0Gv0epuZD1Bo+tex6NgP5p6415yNVujGHJKnkrA9ztzXaVgFiol8rf8AFGQbmb7RsM9BqATQUJeg9nCTe0jksmWXmjxEFr8XKyyRuQBwSlRTngAvOw8jUnCe1vyYD5xPec1xpfOEGLi5BqSog+6tBtsry3oAtcENX8SV1tVuMpp6D+UrrU8xNT/5D70uRDppkPE3vq+t7rR0fVSdQRdUV9KmQD2bflA1Dyb2y37EzwJOMHDDQ82aOhj/JmPxvEAlV8RkZl6J0HIveraRy9wbNLbI7jpiOCw==\",\n \"SigningCertURL\" : \"https://sns.eu-west-1.amazonaws.com/SimpleNotificationService-010a507c1833636cd94bdb98bd93083a.pem\",\n \"UnsubscribeURL\" : \"https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:231436140809:powertools265:15189ad7-870e-40e5-a7dd-a48898cd9f86\"\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1611050827340", + "SenderId": "AIDAISMY7JYY5F7RTT6AO", + "ApproximateFirstReceiveTimestamp": "1611050827344" + }, + "messageAttributes": {}, + "md5OfBody": "8910bdaaf9a30a607f7891037d4af0b0", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-west-1:231436140809:powertools265", + "awsRegion": "eu-west-1" + } + ] +} diff --git a/tests/functional/parser/test_sns.py b/tests/functional/parser/test_sns.py index 0dd1e6b506f..015af3693fa 100644 --- a/tests/functional/parser/test_sns.py +++ b/tests/functional/parser/test_sns.py @@ -91,3 +91,15 @@ def handle_sns_no_envelope(event: MyAdvancedSnsBusiness, _: LambdaContext): def test_handle_sns_trigger_event_no_envelope(): event_dict = load_event("snsEvent.json") handle_sns_no_envelope(event_dict, LambdaContext()) + + +@event_parser(model=MySnsBusiness, envelope=envelopes.SnsSqsEnvelope) +def handle_sns_sqs_json_body(event: List[MySnsBusiness], _: LambdaContext): + assert len(event) == 1 + assert event[0].message == "hello world" + assert event[0].username == "lessa" + + +def test_handle_sns_sqs_trigger_event_json_body(): # noqa: F811 + event_dict = load_event("snsSqsEvent.json") + handle_sns_sqs_json_body(event_dict, LambdaContext())