Skip to content

Commit

Permalink
feat(lambda): kafka topic as an event source (#10445)
Browse files Browse the repository at this point in the history
Lambda recently added support for MSK as an event source (https://aws.amazon.com/about-aws/whats-new/2020/08/aws-lambda-now-supports-amazon-managed-streaming-for-apache-kafka-as-an-event-source/), and there's now a "Topics" property on the CloudFormation resource definition (https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics).

Closes #10138


----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
dscpinheiro authored Sep 22, 2020
1 parent 272363a commit dac1e12
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
16 changes: 12 additions & 4 deletions packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export interface EventSourceMappingOptions {
*
* Valid Range: Minimum value of 1. Maximum value of 10000.
*
* @default - Amazon Kinesis and Amazon DynamoDB is 100 records.
* @default - Amazon Kinesis, Amazon DynamoDB, and Amazon MSK is 100 records.
* Both the default and maximum for Amazon SQS are 10 messages.
*/
readonly batchSize?: number;
Expand All @@ -44,12 +44,12 @@ export interface EventSourceMappingOptions {
readonly enabled?: boolean;

/**
* The position in the DynamoDB or Kinesis stream where AWS Lambda should
* The position in the DynamoDB, Kinesis or MSK stream where AWS Lambda should
* start reading.
*
* @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType
*
* @default - Required for Amazon Kinesis and Amazon DynamoDB Streams sources.
* @default - Required for Amazon Kinesis, Amazon DynamoDB, and Amazon MSK Streams sources.
*/
readonly startingPosition?: StartingPosition;

Expand Down Expand Up @@ -91,6 +91,13 @@ export interface EventSourceMappingOptions {
* @default 1
*/
readonly parallelizationFactor?: number;

/**
* The name of the Kafka topic.
*
* @default - no topic
*/
readonly kafkaTopic?: string;
}

/**
Expand Down Expand Up @@ -185,13 +192,14 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
maximumRecordAgeInSeconds: props.maxRecordAge?.toSeconds(),
maximumRetryAttempts: props.retryAttempts,
parallelizationFactor: props.parallelizationFactor,
topics: props.kafkaTopic !== undefined ? [props.kafkaTopic] : undefined,
});
this.eventSourceMappingId = cfnEventSourceMapping.ref;
}
}

/**
* The position in the DynamoDB or Kinesis stream where AWS Lambda should start
* The position in the DynamoDB, Kinesis or MSK stream where AWS Lambda should start
* reading.
*/
export enum StartingPosition {
Expand Down
28 changes: 28 additions & 0 deletions packages/@aws-cdk/aws-lambda/test/test.event-source-mapping.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { expect, haveResourceLike } from '@aws-cdk/assert';
import * as cdk from '@aws-cdk/core';
import { Test } from 'nodeunit';
import { Code, EventSourceMapping, Function, Runtime } from '../lib';
Expand Down Expand Up @@ -185,4 +186,31 @@ export = {
test.equals(imported.stack.stackName, 'test-stack');
test.done();
},

'accepts if kafkaTopic is a parameter'(test: Test) {
const stack = new cdk.Stack();
const topicNameParam = new cdk.CfnParameter(stack, 'TopicNameParam', {
type: 'String',
});

const fn = new Function(stack, 'fn', {
handler: 'index.handler',
code: Code.fromInline('exports.handler = ${handler.toString()}'),
runtime: Runtime.NODEJS_10_X,
});

new EventSourceMapping(stack, 'test', {
target: fn,
eventSourceArn: '',
kafkaTopic: topicNameParam.valueAsString,
});

expect(stack).to(haveResourceLike('AWS::Lambda::EventSourceMapping', {
Topics: [{
Ref: 'TopicNameParam',
}],
}));

test.done();
},
};

0 comments on commit dac1e12

Please sign in to comment.