Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sns): add support for subscription filter policy #2778

Merged
merged 16 commits into from
Jun 21, 2019
Merged
20 changes: 20 additions & 0 deletions packages/@aws-cdk/aws-sns/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,26 @@ Subscribe a queue to the topic:
Note that subscriptions of queues in different accounts need to be manually confirmed by
reading the initial message from the queue and visiting the link found in it.

#### Filter policy
A filter policy can be specified when subscribing an endpoint to a topic.

Example with a Lambda subscription:
```ts
const myTopic = new sns.Topic(this, 'MyTopic');
const fn = new lambda.Function(this, 'Function', ...);

// Lambda should receive only message matching the following conditions on attributes:
// color: 'red' or 'orange' or begins with 'bl'
// size: anything but 'small' or 'medium'
// price: between 100 and 200 or greater than 300
const filterPolicy = new sns.SubscriptionFilterPolicy();
filterPolicy.addStringFilter('color').whitelist('red', 'orange').matchPrefixes('bl');
filterPolicy.addStringFilter('size').blacklist('small', 'medium');
filterPolicy.addNumericFilter('price').between(100, 200).greaterThan(300);

topic.subscribeLambda(fn, filterPolicy);
```

### CloudWatch Event Rule Target

SNS topics can be used as targets for CloudWatch event rules.
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-sns/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export * from './policy';
export * from './topic';
export * from './topic-base';
export * from './subscription';
export * from './subscription-filter-policy';

// AWS::SNS CloudFormation Resources:
export * from './sns.generated';
Expand Down
150 changes: 150 additions & 0 deletions packages/@aws-cdk/aws-sns/lib/subscription-filter-policy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
abstract class Filter {
/**
* The conditions of the filter.
* Conditions are `OR`ed.
*/
public readonly conditions: any[] = [];
}

/**
* Filter for a string attribute.
*/
export class StringFilter extends Filter {
/**
* Match one or more values.
* Can be chained with other conditions.
*/
public whitelist(...values: string[]) {
jogold marked this conversation as resolved.
Show resolved Hide resolved
this.conditions.push(...values);
return this;
}

/**
* Match any value that doesn't include any of the specified values.
* Can be chained with other conditions.
*/
public blacklist(...values: string[]) {
this.conditions.push({ 'anything-but': values });
return this;
}

/**
* Matches values that begins with the specified prefixes.
* Can be chained with other conditions.
*/
public matchPrefixes(...prefixes: string[]) {
this.conditions.push(...prefixes.map(p => ({ prefix: p })));
return this;
}
}

export class NumericFilter extends Filter {
/**
* Match one or more values.
* Can be chained with other conditions.
*/
public whitelist(...values: number[]) {
this.conditions.push(...values.map(v => ({ numeric: ['=', v] })));
return this;
}

/**
* Match values that are greater than the specified value.
* Can be chained with other conditions.
*/
public greaterThan(value: number) {
this.conditions.push({ numeric: ['>', value] });
return this;
}

/**
* Match values that are greater than or equal to the specified value.
* Can be chained with other conditions.
*/
public greaterThanOrEqualTo(value: number) {
this.conditions.push({ numeric: ['>=', value] });
return this;
}

/**
* Match values that are less than the specified value.
* Can be chained with other conditions.
*/
public lessThan(value: number) {
this.conditions.push({ numeric: ['<', value] });
return this;
}

/**
* Match values that are less than or equal to the specified value.
* Can be chained with other conditions.
*/
public lessThanOrEqualTo(value: number) {
this.conditions.push({ numeric: ['<=', value] });
return this;
}

/**
* Match values that are between the specified values.
* Can be chained with other conditions.
*/
public between(start: number, stop: number) {
this.conditions.push({ numeric: ['>=', start, '<=', stop ]});
return this;
}

/**
* Match values that are strictly between the specified values.
* Can be chained with other conditions.
*/
public betweenStrict(start: number, stop: number) {
this.conditions.push({ numeric: ['>', start, '<', stop ]});
return this;
}
}

