Skip to content

Commit

Permalink
feat: add support for SNS->SQS protocol (#272)
Browse files Browse the repository at this point in the history
* feat: add support for SNS->SQS protocol

* docs: add SnsEnvelope, SnsSqsEnvelope; fix headers

Signed-off-by: heitorlessa <[email protected]>
  • Loading branch information
heitorlessa authored Jan 19, 2021
1 parent 722b4a3 commit b538772
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 10 deletions.
3 changes: 2 additions & 1 deletion aws_lambda_powertools/utilities/parser/envelopes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from .dynamodb import DynamoDBStreamEnvelope
from .event_bridge import EventBridgeEnvelope
from .kinesis import KinesisDataStreamEnvelope
from .sns import SnsEnvelope
from .sns import SnsEnvelope, SnsSqsEnvelope
from .sqs import SqsEnvelope

__all__ = [
Expand All @@ -12,6 +12,7 @@
"EventBridgeEnvelope",
"KinesisDataStreamEnvelope",
"SnsEnvelope",
"SnsSqsEnvelope",
"SqsEnvelope",
"BaseEnvelope",
]
37 changes: 36 additions & 1 deletion aws_lambda_powertools/utilities/parser/envelopes/sns.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from typing import Any, Dict, List, Optional, Union

from ..models import SnsModel
from ..models import SnsModel, SnsNotificationModel, SqsModel
from ..types import Model
from .base import BaseEnvelope

Expand Down Expand Up @@ -37,3 +37,38 @@ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> Lis
parsed_envelope = SnsModel.parse_obj(data)
logger.debug(f"Parsing SNS records in `body` with {model}")
return [self._parse(data=record.Sns.Message, model=model) for record in parsed_envelope.Records]


class SnsSqsEnvelope(BaseEnvelope):
"""SNS plus SQS Envelope to extract array of Records
Published messages from SNS to SQS has a slightly different payload.
Since SNS payload is marshalled into `Record` key in SQS, we have to:
1. Parse SQS schema with incoming data
2. Unmarshall SNS payload and parse against SNS Notification model not SNS/SNS Record
3. Finally, parse provided model against payload extracted
"""

def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Model) -> List[Optional[Model]]:
"""Parses records found with model provided
Parameters
----------
data : Dict
Lambda event to be parsed
model : Model
Data model provided to parse after extracting data using envelope
Returns
-------
List
List of records parsed with model provided
"""
logger.debug(f"Parsing incoming data with SQS model {SqsModel}")
parsed_envelope = SqsModel.parse_obj(data)
output = []
for record in parsed_envelope.Records:
sns_notification = SnsNotificationModel.parse_raw(record.body)
output.append(self._parse(data=sns_notification.Message, model=model))
return output
12 changes: 10 additions & 2 deletions aws_lambda_powertools/utilities/parser/models/sns.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime
from typing import Dict, List, Optional

from pydantic import BaseModel
from pydantic import BaseModel, root_validator
from pydantic.networks import HttpUrl
from typing_extensions import Literal

Expand All @@ -16,14 +16,22 @@ class SnsNotificationModel(BaseModel):
TopicArn: str
UnsubscribeUrl: HttpUrl
Type: Literal["Notification"]
MessageAttributes: Dict[str, SnsMsgAttributeModel]
MessageAttributes: Optional[Dict[str, SnsMsgAttributeModel]]
Message: str
MessageId: str
SigningCertUrl: HttpUrl
Signature: str
Timestamp: datetime
SignatureVersion: str

@root_validator(pre=True)
def check_sqs_protocol(cls, values):
sqs_rewritten_keys = ("UnsubscribeURL", "SigningCertURL")
if any(key in sqs_rewritten_keys for key in values):
values["UnsubscribeUrl"] = values.pop("UnsubscribeURL")
values["SigningCertUrl"] = values.pop("SigningCertURL")
return values


class SnsRecordModel(BaseModel):
EventSource: Literal["aws:sns"]
Expand Down
16 changes: 10 additions & 6 deletions docs/content/utilities/parser.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def my_function():
}
```

## Extending built-in models
## Built-in models

Parser comes with the following built-in models:

Expand All @@ -163,6 +163,8 @@ Model name | Description
**SesModel** | Lambda Event Source payload for Amazon Simple Email Service
**SnsModel** | Lambda Event Source payload for Amazon Simple Notification Service

### extending built-in models

You can extend them to include your own models, and yet have all other known fields parsed along the way.

**EventBridge example**
Expand Down Expand Up @@ -289,7 +291,7 @@ def handler(event: UserModel, context: LambdaContext):
4. Parser then parsed the `detail` key using `UserModel`


### Built-in envelopes
### built-in envelopes

Parser comes with the following built-in envelopes, where `Model` in the return section is your given model.

Expand All @@ -300,8 +302,10 @@ Envelope name | Behaviour | Return
**SqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]`
**CloudWatchLogsEnvelope** | 1. Parses data using `CloudwatchLogsModel` which will base64 decode and decompress it. <br/> 2. Parses records in `message` key using your model and return them in a list. | `List[Model]`
**KinesisDataStreamEnvelope** | 1. Parses data using `KinesisDataStreamModel` which will base64 decode it. <br/> 2. Parses records in in `Records` key using your model and returns them in a list. | `List[Model]`
**SnsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]`
**SnsSqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses SNS records in `body` key using `SnsNotificationModel`. <br/> 3. Parses data in `Message` key using your model and return them in a list. | `List[Model]`

### Bringing your own envelope
### bringing your own envelope

You can create your own Envelope model and logic by inheriting from `BaseEnvelope`, and implementing the `parse` method.

Expand Down Expand Up @@ -366,7 +370,7 @@ class EventBridgeEnvelope(BaseEnvelope): # highlight-line
3. Then, we parsed the incoming data with our envelope to confirm it matches EventBridge's structure defined in `EventBridgeModel`
4. Lastly, we call `_parse` from `BaseEnvelope` to parse the data in our envelope (.detail) using the customer model

### Data model validation
## Data model validation

<Note type="warning">
This is radically different from the <strong>Validator utility</strong> which validates events against JSON Schema.
Expand All @@ -384,7 +388,7 @@ Keep the following in mind regardless of which decorator you end up using it:
* You must raise either `ValueError`, `TypeError`, or `AssertionError` when value is not compliant
* You must return the value(s) itself if compliant

#### Validating fields
### validating fields

Quick validation to verify whether the field `message` has the value of `hello world`.

Expand Down Expand Up @@ -429,7 +433,7 @@ class HelloWorldModel(BaseModel):
parse(model=HelloWorldModel, event={"message": "hello universe", "sender": "universe"})
```

