diff --git a/packages/@aws-cdk/aws-lambda-event-sources/README.md b/packages/@aws-cdk/aws-lambda-event-sources/README.md index 515949dc8da3c..d2e13fcdb3184 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/README.md +++ b/packages/@aws-cdk/aws-lambda-event-sources/README.md @@ -9,6 +9,11 @@ --- +An event source mapping is an AWS Lambda resource that reads from an event source and invokes a Lambda function. +You can use event source mappings to process items from a stream or queue in services that don't invoke Lambda +functions directly. Lambda provides event source mappings for the following services. Read more about lambda +event sources [here](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html). + This module includes classes that allow using various AWS services as event sources for AWS Lambda via the high-level `lambda.addEventSource(source)` API. @@ -16,6 +21,21 @@ NOTE: In most cases, it is also possible to use the resource APIs to invoke an AWS Lambda function. This library provides a uniform API for all Lambda event sources regardless of the underlying mechanism they use. +The following code sets up a lambda function with an SQS queue event source - + +```ts +const fn = new lambda.Function(this, 'MyFunction', { /* ... */ }); + +const queue = new sqs.Queue(this, 'MyQueue'); +const eventSource = lambda.addEventSource(new SqsEventSource(queue); + +const eventSourceId = eventSource.eventSourceId; +``` + +The `eventSourceId` property contains the event source id. This will be a +[token](https://docs.aws.amazon.com/cdk/latest/guide/tokens.html) that will resolve to the final value at the time of +deployment. + ### SQS Amazon Simple Queue Service (Amazon SQS) allows you to build asynchronous @@ -116,7 +136,7 @@ To process events with a Lambda function, first create or update a DynamoDB tabl and add it to your Lambda function. The following parameters will impact Amazon DynamoDB's polling behavior: * __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low). -* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all reocrds that arrived prior to attaching the event source. +* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source. ```ts import dynamodb = require('@aws-cdk/aws-dynamodb'); @@ -145,8 +165,8 @@ first create or update an Amazon Kinesis stream and select custom values for the event source parameters. The following parameters will impact Amazon Kinesis's polling behavior: -* __batchSize__: Determines how many records are buffered before invoking your lambnda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low). -* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all reocrds that arrived prior to attaching the event source. +* __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low). +* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source. ```ts import lambda = require('@aws-cdk/aws-lambda'); diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts index 82de57a8c10d6..9e520a6edd5f7 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts @@ -9,6 +9,8 @@ export interface DynamoEventSourceProps extends StreamEventSourceProps { * Use an Amazon DynamoDB stream as an event source for AWS Lambda. */ export class DynamoEventSource extends StreamEventSource { + private _eventSourceMappingId?: string = undefined; + constructor(private readonly table: dynamodb.Table, props: DynamoEventSourceProps) { super(props); @@ -22,10 +24,21 @@ export class DynamoEventSource extends StreamEventSource { throw new Error(`DynamoDB Streams must be enabled on the table ${this.table.node.path}`); } - target.addEventSourceMapping(`DynamoDBEventSource:${this.table.node.uniqueId}`, + const eventSourceMapping = target.addEventSourceMapping(`DynamoDBEventSource:${this.table.node.uniqueId}`, this.enrichMappingOptions({eventSourceArn: this.table.tableStreamArn}) ); + this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId; this.table.grantStreamRead(target); } + + /** + * The identifier for this EventSourceMapping + */ + public get eventSourceMappingId(): string { + if (!this._eventSourceMappingId) { + throw new Error("DynamoEventSource is not yet bound to an event source mapping"); + } + return this._eventSourceMappingId; + } } diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/kinesis.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/kinesis.ts index 78351d428f9d0..f6b83f6be2638 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/lib/kinesis.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/kinesis.ts @@ -9,6 +9,8 @@ export interface KinesisEventSourceProps extends StreamEventSourceProps { * Use an Amazon Kinesis stream as an event source for AWS Lambda. */ export class KinesisEventSource extends StreamEventSource { + private _eventSourceMappingId?: string = undefined; + constructor(readonly stream: kinesis.IStream, props: KinesisEventSourceProps) { super(props); @@ -18,10 +20,21 @@ export class KinesisEventSource extends StreamEventSource { } public bind(target: lambda.IFunction) { - target.addEventSourceMapping(`KinesisEventSource:${this.stream.node.uniqueId}`, + const eventSourceMapping = target.addEventSourceMapping(`KinesisEventSource:${this.stream.node.uniqueId}`, this.enrichMappingOptions({eventSourceArn: this.stream.streamArn}) ); + this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId; this.stream.grantRead(target); } + + /** + * The identifier for this EventSourceMapping + */ + public get eventSourceMappingId(): string { + if (!this._eventSourceMappingId) { + throw new Error("KinesisEventSource is not yet bound to an event source mapping"); + } + return this._eventSourceMappingId; + } } diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts index 515659dcecfc2..4b15e1b8e4b5d 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts @@ -18,6 +18,8 @@ export interface SqsEventSourceProps { * Use an Amazon SQS queue as an event source for AWS Lambda. */ export class SqsEventSource implements lambda.IEventSource { + private _eventSourceMappingId?: string = undefined; + constructor(readonly queue: sqs.IQueue, private readonly props: SqsEventSourceProps = { }) { if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10)) { throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize})`); @@ -25,11 +27,22 @@ export class SqsEventSource implements lambda.IEventSource { } public bind(target: lambda.IFunction) { - target.addEventSourceMapping(`SqsEventSource:${this.queue.node.uniqueId}`, { + const eventSourceMapping = target.addEventSourceMapping(`SqsEventSource:${this.queue.node.uniqueId}`, { batchSize: this.props.batchSize, eventSourceArn: this.queue.queueArn, }); + this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId; this.queue.grantConsumeMessages(target); } + + /** + * The identifier for this EventSourceMapping + */ + public get eventSourceMappingId(): string { + if (!this._eventSourceMappingId) { + throw new Error("SqsEventSource is not yet bound to an event source mapping"); + } + return this._eventSourceMappingId; + } } diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts b/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts index 68b12725af0e3..437cbd556ae64 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts @@ -231,4 +231,46 @@ export = { test.done(); }, + 'contains eventSourceMappingId after lambda binding'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + }, + stream: dynamodb.StreamViewType.NEW_IMAGE + }); + const eventSource = new sources.DynamoEventSource(table, { + startingPosition: lambda.StartingPosition.TRIM_HORIZON + }); + + // WHEN + fn.addEventSource(eventSource); + + // THEN + test.ok(eventSource.eventSourceMappingId); + test.done(); + }, + + 'eventSourceMappingId throws error before binding to lambda'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + }, + stream: dynamodb.StreamViewType.NEW_IMAGE + }); + const eventSource = new sources.DynamoEventSource(table, { + startingPosition: lambda.StartingPosition.TRIM_HORIZON + }); + + // WHEN/THEN + test.throws(() => eventSource.eventSourceMappingId, /DynamoEventSource is not yet bound to an event source mapping/); + test.done(); + }, + }; diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts b/packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts index 53bd8eb3b0c27..a1427af34b3da 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts @@ -153,4 +153,34 @@ export = { test.done(); }, + + 'contains eventSourceMappingId after lambda binding'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const stream = new kinesis.Stream(stack, 'S'); + const eventSource = new sources.KinesisEventSource(stream, { + startingPosition: lambda.StartingPosition.TRIM_HORIZON + }); + + // WHEN + fn.addEventSource(eventSource); + + // THEN + test.ok(eventSource.eventSourceMappingId); + test.done(); + }, + + 'eventSourceMappingId throws error before binding to lambda'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const stream = new kinesis.Stream(stack, 'S'); + const eventSource = new sources.KinesisEventSource(stream, { + startingPosition: lambda.StartingPosition.TRIM_HORIZON + }); + + // WHEN/THEN + test.throws(() => eventSource.eventSourceMappingId, /KinesisEventSource is not yet bound to an event source mapping/); + test.done(); + }, }; diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts b/packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts index 90587f38ebdf2..3c596c1f8845d 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts @@ -112,4 +112,30 @@ export = { test.done(); }, + + 'contains eventSourceMappingId after lambda binding'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const q = new sqs.Queue(stack, 'Q'); + const eventSource = new sources.SqsEventSource(q); + + // WHEN + fn.addEventSource(eventSource); + + // THEN + test.ok(eventSource.eventSourceMappingId); + test.done(); + }, + + 'eventSourceMappingId throws error before binding to lambda'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const q = new sqs.Queue(stack, 'Q'); + const eventSource = new sources.SqsEventSource(q); + + // WHEN/THEN + test.throws(() => eventSource.eventSourceMappingId, /SqsEventSource is not yet bound to an event source mapping/); + test.done(); + }, }; diff --git a/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts b/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts index a30a277af3f31..d4f7f3f4c9012 100644 --- a/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts +++ b/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts @@ -67,6 +67,12 @@ export interface EventSourceMappingProps extends EventSourceMappingOptions { * modify the Lambda's execution role so it can consume messages from the queue. */ export class EventSourceMapping extends cdk.Resource { + /** + * The identifier for this EventSourceMapping + * @attribute + */ + public readonly eventSourceMappingId: string; + constructor(scope: cdk.Construct, id: string, props: EventSourceMappingProps) { super(scope, id); @@ -74,7 +80,7 @@ export class EventSourceMapping extends cdk.Resource { throw new Error(`maxBatchingWindow cannot be over 300 seconds, got ${props.maxBatchingWindow.toSeconds()}`); } - new CfnEventSourceMapping(this, 'Resource', { + const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', { batchSize: props.batchSize, enabled: props.enabled, eventSourceArn: props.eventSourceArn, @@ -82,6 +88,7 @@ export class EventSourceMapping extends cdk.Resource { startingPosition: props.startingPosition, maximumBatchingWindowInSeconds: props.maxBatchingWindow && props.maxBatchingWindow.toSeconds(), }); + this.eventSourceMappingId = cfnEventSourceMapping.ref; } }