Skip to content

Commit

Permalink
fix: Extract batch docs examples
Browse files Browse the repository at this point in the history
Changes:
- Extract examples for batch
- Fix python syntax
- Fix yaml synxtax

Related to
- aws-powertools#1064
  • Loading branch information
michaelbrewer committed Apr 12, 2022
1 parent b577366 commit 9787fa9
Show file tree
Hide file tree
Showing 31 changed files with 933 additions and 872 deletions.
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,11 @@ changelog:

mypy:
poetry run mypy --pretty aws_lambda_powertools

format-examples:
poetry run isort docs/examples
poetry run black docs/examples/*/*/*.py

lint-examples:
poetry run python3 -m py_compile docs/examples/*/*/*.py
cfn-lint docs/examples/*/*/*.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()


@tracer.capture_method(capture_response=False)
def record_handler(record: SQSRecord):
payload: str = record.body
if payload:
item: dict = json.loads(payload)
...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
return processor.response()
62 changes: 62 additions & 0 deletions docs/examples/utilities/batch/custom_batch_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import os
from random import randint

import boto3

from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor

table_name = os.getenv("TABLE_NAME", "table_not_found")


class MyPartialProcessor(BasePartialProcessor):
"""
Process a record and stores successful results at a Amazon DynamoDB Table
Parameters
----------
table_name: str
DynamoDB table name to write results to
"""

def __init__(self, table_name: str):
self.table_name = table_name

super().__init__()

def _prepare(self):
# It's called once, *before* processing
# Creates table resource and clean previous results
self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
self.success_messages.clear()

def _clean(self):
# It's called once, *after* closing processing all records (closing the context manager)
# Here we're sending, at once, all successful messages to a ddb table
with self.ddb_table.batch_writer() as batch:
for result in self.success_messages:
batch.put_item(Item=result)

def _process_record(self, record):
# It handles how your record is processed
# Here we're keeping the status of each run
# where self.handler is the record_handler function passed as an argument
try:
result = self.handler(record) # record_handler passed to decorator/context manager
return self.success_handler(record, result)
except Exception as exc:
return self.failure_handler(record, exc)

def success_handler(self, record, result):
entry = ("success", result, record)
message = {"age": result}
self.success_messages.append(message)
return entry


def record_handler(record):
return randint(0, 100)


@batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
def lambda_handler(event, context):
return {"statusCode": 200}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import boto3

from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

session = boto3.session.Session()


def record_handler(record):
# This will be called for each individual message from a batch
# It should raise an exception if the message was not processed successfully
return_value = do_something_with(record["body"])
return return_value


def lambda_handler(event, context):
records = event["Records"]

processor = PartialSQSProcessor(boto3_session=session)

with processor(records, record_handler):
result = processor.process()

return result
17 changes: 17 additions & 0 deletions docs/examples/utilities/batch/custom_boto3_session_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import boto3

from aws_lambda_powertools.utilities.batch import sqs_batch_processor

session = boto3.session.Session()


def record_handler(record):
# This will be called for each individual message from a batch
# It should raise an exception if the message was not processed successfully
return_value = do_something_with(record["body"])
return return_value


@sqs_batch_processor(record_handler=record_handler, boto3_session=session)
def lambda_handler(event, context):
return {"statusCode": 200}
23 changes: 23 additions & 0 deletions docs/examples/utilities/batch/custom_config_context_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from botocore.config import Config

from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

config = Config(region_name="us-east-1")


def record_handler(record):
# This will be called for each individual message from a batch
# It should raise an exception if the message was not processed successfully
return_value = do_something_with(record["body"])
return return_value


def lambda_handler(event, context):
records = event["Records"]

processor = PartialSQSProcessor(config=config)

with processor(records, record_handler):
result = processor.process()

return result
17 changes: 17 additions & 0 deletions docs/examples/utilities/batch/custom_config_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from botocore.config import Config

from aws_lambda_powertools.utilities.batch import sqs_batch_processor

config = Config(region_name="us-east-1")