#### Validating entire model
### validating entire model

`root_validator` can help when you have a complex validation mechanism. For example finding whether data has been omitted, comparing field values, etc.

Expand Down
20 changes: 20 additions & 0 deletions tests/events/snsSqsEvent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"Records": [
{
"messageId": "79406a00-bf15-46ca-978c-22c3613fcb30",
"receiptHandle": "AQEB3fkqlBqq239bMCAHIr5mZkxJYKtxsTTy1lMImmpY7zqpQdfcAE8zFiuRh7X5ciROy24taT2rRXfuJFN/yEUVcQ6d5CIOCEK4htmRJJOHIyGdZPAm2NUUG5nNn2aEzgfzVvrkPBsrCbr7XTzK5s6eUZNH/Nn9AJtHKHpzweRK34Bon9OU/mvyIT7EJbwHPsdhL14NrCp8pLWBiIhkaJkG2G6gPO89dwHtGVUARJL+zP70AuIu/f7QgmPtY2eeE4AVbcUT1qaIlSGHUXxoHq/VMHLd/c4zWl0EXQOo/90DbyCUMejTIKL7N15YfkHoQDHprvMiAr9S75cdMiNOduiHzZLg/qVcv4kxsksKLFMKjwlzmYuQYy2KslVGwoHMd4PD",
"body": "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"d88d4479-6ec0-54fe-b63f-1cf9df4bb16e\",\n \"TopicArn\" : \"arn:aws:sns:eu-west-1:231436140809:powertools265\",\n \"Message\" : \"{\\\"message\\\": \\\"hello world\\\", \\\"username\\\": \\\"lessa\\\"}\",\n \"Timestamp\" : \"2021-01-19T10:07:07.287Z\",\n \"SignatureVersion\" : \"1\",\n \"Signature\" : \"tEo2i6Lw6/Dr7Jdlulh0sXgnkF0idd3hqs8QZCorQpzkIWVOuu583NT0Gv0epuZD1Bo+tex6NgP5p6415yNVujGHJKnkrA9ztzXaVgFiol8rf8AFGQbmb7RsM9BqATQUJeg9nCTe0jksmWXmjxEFr8XKyyRuQBwSlRTngAvOw8jUnCe1vyYD5xPec1xpfOEGLi5BqSog+6tBtsry3oAtcENX8SV1tVuMpp6D+UrrU8xNT/5D70uRDppkPE3vq+t7rR0fVSdQRdUV9KmQD2bflA1Dyb2y37EzwJOMHDDQ82aOhj/JmPxvEAlV8RkZl6J0HIveraRy9wbNLbI7jpiOCw==\",\n \"SigningCertURL\" : \"https://sns.eu-west-1.amazonaws.com/SimpleNotificationService-010a507c1833636cd94bdb98bd93083a.pem\",\n \"UnsubscribeURL\" : \"https://sns.eu-west-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-west-1:231436140809:powertools265:15189ad7-870e-40e5-a7dd-a48898cd9f86\"\n}",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1611050827340",
"SenderId": "AIDAISMY7JYY5F7RTT6AO",
"ApproximateFirstReceiveTimestamp": "1611050827344"
},
"messageAttributes": {},
"md5OfBody": "8910bdaaf9a30a607f7891037d4af0b0",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:eu-west-1:231436140809:powertools265",
"awsRegion": "eu-west-1"
}
]
}
12 changes: 12 additions & 0 deletions tests/functional/parser/test_sns.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,15 @@ def handle_sns_no_envelope(event: MyAdvancedSnsBusiness, _: LambdaContext):
def test_handle_sns_trigger_event_no_envelope():
event_dict = load_event("snsEvent.json")
handle_sns_no_envelope(event_dict, LambdaContext())


@event_parser(model=MySnsBusiness, envelope=envelopes.SnsSqsEnvelope)
def handle_sns_sqs_json_body(event: List[MySnsBusiness], _: LambdaContext):
assert len(event) == 1
assert event[0].message == "hello world"
assert event[0].username == "lessa"


def test_handle_sns_sqs_trigger_event_json_body(): # noqa: F811
event_dict = load_event("snsSqsEvent.json")
handle_sns_sqs_json_body(event_dict, LambdaContext())

0 comments on commit b538772

Please sign in to comment.