diff --git a/packages/@aws-cdk/aws-ecs-patterns/test/ec2/integ.scheduled-ecs-task.lit.expected.json b/packages/@aws-cdk/aws-ecs-patterns/test/ec2/integ.scheduled-ecs-task.lit.expected.json index 5d7c2f56c5e2c..61916c2affd30 100644 --- a/packages/@aws-cdk/aws-ecs-patterns/test/ec2/integ.scheduled-ecs-task.lit.expected.json +++ b/packages/@aws-cdk/aws-ecs-patterns/test/ec2/integ.scheduled-ecs-task.lit.expected.json @@ -506,7 +506,7 @@ "EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionServiceRole94543EDA" ] }, - "EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionTopicSubscriptionDA5F8A10": { + "EcsClusterDefaultAutoScalingGroupLifecycleHookDrainHookTopicFunctionA8966A35": { "Type": "AWS::SNS::Subscription", "Properties": { "Protocol": "lambda", diff --git a/packages/@aws-cdk/aws-ecs/test/ec2/integ.lb-awsvpc-nw.expected.json b/packages/@aws-cdk/aws-ecs/test/ec2/integ.lb-awsvpc-nw.expected.json index e3b29d4b8aae2..8a82a11e89bc6 100644 --- a/packages/@aws-cdk/aws-ecs/test/ec2/integ.lb-awsvpc-nw.expected.json +++ b/packages/@aws-cdk/aws-ecs/test/ec2/integ.lb-awsvpc-nw.expected.json @@ -671,7 +671,7 @@ "EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionServiceRole94543EDA" ] }, - "EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionTopicSubscriptionDA5F8A10": { + "EcsClusterDefaultAutoScalingGroupLifecycleHookDrainHookTopicFunctionA8966A35": { "Type": "AWS::SNS::Subscription", "Properties": { "Protocol": "lambda", diff --git a/packages/@aws-cdk/aws-ecs/test/ec2/integ.lb-bridge-nw.expected.json b/packages/@aws-cdk/aws-ecs/test/ec2/integ.lb-bridge-nw.expected.json index 64cd3a75c59a0..d832956c2e8e7 100644 --- a/packages/@aws-cdk/aws-ecs/test/ec2/integ.lb-bridge-nw.expected.json +++ b/packages/@aws-cdk/aws-ecs/test/ec2/integ.lb-bridge-nw.expected.json @@ -692,7 +692,7 @@ "EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionServiceRole94543EDA" ] }, - "EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionTopicSubscriptionDA5F8A10": { + "EcsClusterDefaultAutoScalingGroupLifecycleHookDrainHookTopicFunctionA8966A35": { "Type": "AWS::SNS::Subscription", "Properties": { "Protocol": "lambda", diff --git a/packages/@aws-cdk/aws-ecs/test/ec2/integ.sd-awsvpc-nw.expected.json b/packages/@aws-cdk/aws-ecs/test/ec2/integ.sd-awsvpc-nw.expected.json index 0793f9813976a..60393cefcb965 100644 --- a/packages/@aws-cdk/aws-ecs/test/ec2/integ.sd-awsvpc-nw.expected.json +++ b/packages/@aws-cdk/aws-ecs/test/ec2/integ.sd-awsvpc-nw.expected.json @@ -671,7 +671,7 @@ "EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionServiceRole94543EDA" ] }, - "EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionTopicSubscriptionDA5F8A10": { + "EcsClusterDefaultAutoScalingGroupLifecycleHookDrainHookTopicFunctionA8966A35": { "Type": "AWS::SNS::Subscription", "Properties": { "Protocol": "lambda", @@ -946,4 +946,4 @@ } } } -} \ No newline at end of file +} diff --git a/packages/@aws-cdk/aws-ecs/test/ec2/integ.sd-bridge-nw.expected.json b/packages/@aws-cdk/aws-ecs/test/ec2/integ.sd-bridge-nw.expected.json index efd377829f8ce..e9a6fd0d4d524 100644 --- a/packages/@aws-cdk/aws-ecs/test/ec2/integ.sd-bridge-nw.expected.json +++ b/packages/@aws-cdk/aws-ecs/test/ec2/integ.sd-bridge-nw.expected.json @@ -671,7 +671,7 @@ "EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionServiceRole94543EDA" ] }, - "EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionTopicSubscriptionDA5F8A10": { + "EcsClusterDefaultAutoScalingGroupLifecycleHookDrainHookTopicFunctionA8966A35": { "Type": "AWS::SNS::Subscription", "Properties": { "Protocol": "lambda", @@ -910,4 +910,4 @@ } } } -} \ No newline at end of file +} diff --git a/packages/@aws-cdk/aws-events-targets/test/codebuild/integ.project-events.expected.json b/packages/@aws-cdk/aws-events-targets/test/codebuild/integ.project-events.expected.json index 924e12491e703..e0f79b8d845ce 100644 --- a/packages/@aws-cdk/aws-events-targets/test/codebuild/integ.project-events.expected.json +++ b/packages/@aws-cdk/aws-events-targets/test/codebuild/integ.project-events.expected.json @@ -327,7 +327,7 @@ "MyQueueE6CA6235": { "Type": "AWS::SQS::Queue" }, - "MyQueueMyTopicSubscriptionEB66AD1B": { + "MyTopicMyQueueFA241964": { "Type": "AWS::SNS::Subscription", "Properties": { "Protocol": "sqs", diff --git a/packages/@aws-cdk/aws-events-targets/test/ecs/integ.event-ec2-task.lit.expected.json b/packages/@aws-cdk/aws-events-targets/test/ecs/integ.event-ec2-task.lit.expected.json index 82973ff075d5f..c1643598f14ae 100644 --- a/packages/@aws-cdk/aws-events-targets/test/ecs/integ.event-ec2-task.lit.expected.json +++ b/packages/@aws-cdk/aws-events-targets/test/ecs/integ.event-ec2-task.lit.expected.json @@ -506,7 +506,7 @@ "EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionServiceRole94543EDA" ] }, - "EcsClusterDefaultAutoScalingGroupDrainECSHookFunctionTopicSubscriptionDA5F8A10": { + "EcsClusterDefaultAutoScalingGroupLifecycleHookDrainHookTopicFunctionA8966A35": { "Type": "AWS::SNS::Subscription", "Properties": { "Protocol": "lambda", @@ -1197,4 +1197,4 @@ "Description": "Artifact hash for asset \"aws-ecs-integ-ecs/AdoptEcrRepositorydbc60defc59544bcaa5c28c95d68f62c/Code\"" } } -} \ No newline at end of file +} diff --git a/packages/@aws-cdk/aws-events-targets/test/sns/integ.sns-event-rule-target.expected.json b/packages/@aws-cdk/aws-events-targets/test/sns/integ.sns-event-rule-target.expected.json index 2c8279ffca762..a6236922383a9 100644 --- a/packages/@aws-cdk/aws-events-targets/test/sns/integ.sns-event-rule-target.expected.json +++ b/packages/@aws-cdk/aws-events-targets/test/sns/integ.sns-event-rule-target.expected.json @@ -57,7 +57,7 @@ "MyQueueE6CA6235": { "Type": "AWS::SQS::Queue" }, - "MyQueueMyTopicSubscriptionEB66AD1B": { + "MyTopicMyQueueFA241964": { "Type": "AWS::SNS::Subscription", "Properties": { "Protocol": "sqs", @@ -108,4 +108,4 @@ } } } -} \ No newline at end of file +} diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/integ.sns.expected.json b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.sns.expected.json index 6a50e98575458..1134471c0d6b7 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/integ.sns.expected.json +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.sns.expected.json @@ -60,7 +60,7 @@ "FServiceRole3AC82EE1" ] }, - "FTSubscription775EAF05": { + "TF2453034D": { "Type": "AWS::SNS::Subscription", "Properties": { "Protocol": "lambda", diff --git a/packages/@aws-cdk/aws-sns-subscriptions/lib/email.ts b/packages/@aws-cdk/aws-sns-subscriptions/lib/email.ts index 2691ae6fa41e9..97a4c9846774b 100644 --- a/packages/@aws-cdk/aws-sns-subscriptions/lib/email.ts +++ b/packages/@aws-cdk/aws-sns-subscriptions/lib/email.ts @@ -1,10 +1,10 @@ import sns = require('@aws-cdk/aws-sns'); -import { Construct } from '@aws-cdk/cdk'; +import { SubscriptionProps } from './subscription'; /** * Options for email subscriptions. */ -export interface EmailSubscriptionProps { +export interface EmailSubscriptionProps extends SubscriptionProps { /** * Indicates if the full notification JSON should be sent to the email * address or just the message text. @@ -23,11 +23,12 @@ export class EmailSubscription implements sns.ITopicSubscription { constructor(private readonly emailAddress: string, private readonly props: EmailSubscriptionProps = {}) { } - public bind(scope: Construct, topic: sns.ITopic): void { - new sns.Subscription(scope, this.emailAddress, { - topic, + public bind(_topic: sns.ITopic): sns.TopicSubscriptionConfig { + return { + subscriberId: this.emailAddress, endpoint: this.emailAddress, - protocol: this.props.json ? sns.SubscriptionProtocol.EMAIL_JSON : sns.SubscriptionProtocol.EMAIL - }); + protocol: this.props.json ? sns.SubscriptionProtocol.EMAIL_JSON : sns.SubscriptionProtocol.EMAIL, + filterPolicy: this.props.filterPolicy, + }; } } diff --git a/packages/@aws-cdk/aws-sns-subscriptions/lib/index.ts b/packages/@aws-cdk/aws-sns-subscriptions/lib/index.ts index c63c355ff3aa1..a6646dfa2e0af 100644 --- a/packages/@aws-cdk/aws-sns-subscriptions/lib/index.ts +++ b/packages/@aws-cdk/aws-sns-subscriptions/lib/index.ts @@ -1,3 +1,4 @@ +export * from './subscription'; export * from './email'; export * from './lambda'; export * from './sqs'; diff --git a/packages/@aws-cdk/aws-sns-subscriptions/lib/lambda.ts b/packages/@aws-cdk/aws-sns-subscriptions/lib/lambda.ts index 55c78bfa6aa2e..d5e2c34a52770 100644 --- a/packages/@aws-cdk/aws-sns-subscriptions/lib/lambda.ts +++ b/packages/@aws-cdk/aws-sns-subscriptions/lib/lambda.ts @@ -2,37 +2,38 @@ import iam = require('@aws-cdk/aws-iam'); import lambda = require('@aws-cdk/aws-lambda'); import sns = require('@aws-cdk/aws-sns'); import { Construct } from '@aws-cdk/cdk'; +import { SubscriptionProps } from './subscription'; +/** + * Properties for a Lambda subscription + */ +export interface LambdaSubscriptionProps extends SubscriptionProps { + +} /** * Use a Lambda function as a subscription target */ export class LambdaSubscription implements sns.ITopicSubscription { - constructor(private readonly fn: lambda.IFunction) { + constructor(private readonly fn: lambda.IFunction, private readonly props: LambdaSubscriptionProps = {}) { } - public bind(_scope: Construct, topic: sns.ITopic): void { + public bind(topic: sns.ITopic): sns.TopicSubscriptionConfig { // Create subscription under *consuming* construct to make sure it ends up // in the correct stack in cases of cross-stack subscriptions. if (!Construct.isConstruct(this.fn)) { throw new Error(`The supplied lambda Function object must be an instance of Construct`); } - // we use the target name as the subscription's. there's no meaning to - // subscribing the same queue twice on the same topic. - const subscriptionName = topic.node.id + 'Subscription'; - if (this.fn.node.tryFindChild(subscriptionName)) { - throw new Error(`A subscription between the topic ${topic.node.id} and the lambda ${this.fn.node.id} already exists`); - } - - new sns.Subscription(this.fn, subscriptionName, { - topic, - endpoint: this.fn.functionArn, - protocol: sns.SubscriptionProtocol.LAMBDA, - }); - this.fn.addPermission(topic.node.id, { sourceArn: topic.topicArn, principal: new iam.ServicePrincipal('sns.amazonaws.com'), }); + + return { + subscriberId: this.fn.node.id, + endpoint: this.fn.functionArn, + protocol: sns.SubscriptionProtocol.LAMBDA, + filterPolicy: this.props.filterPolicy, + }; } } diff --git a/packages/@aws-cdk/aws-sns-subscriptions/lib/sqs.ts b/packages/@aws-cdk/aws-sns-subscriptions/lib/sqs.ts index 5bd76ffbda013..02092f1dd9ac6 100644 --- a/packages/@aws-cdk/aws-sns-subscriptions/lib/sqs.ts +++ b/packages/@aws-cdk/aws-sns-subscriptions/lib/sqs.ts @@ -2,11 +2,12 @@ import iam = require('@aws-cdk/aws-iam'); import sns = require('@aws-cdk/aws-sns'); import sqs = require('@aws-cdk/aws-sqs'); import { Construct } from '@aws-cdk/cdk'; +import { SubscriptionProps } from './subscription'; /** * Properties for an SQS subscription */ -export interface SqsSubscriptionProps { +export interface SqsSubscriptionProps extends SubscriptionProps { /** * The message to the queue is the same as it was sent to the topic * @@ -24,27 +25,13 @@ export class SqsSubscription implements sns.ITopicSubscription { constructor(private readonly queue: sqs.IQueue, private readonly props: SqsSubscriptionProps = {}) { } - public bind(_scope: Construct, topic: sns.ITopic): void { + public bind(topic: sns.ITopic): sns.TopicSubscriptionConfig { // Create subscription under *consuming* construct to make sure it ends up // in the correct stack in cases of cross-stack subscriptions. if (!Construct.isConstruct(this.queue)) { throw new Error(`The supplied Queue object must be an instance of Construct`); } - // we use the queue name as the subscription's. there's no meaning to - // subscribing the same queue twice on the same topic. - const subscriptionName = topic.node.id + 'Subscription'; - if (this.queue.node.tryFindChild(subscriptionName)) { - throw new Error(`A subscription between the topic ${topic.node.id} and the queue ${this.queue.node.id} already exists`); - } - - new sns.Subscription(this.queue, subscriptionName, { - topic, - endpoint: this.queue.queueArn, - protocol: sns.SubscriptionProtocol.SQS, - rawMessageDelivery: this.props.rawMessageDelivery, - }); - // add a statement to the queue resource policy which allows this topic // to send messages to the queue. this.queue.addToResourcePolicy(new iam.PolicyStatement({ @@ -55,5 +42,13 @@ export class SqsSubscription implements sns.ITopicSubscription { ArnEquals: { 'aws:SourceArn': topic.topicArn } } })); + + return { + subscriberId: this.queue.node.id, + endpoint: this.queue.queueArn, + protocol: sns.SubscriptionProtocol.SQS, + rawMessageDelivery: this.props.rawMessageDelivery, + filterPolicy: this.props.filterPolicy, + }; } } diff --git a/packages/@aws-cdk/aws-sns-subscriptions/lib/subscription.ts b/packages/@aws-cdk/aws-sns-subscriptions/lib/subscription.ts new file mode 100644 index 0000000000000..603f54c835583 --- /dev/null +++ b/packages/@aws-cdk/aws-sns-subscriptions/lib/subscription.ts @@ -0,0 +1,10 @@ +import sns = require('@aws-cdk/aws-sns'); + +export interface SubscriptionProps { + /** + * The filter policy. + * + * @default - all messages are delivered + */ + readonly filterPolicy?: { [attribute: string]: sns.SubscriptionFilter }; +} diff --git a/packages/@aws-cdk/aws-sns-subscriptions/lib/url.ts b/packages/@aws-cdk/aws-sns-subscriptions/lib/url.ts index 633ff0d04f63d..5eb0029a93e48 100644 --- a/packages/@aws-cdk/aws-sns-subscriptions/lib/url.ts +++ b/packages/@aws-cdk/aws-sns-subscriptions/lib/url.ts @@ -1,10 +1,10 @@ import sns = require('@aws-cdk/aws-sns'); -import { Construct } from '@aws-cdk/cdk'; +import { SubscriptionProps } from './subscription'; /** * Options for URL subscriptions. */ -export interface UrlSubscriptionProps { +export interface UrlSubscriptionProps extends SubscriptionProps { /** * The message to the queue is the same as it was sent to the topic * @@ -29,12 +29,13 @@ export class UrlSubscription implements sns.ITopicSubscription { } } - public bind(scope: Construct, topic: sns.ITopic): void { - new sns.Subscription(scope, this.url, { - topic, + public bind(_topic: sns.ITopic): sns.TopicSubscriptionConfig { + return { + subscriberId: this.url, endpoint: this.url, protocol: this.url.startsWith('https:') ? sns.SubscriptionProtocol.HTTPS : sns.SubscriptionProtocol.HTTP, rawMessageDelivery: this.props.rawMessageDelivery, - }); + filterPolicy: this.props.filterPolicy, + }; } } diff --git a/packages/@aws-cdk/aws-sns-subscriptions/test/integ.sns-lambda.expected.json b/packages/@aws-cdk/aws-sns-subscriptions/test/integ.sns-lambda.expected.json index 62614184e66a6..c2b5ff331efd6 100644 --- a/packages/@aws-cdk/aws-sns-subscriptions/test/integ.sns-lambda.expected.json +++ b/packages/@aws-cdk/aws-sns-subscriptions/test/integ.sns-lambda.expected.json @@ -3,6 +3,65 @@ "MyTopic86869434": { "Type": "AWS::SNS::Topic" }, + "MyTopicEchoD1E0EE5C": { + "Type": "AWS::SNS::Subscription", + "Properties": { + "Protocol": "lambda", + "TopicArn": { + "Ref": "MyTopic86869434" + }, + "Endpoint": { + "Fn::GetAtt": [ + "Echo11F3FB29", + "Arn" + ] + } + } + }, + "MyTopicFiltered55457D11": { + "Type": "AWS::SNS::Subscription", + "Properties": { + "Protocol": "lambda", + "TopicArn": { + "Ref": "MyTopic86869434" + }, + "Endpoint": { + "Fn::GetAtt": [ + "Filtered186C0D0A", + "Arn" + ] + }, + "FilterPolicy": { + "color": [ + "red", + { + "prefix": "bl" + }, + { + "prefix": "ye" + } + ], + "size": [ + { + "anything-but": [ + "small", + "medium" + ] + } + ], + "price": [ + { + "numeric": [ + ">=", + 100, + "<=", + 200 + ] + } + ] + } + } + }, "EchoServiceRoleBE28060B": { "Type": "AWS::IAM::Role", "Properties": { @@ -63,28 +122,89 @@ "EchoServiceRoleBE28060B" ] }, - "EchoMyTopicSubscriptionA634C6D7": { - "Type": "AWS::SNS::Subscription", + "EchoMyTopicF6EBB45F": { + "Type": "AWS::Lambda::Permission", "Properties": { - "Protocol": "lambda", - "TopicArn": { - "Ref": "MyTopic86869434" - }, - "Endpoint": { + "Action": "lambda:InvokeFunction", + "FunctionName": { "Fn::GetAtt": [ "Echo11F3FB29", "Arn" ] + }, + "Principal": "sns.amazonaws.com", + "SourceArn": { + "Ref": "MyTopic86869434" } } }, - "EchoMyTopicF6EBB45F": { + "FilteredServiceRole16D9DDC1": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": { + "Fn::Join": [ + "", + [ + "lambda.", + { + "Ref": "AWS::URLSuffix" + } + ] + ] + } + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "Filtered186C0D0A": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "ZipFile": "exports.handler = function handler(event, _context, callback) {\n // tslint:disable:no-console\n console.log('====================================================');\n console.log(JSON.stringify(event, undefined, 2));\n console.log('====================================================');\n return callback(undefined, event);\n}" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "FilteredServiceRole16D9DDC1", + "Arn" + ] + }, + "Runtime": "nodejs8.10" + }, + "DependsOn": [ + "FilteredServiceRole16D9DDC1" + ] + }, + "FilteredMyTopic804BCBC3": { "Type": "AWS::Lambda::Permission", "Properties": { "Action": "lambda:InvokeFunction", "FunctionName": { "Fn::GetAtt": [ - "Echo11F3FB29", + "Filtered186C0D0A", "Arn" ] }, @@ -95,4 +215,4 @@ } } } -} +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-sns-subscriptions/test/integ.sns-lambda.ts b/packages/@aws-cdk/aws-sns-subscriptions/test/integ.sns-lambda.ts index 4a6efc3f5699d..bd4e992df9c24 100644 --- a/packages/@aws-cdk/aws-sns-subscriptions/test/integ.sns-lambda.ts +++ b/packages/@aws-cdk/aws-sns-subscriptions/test/integ.sns-lambda.ts @@ -3,7 +3,7 @@ import sns = require('@aws-cdk/aws-sns'); import cdk = require('@aws-cdk/cdk'); import subs = require('../lib'); -class SnsToSqs extends cdk.Stack { +class SnsToLambda extends cdk.Stack { constructor(scope: cdk.App, id: string, props?: cdk.StackProps) { super(scope, id, props); @@ -16,12 +16,33 @@ class SnsToSqs extends cdk.Stack { }); topic.addSubscription(new subs.LambdaSubscription(fction)); + + const fctionFiltered = new lambda.Function(this, 'Filtered', { + handler: 'index.handler', + runtime: lambda.Runtime.Nodejs810, + code: lambda.Code.inline(`exports.handler = ${handler.toString()}`) + }); + + topic.addSubscription(new subs.LambdaSubscription(fctionFiltered, { + filterPolicy: { + color: sns.SubscriptionFilter.stringFilter({ + whitelist: ['red'], + matchPrefixes: ['bl', 'ye'], + }), + size: sns.SubscriptionFilter.stringFilter({ + blacklist: ['small', 'medium'], + }), + price: sns.SubscriptionFilter.numericFilter({ + between: { start: 100, stop: 200 } + }) + } + })); } } const app = new cdk.App(); -new SnsToSqs(app, 'aws-cdk-sns-lambda'); +new SnsToLambda(app, 'aws-cdk-sns-lambda'); app.synth(); diff --git a/packages/@aws-cdk/aws-sns-subscriptions/test/integ.sns-sqs.lit.expected.json b/packages/@aws-cdk/aws-sns-subscriptions/test/integ.sns-sqs.lit.expected.json index 868a13aed58da..31a0f99f6514a 100644 --- a/packages/@aws-cdk/aws-sns-subscriptions/test/integ.sns-sqs.lit.expected.json +++ b/packages/@aws-cdk/aws-sns-subscriptions/test/integ.sns-sqs.lit.expected.json @@ -3,10 +3,7 @@ "MyTopic86869434": { "Type": "AWS::SNS::Topic" }, - "MyQueueE6CA6235": { - "Type": "AWS::SQS::Queue" - }, - "MyQueueMyTopicSubscriptionEB66AD1B": { + "MyTopicMyQueueFA241964": { "Type": "AWS::SNS::Subscription", "Properties": { "Protocol": "sqs", @@ -21,6 +18,9 @@ } } }, + "MyQueueE6CA6235": { + "Type": "AWS::SQS::Queue" + }, "MyQueuePolicy6BBEDDAC": { "Type": "AWS::SQS::QueuePolicy", "Properties": { diff --git a/packages/@aws-cdk/aws-sns-subscriptions/test/subs.test.ts b/packages/@aws-cdk/aws-sns-subscriptions/test/subs.test.ts index 6eaa4790a4d73..2dd8c907469e7 100644 --- a/packages/@aws-cdk/aws-sns-subscriptions/test/subs.test.ts +++ b/packages/@aws-cdk/aws-sns-subscriptions/test/subs.test.ts @@ -85,7 +85,7 @@ test('queue subscription', () => { "TopicName": "topicName" } }, - "MyQueueMyTopicSubscriptionEB66AD1B": { + "MyTopicMyQueueFA241964": { "Type": "AWS::SNS::Subscription", "Properties": { "Endpoint": { @@ -180,7 +180,7 @@ test('lambda subscription', () => { "TopicName": "topicName" } }, - "MyFuncMyTopicSubscription708A6535": { + "MyTopicMyFunc853BC1D3": { "Type": "AWS::SNS::Subscription", "Properties": { "Endpoint": { @@ -300,7 +300,7 @@ test('multiple subscriptions', () => { "TopicName": "topicName" } }, - "MyQueueMyTopicSubscriptionEB66AD1B": { + "MyTopicMyQueueFA241964": { "Type": "AWS::SNS::Subscription", "Properties": { "Endpoint": { @@ -315,7 +315,7 @@ test('multiple subscriptions', () => { } } }, - "MyFuncMyTopicSubscription708A6535": { + "MyTopicMyFunc853BC1D3": { "Type": "AWS::SNS::Subscription", "Properties": { "Endpoint": { @@ -437,3 +437,67 @@ test('multiple subscriptions', () => { } }); }); + +test('throws with mutliple subscriptions of the same subscriber', () => { + const queue = new sqs.Queue(stack, 'MyQueue'); + + topic.addSubscription(new subs.SqsSubscription(queue)); + + expect(() => topic.addSubscription(new subs.SqsSubscription(queue))) + .toThrowError(/subscriber MyQueue already exists/); +}); + +test('with filter policy', () => { + const fction = new lambda.Function(stack, 'MyFunc', { + runtime: lambda.Runtime.Nodejs810, + handler: 'index.handler', + code: lambda.Code.inline('exports.handler = function(e, c, cb) { return cb() }') + }); + + topic.addSubscription(new subs.LambdaSubscription(fction, { + filterPolicy: { + color: sns.SubscriptionFilter.stringFilter({ + whitelist: ['red'], + matchPrefixes: ['bl', 'ye'], + }), + size: sns.SubscriptionFilter.stringFilter({ + blacklist: ['small', 'medium'], + }), + price: sns.SubscriptionFilter.numericFilter({ + between: { start: 100, stop: 200 } + }) + } + })); + + expect(stack).toHaveResource('AWS::SNS::Subscription', { + "FilterPolicy": { + "color": [ + "red", + { + "prefix": "bl" + }, + { + "prefix": "ye" + } + ], + "size": [ + { + "anything-but": [ + "small", + "medium" + ] + } + ], + "price": [ + { + "numeric": [ + ">=", + 100, + "<=", + 200 + ] + } + ] + } + }); +}); diff --git a/packages/@aws-cdk/aws-sns/README.md b/packages/@aws-cdk/aws-sns/README.md index 15e8f180e2609..82a1cf6a8d022 100644 --- a/packages/@aws-cdk/aws-sns/README.md +++ b/packages/@aws-cdk/aws-sns/README.md @@ -51,6 +51,35 @@ myTopic.addSubscription(new subs.SqsSubscription(queue)); Note that subscriptions of queues in different accounts need to be manually confirmed by reading the initial message from the queue and visiting the link found in it. +#### Filter policy +A filter policy can be specified when subscribing an endpoint to a topic. + +Example with a Lambda subscription: +```ts +const myTopic = new sns.Topic(this, 'MyTopic'); +const fn = new lambda.Function(this, 'Function', ...); + +// Lambda should receive only message matching the following conditions on attributes: +// color: 'red' or 'orange' or begins with 'bl' +// size: anything but 'small' or 'medium' +// price: between 100 and 200 or greater than 300 +topic.subscribeLambda(new subs.LambdaSubscription(fn, { + filterPolicy: { + color: sns.SubscriptionFilter.stringFilter({ + whitelist: ['red', 'orange'], + matchPrefixes: ['bl'] + }), + size: sns.SubscriptionFilter.stringFilter({ + blacklist: ['small', 'medium'], + }), + price: sns.SubscriptionFilter.numericFilter({ + between: { start: 100, stop: 200 }, + greaterThan: 300 + }) + } +})); +``` + ### CloudWatch Event Rule Target SNS topics can be used as targets for CloudWatch event rules. diff --git a/packages/@aws-cdk/aws-sns/lib/index.ts b/packages/@aws-cdk/aws-sns/lib/index.ts index cc33331121840..cdcb67084921e 100644 --- a/packages/@aws-cdk/aws-sns/lib/index.ts +++ b/packages/@aws-cdk/aws-sns/lib/index.ts @@ -3,6 +3,7 @@ export * from './topic'; export * from './topic-base'; export * from './subscription'; export * from './subscriber'; +export * from './subscription-filter'; // AWS::SNS CloudFormation Resources: export * from './sns.generated'; diff --git a/packages/@aws-cdk/aws-sns/lib/subscriber.ts b/packages/@aws-cdk/aws-sns/lib/subscriber.ts index 82cd69d1bbc6d..1077732ef8df9 100644 --- a/packages/@aws-cdk/aws-sns/lib/subscriber.ts +++ b/packages/@aws-cdk/aws-sns/lib/subscriber.ts @@ -1,9 +1,20 @@ -import { Construct } from '@aws-cdk/cdk'; +import { SubscriptionOptions } from './subscription'; import { ITopic } from './topic-base'; +/** + * Subscription configuration + */ +export interface TopicSubscriptionConfig extends SubscriptionOptions { + /** + * The id of the subscriber. Will be used as the id for the subscription in + * the topic's scope. + */ + readonly subscriberId: string; +} + /** * Topic subscription */ export interface ITopicSubscription { - bind(scope: Construct, topic: ITopic): void; + bind(topic: ITopic): TopicSubscriptionConfig; } diff --git a/packages/@aws-cdk/aws-sns/lib/subscription-filter.ts b/packages/@aws-cdk/aws-sns/lib/subscription-filter.ts new file mode 100644 index 0000000000000..188e843cd1fc7 --- /dev/null +++ b/packages/@aws-cdk/aws-sns/lib/subscription-filter.ts @@ -0,0 +1,139 @@ +/** + * Conditions that can be applied to string attributes. + */ +export interface StringConditions { + /** + * Match one or more values. + */ + readonly whitelist?: string[]; + + /** + * Match any value that doesn't include any of the specified values. + */ + readonly blacklist?: string[]; + + /** + * Matches values that begins with the specified prefixes. + */ + readonly matchPrefixes?: string[]; +} + +/** + * Between condition for a numeric attribute. + */ +export interface BetweenCondition { + /** + * The start value. + */ + readonly start: number; + + /** + * The stop value. + */ + readonly stop: number; +} + +/** + * Conditions that can be applied to numeric attributes. + */ +export interface NumericConditions { + /** + * Match one or more values. + */ + readonly whitelist?: number[]; + + /** + * Match values that are greater than the specified value. + */ + readonly greaterThan?: number; + + /** + * Match values that are greater than or equal to the specified value. + */ + readonly greaterThanOrEqualTo?: number; + + /** + * Match values that are less than the specified value. + */ + readonly lessThan?: number; + + /** + * Match values that are less than or equal to the specified value. + */ + readonly lessThanOrEqualTo?: number; + + /** + * Match values that are between the specified values. + */ + readonly between?: BetweenCondition; + + /** + * Match values that are strictly between the specified values. + */ + readonly betweenStrict?: BetweenCondition; +} + +/** + * A subscription filter for an attribute. + */ +export class SubscriptionFilter { + /** + * Returns a subscription filter for a string attribute. + */ + public static stringFilter(stringConditions: StringConditions) { + const conditions = []; + + if (stringConditions.whitelist) { + conditions.push(...stringConditions.whitelist); + } + + if (stringConditions.blacklist) { + conditions.push({ 'anything-but': stringConditions.blacklist }); + } + + if (stringConditions.matchPrefixes) { + conditions.push(...stringConditions.matchPrefixes.map(p => ({ prefix: p }))); + } + + return new SubscriptionFilter(conditions); + } + + /** + * Returns a subscription filter for a numeric attribute. + */ + public static numericFilter(numericConditions: NumericConditions) { + const conditions = []; + + if (numericConditions.whitelist) { + conditions.push(...numericConditions.whitelist.map(v => ({ numeric: ['=', v] }))); + } + + if (numericConditions.greaterThan) { + conditions.push({ numeric: ['>', numericConditions.greaterThan] }); + } + + if (numericConditions.greaterThanOrEqualTo) { + conditions.push({ numeric: ['>=', numericConditions.greaterThanOrEqualTo] }); + } + + if (numericConditions.lessThan) { + conditions.push({ numeric: ['<', numericConditions.lessThan] }); + } + + if (numericConditions.lessThanOrEqualTo) { + conditions.push({ numeric: ['<=', numericConditions.lessThanOrEqualTo] }); + } + + if (numericConditions.between) { + conditions.push({ numeric: ['>=', numericConditions.between.start, '<=', numericConditions.between.stop ]}); + } + + if (numericConditions.betweenStrict) { + conditions.push({ numeric: ['>', numericConditions.betweenStrict.start, '<', numericConditions.betweenStrict.stop ]}); + } + + return new SubscriptionFilter(conditions); + } + + constructor(public readonly conditions: any[] = []) {} +} diff --git a/packages/@aws-cdk/aws-sns/lib/subscription.ts b/packages/@aws-cdk/aws-sns/lib/subscription.ts index 16accc8b6424c..9c8a5c4bcaf2a 100644 --- a/packages/@aws-cdk/aws-sns/lib/subscription.ts +++ b/packages/@aws-cdk/aws-sns/lib/subscription.ts @@ -1,11 +1,12 @@ import { Construct, Resource } from '@aws-cdk/cdk'; import { CfnSubscription } from './sns.generated'; +import { SubscriptionFilter } from './subscription-filter'; import { ITopic } from './topic-base'; /** - * Properties for creating a new subscription + * Options for creating a new subscription */ -export interface SubscriptionProps { +export interface SubscriptionOptions { /** * What type of subscription to add. */ @@ -18,11 +19,6 @@ export interface SubscriptionProps { */ readonly endpoint: string; - /** - * The topic to subscribe to. - */ - readonly topic: ITopic; - /** * true if raw message delivery is enabled for the subscription. Raw messages are free of JSON formatting and can be * sent to HTTP/S and Amazon SQS endpoints. For more information, see GetSubscriptionAttributes in the Amazon Simple @@ -31,15 +27,33 @@ export interface SubscriptionProps { * @default false */ readonly rawMessageDelivery?: boolean; + + /** + * The filter policy. + * + * @default - all messages are delivered + */ + readonly filterPolicy?: { [attribute: string]: SubscriptionFilter }; +} +/** + * Properties for creating a new subscription + */ +export interface SubscriptionProps extends SubscriptionOptions { + /** + * The topic to subscribe to. + */ + readonly topic: ITopic; } /** * A new subscription. * - * Prefer to use the `ITopic.subscribeXxx()` methods to creating instances of + * Prefer to use the `ITopic.addSubscription()` methods to create instances of * this class. */ export class Subscription extends Resource { + private readonly filterPolicy?: { [attribute: string]: any[] }; + constructor(scope: Construct, id: string, props: SubscriptionProps) { super(scope, id); @@ -47,11 +61,30 @@ export class Subscription extends Resource { throw new Error('Raw message delivery can only be enabled for HTTP/S and SQS subscriptions.'); } + if (props.filterPolicy) { + if (Object.keys(props.filterPolicy).length > 5) { + throw new Error('A filter policy can have a maximum of 5 attribute names.'); + } + + this.filterPolicy = Object.entries(props.filterPolicy) + .reduce( + (acc, [k, v]) => ({ ...acc, [k]: v.conditions }), + {} + ); + + let total = 1; + Object.values(this.filterPolicy).forEach(filter => { total *= filter.length; }); + if (total > 100) { + throw new Error(`The total combination of values (${total}) must not exceed 100.`); + } + } + new CfnSubscription(this, 'Resource', { endpoint: props.endpoint, protocol: props.protocol, topicArn: props.topic.topicArn, rawMessageDelivery: props.rawMessageDelivery, + filterPolicy: this.filterPolicy, }); } diff --git a/packages/@aws-cdk/aws-sns/lib/topic-base.ts b/packages/@aws-cdk/aws-sns/lib/topic-base.ts index dce05fadaa28d..cd76a274d10b4 100644 --- a/packages/@aws-cdk/aws-sns/lib/topic-base.ts +++ b/packages/@aws-cdk/aws-sns/lib/topic-base.ts @@ -2,6 +2,7 @@ import iam = require('@aws-cdk/aws-iam'); import { IResource, Resource } from '@aws-cdk/cdk'; import { TopicPolicy } from './policy'; import { ITopicSubscription } from './subscriber'; +import { Subscription } from './subscription'; export interface ITopic extends IResource { /** @@ -55,7 +56,18 @@ export abstract class TopicBase extends Resource implements ITopic { * Subscribe some endpoint to this topic */ public addSubscription(subscription: ITopicSubscription) { - subscription.bind(this, this); + const subscriptionConfig = subscription.bind(this); + + // We use the subscriber's id as the construct id. There's no meaning + // to subscribing the same subscriber twice on the same topic. + if (this.node.tryFindChild(subscriptionConfig.subscriberId)) { + throw new Error(`A subscription between the topic ${this.node.id} and the subscriber ${subscriptionConfig.subscriberId} already exists`); + } + + new Subscription(this, subscriptionConfig.subscriberId, { + topic: this, + ...subscriptionConfig, + }); } /** diff --git a/packages/@aws-cdk/aws-sns/package.json b/packages/@aws-cdk/aws-sns/package.json index 3f6da9eed7cc6..16054cc704953 100644 --- a/packages/@aws-cdk/aws-sns/package.json +++ b/packages/@aws-cdk/aws-sns/package.json @@ -92,10 +92,9 @@ "awslint": { "exclude": [ "construct-base-is-private:@aws-cdk/aws-sns.TopicBase", - "integ-return-type:@aws-cdk/aws-sns.ITopicSubscription.bind", "props-physical-name:@aws-cdk/aws-sns.SubscriptionProps", "props-physical-name:@aws-cdk/aws-sns.TopicPolicyProps" ] }, "stability": "experimental" -} \ No newline at end of file +} diff --git a/packages/@aws-cdk/aws-sns/test/test.sns.ts b/packages/@aws-cdk/aws-sns/test/test.sns.ts index 160cde6ca49fe..4eff0c8579c80 100644 --- a/packages/@aws-cdk/aws-sns/test/test.sns.ts +++ b/packages/@aws-cdk/aws-sns/test/test.sns.ts @@ -88,8 +88,6 @@ export = { test.done(); }, }, - 'subscription tests': { - }, 'can add a policy to the topic'(test: Test) { // GIVEN diff --git a/packages/@aws-cdk/aws-sns/test/test.subscription.ts b/packages/@aws-cdk/aws-sns/test/test.subscription.ts new file mode 100644 index 0000000000000..6bf653ce95f35 --- /dev/null +++ b/packages/@aws-cdk/aws-sns/test/test.subscription.ts @@ -0,0 +1,139 @@ +import { expect, haveResource } from '@aws-cdk/assert'; +import cdk = require('@aws-cdk/cdk'); +import { Test } from 'nodeunit'; +import sns = require('../lib'); + +export = { + 'create a subscription'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic'); + + // WHEN + new sns.Subscription(stack, 'Subscription', { + endpoint: 'endpoint', + protocol: sns.SubscriptionProtocol.LAMBDA, + topic + }); + + // THEN + expect(stack).to(haveResource('AWS::SNS::Subscription', { + Endpoint: 'endpoint', + Protocol: 'lambda', + TopicArn: { + Ref: 'TopicBFC7AF6E' + } + })); + test.done(); + }, + + 'with filter policy'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic'); + + // WHEN + new sns.Subscription(stack, 'Subscription', { + endpoint: 'endpoint', + filterPolicy: { + color: sns.SubscriptionFilter.stringFilter({ + whitelist: ['red', 'green'], + blacklist: ['white', 'orange'], + matchPrefixes: ['bl', 'ye'], + }), + price: sns.SubscriptionFilter.numericFilter({ + whitelist: [100, 200], + between: { start: 300, stop: 350 }, + greaterThan: 500, + lessThan: 1000, + betweenStrict: { start: 2000, stop: 3000 }, + greaterThanOrEqualTo: 1000, + lessThanOrEqualTo: -2, + }) + }, + protocol: sns.SubscriptionProtocol.LAMBDA, + topic + }); + + // THEN + expect(stack).to(haveResource('AWS::SNS::Subscription', { + FilterPolicy: { + color: [ + 'red', + 'green', + {'anything-but': ['white', 'orange']}, + { prefix: 'bl'}, + { prefix: 'ye'} + ], + price: [ + { numeric: ['=', 100] }, + { numeric: ['=', 200] }, + { numeric: ['>', 500] }, + { numeric: ['>=', 1000] }, + { numeric: ['<', 1000] }, + { numeric: ['<=', -2] }, + { numeric: ['>=', 300, '<=', 350] }, + { numeric: ['>', 2000, '<', 3000] }, + ] + }, + })); + test.done(); + }, + + 'throws with raw delivery for protocol other than http, https or sqs'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic'); + + // THEN + test.throws(() => new sns.Subscription(stack, 'Subscription', { + endpoint: 'endpoint', + protocol: sns.SubscriptionProtocol.LAMBDA, + topic, + rawMessageDelivery: true + }), /Raw message delivery/); + test.done(); + }, + + 'throws with more than 5 attributes in a filter policy'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic'); + const cond = { conditions: [] }; + + // THEN + test.throws(() => new sns.Subscription(stack, 'Subscription', { + endpoint: 'endpoint', + protocol: sns.SubscriptionProtocol.LAMBDA, + topic, + filterPolicy: { + a: cond, + b: cond, + c: cond, + d: cond, + e: cond, + f: cond, + }, + }), /5 attribute names/); + test.done(); + }, + + 'throws with more than 100 conditions in a filter policy'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic'); + + // THEN + test.throws(() => new sns.Subscription(stack, 'Subscription', { + endpoint: 'endpoint', + protocol: sns.SubscriptionProtocol.LAMBDA, + topic, + filterPolicy: { + a: { conditions: [...Array.from(Array(2).keys())] }, + b: { conditions: [...Array.from(Array(10).keys())] }, + c: { conditions: [...Array.from(Array(6).keys())] }, + }, + }), /\(120\) must not exceed 100/); + test.done(); + } +}; diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.ec2-task.expected.json b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.ec2-task.expected.json index 31cb13d038435..42861dbecbadd 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.ec2-task.expected.json +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.ec2-task.expected.json @@ -306,7 +306,7 @@ "FargateClusterDefaultAutoScalingGroupDrainECSHookFunctionServiceRole7FEDCD32" ] }, - "FargateClusterDefaultAutoScalingGroupDrainECSHookFunctionTopicSubscriptionA4A8D57E": { + "FargateClusterDefaultAutoScalingGroupLifecycleHookDrainHookTopicFunctionE74B772A": { "Type": "AWS::SNS::Subscription", "Properties": { "Protocol": "lambda", @@ -978,4 +978,4 @@ "Description": "Artifact hash for asset \"aws-ecs-integ2/AdoptEcrRepositorydbc60defc59544bcaa5c28c95d68f62c/Code\"" } } -} \ No newline at end of file +} diff --git a/packages/decdk/test/__snapshots__/synth.test.js.snap b/packages/decdk/test/__snapshots__/synth.test.js.snap index bf327752cb659..12f454af5a20e 100644 --- a/packages/decdk/test/__snapshots__/synth.test.js.snap +++ b/packages/decdk/test/__snapshots__/synth.test.js.snap @@ -1227,21 +1227,6 @@ Object { }, "Type": "AWS::Lambda::Permission", }, - "HelloWorldFunctionMyTopicSubscription20D3FA87": Object { - "Properties": Object { - "Endpoint": Object { - "Fn::GetAtt": Array [ - "HelloWorldFunctionB2AB6E79", - "Arn", - ], - }, - "Protocol": "lambda", - "TopicArn": Object { - "Ref": "MyTopic86869434", - }, - }, - "Type": "AWS::SNS::Subscription", - }, "HelloWorldFunctionServiceRole8E0BD458": Object { "Properties": Object { "AssumeRolePolicyDocument": Object { @@ -1321,6 +1306,21 @@ Object { "MyTopic86869434": Object { "Type": "AWS::SNS::Topic", }, + "MyTopicHelloWorldFunction831B106E": Object { + "Properties": Object { + "Endpoint": Object { + "Fn::GetAtt": Array [ + "HelloWorldFunctionB2AB6E79", + "Arn", + ], + }, + "Protocol": "lambda", + "TopicArn": Object { + "Ref": "MyTopic86869434", + }, + }, + "Type": "AWS::SNS::Subscription", + }, "TableCD117FA1": Object { "Properties": Object { "AttributeDefinitions": Array [ @@ -1659,7 +1659,10 @@ Object { }, "Type": "AWS::Lambda::Permission", }, - "LambdaTopicSubscriptionF6CC623D": Object { + "TopicBFC7AF6E": Object { + "Type": "AWS::SNS::Topic", + }, + "TopicLambda3DD31D45": Object { "Properties": Object { "Endpoint": Object { "Fn::GetAtt": Array [ @@ -1674,9 +1677,6 @@ Object { }, "Type": "AWS::SNS::Subscription", }, - "TopicBFC7AF6E": Object { - "Type": "AWS::SNS::Topic", - }, }, } `;