/**
* A SNS subscription filter policy.
*/
export class SubscriptionFilterPolicy {
private readonly policy: { [name: string]: any[] } = {};

/**
* Add a filter on a string attribute.
*
* @param name the attribute name
*/
public addStringFilter(name: string): StringFilter {
const filter = new StringFilter();
this.policy[name] = filter.conditions;
return filter;
}

/**
* Add a filter on a numeric attribute.
*
* @param name the attribute name
*/
public addNumericFilter(name: string): NumericFilter {
const filter = new NumericFilter();
this.policy[name] = filter.conditions;
return filter;
}

/**
* Renders the policy.
*/
public render() {
if (Object.keys(this.policy).length > 5) {
throw new Error('A filter policy can have a maximum of 5 attribute names.');
}

let total = 1;
Object.values(this.policy).forEach(filter => { total *= filter.length; });
if (total > 100) {
throw new Error(`The total combination of values (${total}) must not exceed 100.`);
}

return this.policy;
}
}
9 changes: 9 additions & 0 deletions packages/@aws-cdk/aws-sns/lib/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Construct, Resource } from '@aws-cdk/cdk';
import { CfnSubscription } from './sns.generated';
import { SubscriptionFilterPolicy } from './subscription-filter-policy';
import { ITopic } from './topic-base';

/**
Expand Down Expand Up @@ -31,6 +32,13 @@ export interface SubscriptionProps {
* @default false
*/
readonly rawMessageDelivery?: boolean;

/**
* The filter policy.
*
* @default all messages are delivered
jogold marked this conversation as resolved.
Show resolved Hide resolved
*/
readonly filterPolicy?: SubscriptionFilterPolicy;
}

/**
Expand All @@ -52,6 +60,7 @@ export class Subscription extends Resource {
protocol: props.protocol,
topicArn: props.topic.topicArn,
rawMessageDelivery: props.rawMessageDelivery,
filterPolicy: props.filterPolicy && props.filterPolicy.render(),
});

}
Expand Down
39 changes: 28 additions & 11 deletions packages/@aws-cdk/aws-sns/lib/topic-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import cdk = require('@aws-cdk/cdk');
import { IResource, Resource } from '@aws-cdk/cdk';
import { TopicPolicy } from './policy';
import { Subscription, SubscriptionProtocol } from './subscription';
import { SubscriptionFilterPolicy } from './subscription-filter-policy';

export interface ITopic extends IResource {
/**
Expand All @@ -20,7 +21,11 @@ export interface ITopic extends IResource {
/**
* Subscribe some endpoint to this topic
*/
subscribe(name: string, endpoint: string, protocol: SubscriptionProtocol, rawMessageDelivery?: boolean): Subscription;
subscribe(name: string,
endpoint: string,
protocol: SubscriptionProtocol,
rawMessageDelivery?: boolean,
filterPolicy?: SubscriptionFilterPolicy): Subscription;

/**
* Defines a subscription from this SNS topic to an SQS queue.
Expand All @@ -31,7 +36,7 @@ export interface ITopic extends IResource {
* @param queue The target queue
* @param rawMessageDelivery Enable raw message delivery
*/
subscribeQueue(queue: sqs.IQueue, rawMessageDelivery?: boolean): Subscription;
subscribeQueue(queue: sqs.IQueue, rawMessageDelivery?: boolean, filterPolicy?: SubscriptionFilterPolicy): Subscription;

/**
* Defines a subscription from this SNS Topic to a Lambda function.
Expand All @@ -41,7 +46,7 @@ export interface ITopic extends IResource {
*
* @param lambdaFunction The Lambda function to invoke
*/
subscribeLambda(lambdaFunction: lambda.IFunction): Subscription;
subscribeLambda(lambdaFunction: lambda.IFunction, filterPolicy?: SubscriptionFilterPolicy): Subscription;

/**
* Defines a subscription from this SNS topic to an email address.
Expand All @@ -50,7 +55,7 @@ export interface ITopic extends IResource {
* @param emailAddress The email address to use.
* @param options Options to use for email subscription
*/
subscribeEmail(name: string, emailAddress: string, options?: EmailSubscriptionOptions): Subscription;
subscribeEmail(name: string, emailAddress: string, options?: EmailSubscriptionOptions, filterPolicy?: SubscriptionFilterPolicy): Subscription;

