feat(lambda-event-sources): failure handling for stream event sources #5929

Merged

Changes from all commits (47 commits)
e5ab49e
feat(lambda-event-sources): add failure handling parameters to lambda…
xerofun Jan 22, 2020
26dcde6
Merge branch 'master' into xerofun/lambda-event-source-mapping-parame…
xerofun Jan 23, 2020
87b3477
Merge branch 'master' into xerofun/lambda-event-source-mapping-parame…
xerofun Jan 23, 2020
5627c7c
Merge branch 'master' into xerofun/lambda-event-source-mapping-parame…
xerofun Jan 24, 2020
c271545
Standardize default documentation on maximumRetryAttempts
xerofun Jan 27, 2020
ebf5404
Refactor maximumRecordAgeInSeconds to be a duration
xerofun Jan 27, 2020
3690df1
Refactor away from using lambda.generated for destination config
xerofun Jan 28, 2020
4697831
Remove unnecessary linter exceptions
xerofun Jan 28, 2020
d8bdf54
Merge branch 'master' into xerofun/lambda-event-source-mapping-parame…
xerofun Jan 28, 2020
95eb54b
Simplified comments, variable names, and conditionals
xerofun Jan 30, 2020
5d387c3
Merge branch 'xerofun/lambda-event-source-mapping-parameters-5236' of…
xerofun Jan 30, 2020
7662d58
Update README with new parameters
xerofun Jan 30, 2020
dbafdcd
Update packages/@aws-cdk/aws-lambda-event-sources/README.md
xerofun Feb 5, 2020
9a51afd
Update packages/@aws-cdk/aws-lambda-event-sources/README.md
xerofun Feb 5, 2020
daff204
Update packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts
xerofun Feb 5, 2020
e13f488
Update packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts
xerofun Feb 5, 2020
a4fccec
Update packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts
xerofun Feb 5, 2020
8223345
Update README with links to lambda event links
xerofun Feb 5, 2020
3cae74c
Use better variable names
xerofun Feb 5, 2020
61918bb
Add integration test and refactor away from IDestination
xerofun Feb 13, 2020
a68f2d5
Merge branch 'master' into xerofun/lambda-event-source-mapping-parame…
xerofun Feb 13, 2020
f3b3e22
Pull up DLQ libraries to remedy circular dependency
xerofun Feb 17, 2020
0f994f8
Merge branch 'master' into xerofun/lambda-event-source-mapping-parame…
xerofun Feb 17, 2020
2c50c32
Merge branch 'master' into xerofun/lambda-event-source-mapping-parame…
xerofun Feb 17, 2020
8d7e516
Clarify documentation on dead letter queue objects
xerofun Feb 17, 2020
7e34a57
Remove package-lock.json
xerofun Mar 2, 2020
7263cc5
Standardize case on new classes
xerofun Mar 2, 2020
17eee38
Clarify documentation
xerofun Mar 2, 2020
22b05c8
Merge pull request #1 from aws/master
xerofun Mar 4, 2020
3a9a9dc
Merge remote-tracking branch 'origin/master' into xerofun/lambda-even…
xerofun Mar 4, 2020
c44be13
Move DLQ implementations to event sources package
xerofun Mar 5, 2020
a4f05fe
Extract test lambda from integration test into external file
xerofun Mar 6, 2020
a8b2fed
fixup readme
Mar 9, 2020
a4e59b2
2 space indent
Mar 9, 2020
01f5ec3
Add unit test for event source mapping validations
xerofun Mar 9, 2020
faca307
Add permissions to send messages to DLQs to lambda
xerofun Mar 9, 2020
068be7a
fix integ test and stack verification steps
Mar 9, 2020
b4f51a1
Update packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
xerofun Mar 9, 2020
fb6957b
Clean up tests
xerofun Mar 9, 2020
43cd63b
Add event source mapping to DLQ bind signature
xerofun Mar 9, 2020
303aabb
Fix integration test expected
xerofun Mar 9, 2020
c3f4dfe
added IEventSourceMapping
Mar 10, 2020
2d2e1d7
Refactor to IEventSourceMapping
xerofun Mar 10, 2020
9d0d2f0
Merge branch 'master' into xerofun/lambda-event-source-mapping-parame…
mergify[bot] Mar 11, 2020
e2dc206
Merge branch 'master' into xerofun/lambda-event-source-mapping-parame…
Mar 13, 2020
b6e842f
Merge branch 'master' into xerofun/lambda-event-source-mapping-parame…
Mar 16, 2020
f149ceb
Merge branch 'master' into xerofun/lambda-event-source-mapping-parame…
Mar 16, 2020
35 changes: 27 additions & 8 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
@@ -130,43 +130,62 @@ CloudWatch.
### DynamoDB Streams

You can write Lambda functions to process change events from a DynamoDB Table. An event is emitted to a DynamoDB stream (if configured) whenever a write (Put, Delete, Update)
operation is performed against the table. See [Using AWS Lambda with Amazon DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) for more information about configuring Lambda function event sources with DynamoDB.

To process events with a Lambda function, first create or update a DynamoDB table and enable a `stream` specification. Then, create a `DynamoEventSource`
and add it to your Lambda function. The following parameters will impact Amazon DynamoDB's polling behavior:

* __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
* __startingPosition__: Will determine where to begin consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
* __bisectBatchOnError__: If a batch encounters an error, this will cause the batch to be split in two and have each new smaller batch retried, allowing the records in error to be isolated.
* __maxBatchingWindow__: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of delayed processing.
* __maxRecordAge__: The maximum age of a record that will be sent to the function for processing. Records that exceed the max age will be treated as failures.
* __onFailure__: In the event a record fails after all retries, or if the record age has exceeded the configured value, the record will be sent to the SQS queue or SNS topic that is specified here.
* __parallelizationFactor__: The number of batches to concurrently process on each shard.
* __retryAttempts__: The maximum number of times a record should be retried in the event of failure.

```ts
import dynamodb = require('@aws-cdk/aws-dynamodb');
import lambda = require('@aws-cdk/aws-lambda');
import { DynamoEventSource } from '@aws-cdk/aws-lambda-event-sources';
import sqs = require('@aws-cdk/aws-sqs');
import { DynamoEventSource, SqsDlq } from '@aws-cdk/aws-lambda-event-sources';

const table = new dynamodb.Table(..., {
  partitionKey: ...,
  stream: dynamodb.StreamViewType.NEW_IMAGE // make sure stream is configured
});

const deadLetterQueue = new sqs.Queue(this, 'deadLetterQueue');

const fn = new lambda.Function(...);
fn.addEventSource(new DynamoEventSource(table, {
  startingPosition: lambda.StartingPosition.TRIM_HORIZON,
  batchSize: 5,
  bisectBatchOnError: true,
  onFailure: new SqsDlq(deadLetterQueue),
  retryAttempts: 10
}));
```
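
The remaining knobs from the list above (`maxBatchingWindow`, `maxRecordAge`, `parallelizationFactor`) are set the same way. As a sketch, an alternative configuration of the event source above might look like this; the specific values are illustrative only, not recommendations:

```ts
import { Duration } from '@aws-cdk/core';

fn.addEventSource(new DynamoEventSource(table, {
  startingPosition: lambda.StartingPosition.LATEST,
  maxBatchingWindow: Duration.seconds(30),  // wait up to 30 seconds to fill a batch
  maxRecordAge: Duration.hours(2),          // records older than 2 hours count as failures
  parallelizationFactor: 5                  // process up to 5 batches per shard concurrently
}));
```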

### Kinesis

You can write Lambda functions to process streaming data in Amazon Kinesis Streams. For more information about Amazon Kinesis, see [Amazon Kinesis
Service](https://aws.amazon.com/kinesis/data-streams/). To learn more about configuring Lambda function event sources with Kinesis and view a sample event,
see [Amazon Kinesis Event](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html).

To set up Amazon Kinesis as an event source for AWS Lambda, you
first create or update an Amazon Kinesis stream and select custom values for the
event source parameters. The following parameters will impact Amazon Kinesis's polling
behavior:

* __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
* __startingPosition__: Will determine where to begin consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
* __bisectBatchOnError__: If a batch encounters an error, this will cause the batch to be split in two and have each new smaller batch retried, allowing the records in error to be isolated.
* __maxBatchingWindow__: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of possibly delaying processing.
* __maxRecordAge__: The maximum age of a record that will be sent to the function for processing. Records that exceed the max age will be treated as failures.
* __onFailure__: In the event a record fails and consumes all retries, the record will be sent to the SQS queue or SNS topic that is specified here.
* __parallelizationFactor__: The number of batches to concurrently process on each shard.
* __retryAttempts__: The maximum number of times a record should be retried in the event of failure.

```ts
import lambda = require('@aws-cdk/aws-lambda');
```
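
A sketch of how the new failure-handling parameters might be wired up for a `KinesisEventSource`; `myFunction` stands in for an existing Lambda function, and the construct names and values below are placeholders rather than part of this change:

```ts
import kinesis = require('@aws-cdk/aws-kinesis');
import lambda = require('@aws-cdk/aws-lambda');
import sqs = require('@aws-cdk/aws-sqs');
import { Duration } from '@aws-cdk/core';
import { KinesisEventSource, SqsDlq } from '@aws-cdk/aws-lambda-event-sources';

const stream = new kinesis.Stream(this, 'MyStream');
const deadLetterQueue = new sqs.Queue(this, 'KinesisDlq');

myFunction.addEventSource(new KinesisEventSource(stream, {
  startingPosition: lambda.StartingPosition.TRIM_HORIZON,
  batchSize: 100,
  bisectBatchOnError: true,
  maxRecordAge: Duration.hours(1),        // valid range: 60 seconds to 7 days
  retryAttempts: 5,                       // valid range: 0 to 10000
  onFailure: new SqsDlq(deadLetterQueue)  // discarded records land on this queue
}));
```
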
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts
@@ -3,5 +3,7 @@ export * from './dynamodb';
export * from './kinesis';
export * from './s3';
export * from './sns';
export * from './sns-dlq';
export * from './stream';
export * from './sqs';
export * from './sqs-dlq';
21 changes: 21 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/sns-dlq.ts
@@ -0,0 +1,21 @@
import { DlqDestinationConfig, IEventSourceDlq, IEventSourceMapping, IFunction } from "@aws-cdk/aws-lambda";
import * as sns from '@aws-cdk/aws-sns';

/**
 * An SNS dead letter queue destination configuration for a Lambda event source
 */
export class SnsDlq implements IEventSourceDlq {
  constructor(private readonly topic: sns.ITopic) {
  }

  /**
   * Returns a destination configuration for the DLQ
   */
  public bind(_target: IEventSourceMapping, targetHandler: IFunction): DlqDestinationConfig {
    this.topic.grantPublish(targetHandler);

    return {
      destination: this.topic.topicArn
    };
  }
}
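
Usage mirrors `SqsDlq`: pass an `SnsDlq` as the `onFailure` destination of a stream event source. A short sketch, assuming an existing function `fn` and DynamoDB table `table` (placeholder names):

```ts
import * as lambda from '@aws-cdk/aws-lambda';
import * as sns from '@aws-cdk/aws-sns';
import { DynamoEventSource, SnsDlq } from '@aws-cdk/aws-lambda-event-sources';

const failureTopic = new sns.Topic(this, 'EventSourceFailures');

fn.addEventSource(new DynamoEventSource(table, {
  startingPosition: lambda.StartingPosition.TRIM_HORIZON,
  retryAttempts: 2,
  // bind() grants sns:Publish on the topic to the function's role and sets the
  // topic ARN as the on-failure destination of the event source mapping
  onFailure: new SnsDlq(failureTopic)
}));
```
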
21 changes: 21 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/sqs-dlq.ts
@@ -0,0 +1,21 @@
import { DlqDestinationConfig, IEventSourceDlq, IEventSourceMapping, IFunction } from "@aws-cdk/aws-lambda";
import * as sqs from '@aws-cdk/aws-sqs';

/**
 * An SQS dead letter queue destination configuration for a Lambda event source
 */
export class SqsDlq implements IEventSourceDlq {
  constructor(private readonly queue: sqs.IQueue) {
  }

  /**
   * Returns a destination configuration for the DLQ
   */
  public bind(_target: IEventSourceMapping, targetHandler: IFunction): DlqDestinationConfig {
    this.queue.grantSendMessages(targetHandler);

    return {
      destination: this.queue.queueArn
    };
  }
}
51 changes: 50 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts
@@ -1,5 +1,5 @@
import * as lambda from '@aws-cdk/aws-lambda';
import { Duration } from '@aws-cdk/core';

/**
 * The set of properties for event sources that follow the streaming model,
@@ -21,6 +21,50 @@ export interface StreamEventSourceProps {
   */
  readonly batchSize?: number;

  /**
   * If the function returns an error, split the batch in two and retry.
   *
   * @default false
   */
  readonly bisectBatchOnError?: boolean;

  /**
   * An Amazon SQS queue or Amazon SNS topic destination for discarded records.
   *
   * @default discarded records are ignored
   */
  readonly onFailure?: lambda.IEventSourceDlq;

  /**
   * The maximum age of a record that Lambda sends to a function for processing.
   * Valid Range:
   * * Minimum value of 60 seconds
   * * Maximum value of 7 days
   *
   * @default Duration.days(7)
   */
  readonly maxRecordAge?: Duration;

  /**
   * Maximum number of retry attempts.
   * Valid Range:
   * * Minimum value of 0
   * * Maximum value of 10000
   *
   * @default 10000
   */
  readonly retryAttempts?: number;

  /**
   * The number of batches to process from each shard concurrently.
   * Valid Range:
   * * Minimum value of 1
   * * Maximum value of 10
   *
   * @default 1
   */
  readonly parallelizationFactor?: number;

  /**
   * Where to begin consuming the stream.
   */
@@ -48,8 +92,13 @@ export abstract class StreamEventSource implements lambda.IEventSource {
    return {
      ...options,
      batchSize: this.props.batchSize || 100,
      bisectBatchOnError: this.props.bisectBatchOnError,
      startingPosition: this.props.startingPosition,
      maxBatchingWindow: this.props.maxBatchingWindow,
      maxRecordAge: this.props.maxRecordAge,
      retryAttempts: this.props.retryAttempts,
      parallelizationFactor: this.props.parallelizationFactor,
      onFailure: this.props.onFailure
    };
  }
}
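
For orientation, a concrete event source built on `StreamEventSource` typically passes these enriched options to `addEventSourceMapping` on the target function. The following is a simplified sketch under that assumption, not the exact `KinesisEventSource` implementation in this package; the class and identifier names are illustrative:

```ts
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as lambda from '@aws-cdk/aws-lambda';
import { StreamEventSource, StreamEventSourceProps } from '@aws-cdk/aws-lambda-event-sources';

class MyKinesisEventSource extends StreamEventSource {
  constructor(private readonly stream: kinesis.IStream, props: StreamEventSourceProps) {
    super(props);
  }

  public bind(target: lambda.IFunction): void {
    // enrichMappingOptions folds bisectBatchOnError, onFailure, maxRecordAge,
    // retryAttempts and parallelizationFactor into the mapping options, so the
    // resulting AWS::Lambda::EventSourceMapping carries the failure-handling config.
    target.addEventSourceMapping('MyKinesisEventSource', this.enrichMappingOptions({
      eventSourceArn: this.stream.streamArn
    }));

    this.stream.grantRead(target);
  }
}
```
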
@@ -0,0 +1,155 @@
{
"Resources": {
"FServiceRole3AC82EE1": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
},
"ManagedPolicyArns": [
{
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
]
]
}
]
}
},
"FServiceRoleDefaultPolicy17A19BFA": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"sqs:SendMessage",
"sqs:GetQueueAttributes",
"sqs:GetQueueUrl"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"Q63C6E3AB",
"Arn"
]
}
},
{
"Action": [
"kinesis:DescribeStream",
"kinesis:GetRecords",
"kinesis:GetShardIterator"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"S509448A1",
"Arn"
]
}
}
],
"Version": "2012-10-17"
},
"PolicyName": "FServiceRoleDefaultPolicy17A19BFA",
"Roles": [
{
"Ref": "FServiceRole3AC82EE1"
}
]
}
},
"FC4345940": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "exports.handler = async function handler(event) {\n // tslint:disable-next-line:no-console\n console.log('event:', JSON.stringify(event, undefined, 2));\n throw new Error();\n}"
},
"Handler": "index.handler",
"Role": {
"Fn::GetAtt": [
"FServiceRole3AC82EE1",
"Arn"
]
},
"Runtime": "nodejs10.x"
},
"DependsOn": [
"FServiceRoleDefaultPolicy17A19BFA",
"FServiceRole3AC82EE1"
]
},
"FKinesisEventSourcelambdaeventsourcekinesiswithdlqSD357FCB87EEA8CB4": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"EventSourceArn": {
"Fn::GetAtt": [
"S509448A1",
"Arn"
]
},
"FunctionName": {
"Ref": "FC4345940"
},
"BatchSize": 100,
"DestinationConfig": {
"OnFailure": {
"Destination": {
"Fn::GetAtt": [
"Q63C6E3AB",
"Arn"
]
}
}
},
"MaximumRetryAttempts": 0,
"StartingPosition": "TRIM_HORIZON"
}
},
"S509448A1": {
"Type": "AWS::Kinesis::Stream",
"Properties": {
"ShardCount": 1,
"RetentionPeriodHours": 24
}
},
"Q63C6E3AB": {
"Type": "AWS::SQS::Queue"
}
},
"Outputs": {
"InputKinesisStreamName": {
"Value": {
"Ref": "S509448A1"
}
},
"DlqSqsQueueUrl": {
"Value": {
"Ref": "Q63C6E3AB"
}
},
"FunctionArn": {
"Value":{
"Fn::GetAtt":["FC4345940","Arn"]
}
}
},
"Parameters": {
}
}
@@ -0,0 +1,47 @@
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as lambda from '@aws-cdk/aws-lambda';
import * as sqs from '@aws-cdk/aws-sqs';
import { App, CfnOutput, Stack } from "@aws-cdk/core";
import { KinesisEventSource, SqsDlq } from '../lib';

/*
 * Stack verification steps:
 * * aws kinesis put-record --stream-name <value of stack output: InputKinesisStreamName> --partition-key 123 --data testdata
 * * aws sqs receive-message --queue-url <value of stack output: DlqSqsQueueUrl> --max-number-of-messages 1 --query 'Messages[0].Body'
 * The last command should return a string that contains the Lambda function ARN in it.
 */

async function handler(event: any) {
  // tslint:disable-next-line:no-console
  console.log('event:', JSON.stringify(event, undefined, 2));
  throw new Error();
}

class KinesisWithDLQTest extends Stack {
  constructor(scope: App, id: string) {
    super(scope, id);

    const fn = new lambda.Function(this, 'F', {
      runtime: lambda.Runtime.NODEJS_10_X,
      handler: 'index.handler',
      code: lambda.Code.fromInline(`exports.handler = ${handler.toString()}`)
    });
    new CfnOutput(this, 'FunctionArn', { value: fn.functionArn });

    const stream = new kinesis.Stream(this, 'S');
    new CfnOutput(this, 'InputKinesisStreamName', { value: stream.streamName });

    const dlq = new sqs.Queue(this, 'Q');
    new CfnOutput(this, 'DlqSqsQueueUrl', { value: dlq.queueUrl });

    fn.addEventSource(new KinesisEventSource(stream, {
      startingPosition: lambda.StartingPosition.TRIM_HORIZON,
      onFailure: new SqsDlq(dlq),
      retryAttempts: 0,
    }));
  }
}

const app = new App();
new KinesisWithDLQTest(app, 'lambda-event-source-kinesis-with-dlq');
app.synth();