Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lambda-event-sources): expose eventSourceMappingId #5689

Merged
26 changes: 23 additions & 3 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,33 @@
---
<!--END STABILITY BANNER-->

An event source mapping is an AWS Lambda resource that reads from an event source and invokes a Lambda function.
You can use event source mappings to process items from a stream or queue in services that don't invoke Lambda
functions directly. Lambda provides event source mappings for the following services. Read more about lambda
event sources [here](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html).

This module includes classes that allow using various AWS services as event
sources for AWS Lambda via the high-level `lambda.addEventSource(source)` API.

NOTE: In most cases, it is also possible to use the resource APIs to invoke an
AWS Lambda function. This library provides a uniform API for all Lambda event
sources regardless of the underlying mechanism they use.

The following code sets up a lambda function with an SQS queue event source -

```ts
const fn = new lambda.Function(this, 'MyFunction', { /* ... */ });

const queue = new sqs.Queue(this, 'MyQueue');
const eventSource = lambda.addEventSource(new SqsEventSource(queue);

const eventSourceId = eventSource.eventSourceId;
```

The `eventSourceId` property contains the event source id. This will be a
[token](https://docs.aws.amazon.com/cdk/latest/guide/tokens.html) that will resolve to the final value at the time of
deployment.

### SQS

Amazon Simple Queue Service (Amazon SQS) allows you to build asynchronous
Expand Down Expand Up @@ -116,7 +136,7 @@ To process events with a Lambda function, first create or update a DynamoDB tabl
and add it to your Lambda function. The following parameters will impact Amazon DynamoDB's polling behavior:

* __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all reocrds that arrived prior to attaching the event source.
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.

```ts
import dynamodb = require('@aws-cdk/aws-dynamodb');
Expand Down Expand Up @@ -145,8 +165,8 @@ first create or update an Amazon Kinesis stream and select custom values for the
event source parameters. The following parameters will impact Amazon Kinesis's polling
behavior:

* __batchSize__: Determines how many records are buffered before invoking your lambnda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all reocrds that arrived prior to attaching the event source.
* __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low).
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.

```ts
import lambda = require('@aws-cdk/aws-lambda');
Expand Down
15 changes: 14 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ export interface DynamoEventSourceProps extends StreamEventSourceProps {
* Use an Amazon DynamoDB stream as an event source for AWS Lambda.
*/
export class DynamoEventSource extends StreamEventSource {
private _eventSourceMappingId?: string = undefined;

constructor(private readonly table: dynamodb.Table, props: DynamoEventSourceProps) {
super(props);

Expand All @@ -22,10 +24,21 @@ export class DynamoEventSource extends StreamEventSource {
throw new Error(`DynamoDB Streams must be enabled on the table ${this.table.node.path}`);
}

target.addEventSourceMapping(`DynamoDBEventSource:${this.table.node.uniqueId}`,
const eventSourceMapping = target.addEventSourceMapping(`DynamoDBEventSource:${this.table.node.uniqueId}`,
this.enrichMappingOptions({eventSourceArn: this.table.tableStreamArn})
);
this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId;

this.table.grantStreamRead(target);
}

/**
* The identifier for this EventSourceMapping
*/
public get eventSourceMappingId(): string {
if (!this._eventSourceMappingId) {
throw new Error("DynamoEventSource is not yet bound to an event source mapping");
}
return this._eventSourceMappingId;
}
}
15 changes: 14 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/lib/kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ export interface KinesisEventSourceProps extends StreamEventSourceProps {
* Use an Amazon Kinesis stream as an event source for AWS Lambda.
*/
export class KinesisEventSource extends StreamEventSource {
private _eventSourceMappingId?: string = undefined;

constructor(readonly stream: kinesis.IStream, props: KinesisEventSourceProps) {
super(props);

Expand All @@ -18,10 +20,21 @@ export class KinesisEventSource extends StreamEventSource {
}

public bind(target: lambda.IFunction) {
target.addEventSourceMapping(`KinesisEventSource:${this.stream.node.uniqueId}`,
const eventSourceMapping = target.addEventSourceMapping(`KinesisEventSource:${this.stream.node.uniqueId}`,
this.enrichMappingOptions({eventSourceArn: this.stream.streamArn})
);
this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId;

this.stream.grantRead(target);
}

/**
* The identifier for this EventSourceMapping
*/
public get eventSourceMappingId(): string {
if (!this._eventSourceMappingId) {
throw new Error("KinesisEventSource is not yet bound to an event source mapping");
}
return this._eventSourceMappingId;
}
}
15 changes: 14 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,31 @@ export interface SqsEventSourceProps {
* Use an Amazon SQS queue as an event source for AWS Lambda.
*/
export class SqsEventSource implements lambda.IEventSource {
private _eventSourceMappingId?: string = undefined;

constructor(readonly queue: sqs.IQueue, private readonly props: SqsEventSourceProps = { }) {
if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10)) {
throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize})`);
}
}

public bind(target: lambda.IFunction) {
target.addEventSourceMapping(`SqsEventSource:${this.queue.node.uniqueId}`, {
const eventSourceMapping = target.addEventSourceMapping(`SqsEventSource:${this.queue.node.uniqueId}`, {
batchSize: this.props.batchSize,
eventSourceArn: this.queue.queueArn,
});
this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId;

this.queue.grantConsumeMessages(target);
}

/**
* The identifier for this EventSourceMapping
*/
public get eventSourceMappingId(): string {
if (!this._eventSourceMappingId) {
throw new Error("SqsEventSource is not yet bound to an event source mapping");
}
return this._eventSourceMappingId;
}
}
42 changes: 42 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,46 @@ export = {
test.done();
},

'contains eventSourceMappingId after lambda binding'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING
},
stream: dynamodb.StreamViewType.NEW_IMAGE
});
const eventSource = new sources.DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON
});

// WHEN
fn.addEventSource(eventSource);

// THEN
test.ok(eventSource.eventSourceMappingId);
test.done();
},

'eventSourceMappingId throws error before binding to lambda'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING
},
stream: dynamodb.StreamViewType.NEW_IMAGE
});
const eventSource = new sources.DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON
});

// WHEN/THEN
test.throws(() => eventSource.eventSourceMappingId, /DynamoEventSource is not yet bound to an event source mapping/);
test.done();
},

};
30 changes: 30 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,34 @@ export = {

test.done();
},

'contains eventSourceMappingId after lambda binding'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const stream = new kinesis.Stream(stack, 'S');
const eventSource = new sources.KinesisEventSource(stream, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON
});

// WHEN
fn.addEventSource(eventSource);

// THEN
test.ok(eventSource.eventSourceMappingId);
test.done();
},

'eventSourceMappingId throws error before binding to lambda'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const stream = new kinesis.Stream(stack, 'S');
const eventSource = new sources.KinesisEventSource(stream, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON
});

// WHEN/THEN
test.throws(() => eventSource.eventSourceMappingId, /KinesisEventSource is not yet bound to an event source mapping/);
test.done();
},
};
26 changes: 26 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,30 @@ export = {

test.done();
},

'contains eventSourceMappingId after lambda binding'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');
const eventSource = new sources.SqsEventSource(q);

// WHEN
fn.addEventSource(eventSource);

// THEN
test.ok(eventSource.eventSourceMappingId);
test.done();
},

'eventSourceMappingId throws error before binding to lambda'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const q = new sqs.Queue(stack, 'Q');
const eventSource = new sources.SqsEventSource(q);

// WHEN/THEN
test.throws(() => eventSource.eventSourceMappingId, /SqsEventSource is not yet bound to an event source mapping/);
test.done();
},
};
9 changes: 8 additions & 1 deletion packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,28 @@ export interface EventSourceMappingProps extends EventSourceMappingOptions {
* modify the Lambda's execution role so it can consume messages from the queue.
*/
export class EventSourceMapping extends cdk.Resource {
/**
* The identifier for this EventSourceMapping
* @attribute
*/
public readonly eventSourceMappingId: string;

constructor(scope: cdk.Construct, id: string, props: EventSourceMappingProps) {
super(scope, id);

if (props.maxBatchingWindow && props.maxBatchingWindow.toSeconds() > 300) {
throw new Error(`maxBatchingWindow cannot be over 300 seconds, got ${props.maxBatchingWindow.toSeconds()}`);
}

new CfnEventSourceMapping(this, 'Resource', {
const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', {
batchSize: props.batchSize,
enabled: props.enabled,
eventSourceArn: props.eventSourceArn,
functionName: props.target.functionName,
startingPosition: props.startingPosition,
maximumBatchingWindowInSeconds: props.maxBatchingWindow && props.maxBatchingWindow.toSeconds(),
});
this.eventSourceMappingId = cfnEventSourceMapping.ref;
}
}

Expand Down