/**
* Defines a subscription from this SNS topic to an http:// or https:// URL.
Expand All @@ -59,7 +64,7 @@ export interface ITopic extends IResource {
* @param url The URL to invoke
* @param rawMessageDelivery Enable raw message delivery
*/
subscribeUrl(name: string, url: string, rawMessageDelivery?: boolean): Subscription;
subscribeUrl(name: string, url: string, rawMessageDelivery?: boolean, filterPolicy?: SubscriptionFilterPolicy): Subscription;

/**
* Adds a statement to the IAM resource policy associated with this topic.
Expand Down Expand Up @@ -96,12 +101,17 @@ export abstract class TopicBase extends Resource implements ITopic {
/**
* Subscribe some endpoint to this topic
*/
public subscribe(name: string, endpoint: string, protocol: SubscriptionProtocol, rawMessageDelivery?: boolean): Subscription {
public subscribe(name: string,
endpoint: string,
protocol: SubscriptionProtocol,
rawMessageDelivery?: boolean,
filterPolicy?: SubscriptionFilterPolicy): Subscription {
return new Subscription(this, name, {
topic: this,
endpoint,
protocol,
rawMessageDelivery,
filterPolicy,
});
}

Expand All @@ -114,7 +124,7 @@ export abstract class TopicBase extends Resource implements ITopic {
* @param queue The target queue
* @param rawMessageDelivery Enable raw message delivery
*/
public subscribeQueue(queue: sqs.IQueue, rawMessageDelivery?: boolean): Subscription {
public subscribeQueue(queue: sqs.IQueue, rawMessageDelivery?: boolean, filterPolicy?: SubscriptionFilterPolicy): Subscription {
if (!cdk.Construct.isConstruct(queue)) {
throw new Error(`The supplied Queue object must be an instance of Construct`);
}
Expand All @@ -132,6 +142,7 @@ export abstract class TopicBase extends Resource implements ITopic {
endpoint: queue.queueArn,
protocol: SubscriptionProtocol.Sqs,
rawMessageDelivery,
filterPolicy,
});

// add a statement to the queue resource policy which allows this topic
Expand All @@ -153,7 +164,7 @@ export abstract class TopicBase extends Resource implements ITopic {
*
* @param lambdaFunction The Lambda function to invoke
*/
public subscribeLambda(lambdaFunction: lambda.IFunction): Subscription {
public subscribeLambda(lambdaFunction: lambda.IFunction, filterPolicy?: SubscriptionFilterPolicy): Subscription {
if (!cdk.Construct.isConstruct(lambdaFunction)) {
throw new Error(`The supplied lambda Function object must be an instance of Construct`);
}
Expand All @@ -170,6 +181,7 @@ export abstract class TopicBase extends Resource implements ITopic {
topic: this,
endpoint: lambdaFunction.functionArn,
protocol: SubscriptionProtocol.Lambda,
filterPolicy,
});

lambdaFunction.addPermission(this.node.id, {
Expand All @@ -187,13 +199,17 @@ export abstract class TopicBase extends Resource implements ITopic {
* @param emailAddress The email address to use.
* @param options Options for the email delivery format.
*/
public subscribeEmail(name: string, emailAddress: string, options?: EmailSubscriptionOptions): Subscription {
public subscribeEmail(name: string,
emailAddress: string,
options?: EmailSubscriptionOptions,
filterPolicy?: SubscriptionFilterPolicy): Subscription {
const protocol = (options && options.json ? SubscriptionProtocol.EmailJson : SubscriptionProtocol.Email);

return new Subscription(this, name, {
topic: this,
endpoint: emailAddress,
protocol
protocol,
filterPolicy
});
}

Expand All @@ -204,7 +220,7 @@ export abstract class TopicBase extends Resource implements ITopic {
* @param url The URL to invoke
* @param rawMessageDelivery Enable raw message delivery
*/
public subscribeUrl(name: string, url: string, rawMessageDelivery?: boolean): Subscription {
public subscribeUrl(name: string, url: string, rawMessageDelivery?: boolean, filterPolicy?: SubscriptionFilterPolicy): Subscription {
if (!url.startsWith('http://') && !url.startsWith('https://')) {
throw new Error('URL must start with either http:// or https://');
}
Expand All @@ -216,6 +232,7 @@ export abstract class TopicBase extends Resource implements ITopic {
endpoint: url,
protocol,
rawMessageDelivery,
filterPolicy,
});
}

Expand Down
Loading