def record_handler(record):
# This will be called for each individual message from a batch
# It should raise an exception if the message was not processed successfully
return_value = do_something_with(record["body"])
return return_value


@sqs_batch_processor(record_handler=record_handler, config=config)
def lambda_handler(event, context):
return {"statusCode": 200}
30 changes: 30 additions & 0 deletions docs/examples/utilities/batch/dynamodb_streams_context_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: DynamoDBRecord):
logger.info(record.dynamodb.new_image)
payload: dict = json.loads(record.dynamodb.new_image.get("item").s_value)
# alternatively:
# changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image
# payload = change.get("Message").raw_event -> {"S": "<payload>"}
...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
batch = event["Records"]
with processor(records=batch, handler=record_handler):
processed_messages = processor.process() # kick off processing, return list[tuple]

return processor.response()
27 changes: 27 additions & 0 deletions docs/examples/utilities/batch/dynamodb_streams_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: DynamoDBRecord):
logger.info(record.dynamodb.new_image)
payload: dict = json.loads(record.dynamodb.new_image.get("Message").get_value)
# alternatively:
# changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image
# payload = change.get("Message").raw_event -> {"S": "<payload>"}
...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
return processor.response()
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import json
from typing import Dict, Literal, Optional

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.parser import BaseModel, validator
from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamChangedRecordModel, DynamoDBStreamRecordModel
from aws_lambda_powertools.utilities.typing import LambdaContext


class Order(BaseModel):
item: dict


class OrderDynamoDB(BaseModel):
Message: Order

# auto transform json string
# so Pydantic can auto-initialize nested Order model
@validator("Message", pre=True)
def transform_message_to_dict(cls, value: Dict[Literal["S"], str]):
return json.loads(value["S"])


class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel):
NewImage: Optional[OrderDynamoDB]
OldImage: Optional[OrderDynamoDB]


class OrderDynamoDBRecord(DynamoDBStreamRecordModel):
dynamodb: OrderDynamoDBChangeRecord


processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: OrderDynamoDBRecord):
return record.dynamodb.NewImage.Message.item


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
return processor.response()
66 changes: 66 additions & 0 deletions docs/examples/utilities/batch/dynamodb_streams_template.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

Globals:
Function:
Timeout: 5
MemorySize: 256
Runtime: python3.9
Tracing: Active
Environment:
Variables:
LOG_LEVEL: INFO
POWERTOOLS_SERVICE_NAME: hello

Resources:
HelloWorldFunction:
Type: AWS::Serverless::Function
Properties:
Handler: app.lambda_handler
CodeUri: hello_world
Policies:
# Lambda Destinations require additional permissions
# to send failure records from Kinesis/DynamoDB
- Version: "2012-10-17"
Statement:
Effect: "Allow"
Action:
- sqs:GetQueueAttributes
- sqs:GetQueueUrl
- sqs:SendMessage
Resource: !GetAtt SampleDLQ.Arn
Events:
DynamoDBStream:
Type: DynamoDB
Properties:
Stream: !GetAtt SampleTable.StreamArn
StartingPosition: LATEST
MaximumRetryAttempts: 2
DestinationConfig:
OnFailure:
Destination: !GetAtt SampleDLQ.Arn
FunctionResponseTypes:
- ReportBatchItemFailures

SampleDLQ:
Type: AWS::SQS::Queue

SampleTable:
Type: AWS::DynamoDB::Table
Properties:
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: pk
AttributeType: S
- AttributeName: sk
AttributeType: S
KeySchema:
- AttributeName: pk
KeyType: HASH
- AttributeName: sk
KeyType: RANGE
SSESpecification:
SSEEnabled: yes
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: KinesisStreamRecord):
logger.info(record.kinesis.data_as_text)
payload: dict = record.kinesis.data_as_json()
...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
batch = event["Records"]
with processor(records=batch, handler=record_handler):
processed_messages = processor.process() # kick off processing, return list[tuple]

return processor.response()
Loading

0 comments on commit 9787fa9

Please sign in to comment.