Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(lambda-event-sources): refactoring the filters type #22096

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 69 additions & 29 deletions packages/@aws-cdk/aws-lambda-event-sources/test/dynamo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,21 +255,80 @@ describe('DynamoEventSource', () => {
stream: dynamodb.StreamViewType.NEW_IMAGE,
});

const filters = [
{
eventName: lambda.FilterRule.isEqual('INSERT'),
dynamodb: {
Keys: {
id: {
S: lambda.FilterRule.exists(),
},
},
},
},
];

expect(Array.isArray(filters)).toBe(true);

// WHEN
fn.addEventSource(new sources.DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.LATEST,
filters: [
lambda.FilterCriteria.filter({
eventName: lambda.FilterRule.isEqual('INSERT'),
dynamodb: {
Keys: {
id: {
S: lambda.FilterRule.exists(),
},
filters: filters,
}));

// THEN
Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', {
'EventSourceArn': {
'Fn::GetAtt': [
'TD925BC7E',
'StreamArn',
],
},
'FunctionName': {
'Ref': 'Fn9270CBC0',
},
'FilterCriteria': {
'Filters': [
{
'Pattern': '{"eventName":["INSERT"],"dynamodb":{"Keys":{"id":{"S":[{"exists":true}]}}}}',
},
],
},
'StartingPosition': 'LATEST',
});
});

test('adding filters', () => {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING,
},
stream: dynamodb.StreamViewType.NEW_IMAGE,
});

const filters = [
{
eventName: lambda.FilterRule.isEqual('INSERT'),
dynamodb: {
Keys: {
id: {
S: lambda.FilterRule.exists(),
},
},
}),
],
},
},
];

expect(Array.isArray(filters)).toBe(true);

// WHEN
fn.addEventSource(new sources.DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.LATEST,
filters: filters,
}));

// THEN
Expand Down Expand Up @@ -348,8 +407,6 @@ describe('DynamoEventSource', () => {
maxBatchingWindow: cdk.Duration.seconds(301),
startingPosition: lambda.StartingPosition.LATEST,
}))).toThrow(/maxBatchingWindow cannot be over 300 seconds/);


});

test('contains eventSourceMappingId after lambda binding', () => {
Expand All @@ -372,7 +429,6 @@ describe('DynamoEventSource', () => {

// THEN
expect(eventSource.eventSourceMappingId).toBeDefined();

});

test('eventSourceMappingId throws error before binding to lambda', () => {
Expand Down Expand Up @@ -426,8 +482,6 @@ describe('DynamoEventSource', () => {
'MaximumRetryAttempts': 10,
'StartingPosition': 'LATEST',
});


});

test('fails if retryAttempts < 0', () => {
Expand All @@ -448,8 +502,6 @@ describe('DynamoEventSource', () => {
retryAttempts: -1,
startingPosition: lambda.StartingPosition.LATEST,
}))).toThrow(/retryAttempts must be between 0 and 10000 inclusive, got -1/);


});

test('fails if retryAttempts > 10000', () => {
Expand Down Expand Up @@ -542,8 +594,6 @@ describe('DynamoEventSource', () => {
'ParallelizationFactor': 5,
'StartingPosition': 'LATEST',
});


});

test('fails if parallelizationFactor < 1', () => {
Expand All @@ -564,8 +614,6 @@ describe('DynamoEventSource', () => {
parallelizationFactor: 0,
startingPosition: lambda.StartingPosition.LATEST,
}))).toThrow(/parallelizationFactor must be between 1 and 10 inclusive, got 0/);


});

test('fails if parallelizationFactor > 10', () => {
Expand All @@ -586,8 +634,6 @@ describe('DynamoEventSource', () => {
parallelizationFactor: 11,
startingPosition: lambda.StartingPosition.LATEST,
}))).toThrow(/parallelizationFactor must be between 1 and 10 inclusive, got 11/);


});

test('specific maxRecordAge', () => {
Expand Down Expand Up @@ -622,8 +668,6 @@ describe('DynamoEventSource', () => {
'MaximumRecordAgeInSeconds': 100,
'StartingPosition': 'LATEST',
});


});

test('fails if maxRecordAge < 60 seconds', () => {
Expand Down Expand Up @@ -666,8 +710,6 @@ describe('DynamoEventSource', () => {
maxRecordAge: cdk.Duration.seconds(604801),
startingPosition: lambda.StartingPosition.LATEST,
}))).toThrow(/maxRecordAge must be between 60 seconds and 7 days inclusive/);


});

test('specific destinationConfig', () => {
Expand Down Expand Up @@ -713,8 +755,6 @@ describe('DynamoEventSource', () => {
},
'StartingPosition': 'LATEST',
});


});

test('specific functionResponseTypes', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn.addEventSource(new DynamoEventSource(table, {
batchSize: 5,
startingPosition: lambda.StartingPosition.LATEST,
filters: [
lambda.FilterCriteria.filter({
{
eventName: lambda.FilterRule.isEqual('INSERT'),
dynamodb: {
Keys: {
Expand All @@ -32,7 +32,7 @@ fn.addEventSource(new DynamoEventSource(table, {
},
},
},
}),
},
],
}));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ const queue = new sqs.Queue(stack, 'Q');
fn.addEventSource(new SqsEventSource(queue, {
batchSize: 5,
filters: [
lambda.FilterCriteria.filter({
{
body: {
id: lambda.FilterRule.exists(),
},
}),
},
],
}));

Expand Down
4 changes: 2 additions & 2 deletions packages/@aws-cdk/aws-lambda-event-sources/test/kafka.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ describe('KafkaEventSource', () => {


});

test('with secret', () => {
// GIVEN
const stack = new cdk.Stack();
Expand Down Expand Up @@ -205,6 +206,7 @@ describe('KafkaEventSource', () => {


});

test('without vpc, secret must be set', () => {
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
Expand All @@ -219,8 +221,6 @@ describe('KafkaEventSource', () => {
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
}));
}).toThrow(/secret must be set/);


});

describe('vpc', () => {
Expand Down
4 changes: 2 additions & 2 deletions packages/@aws-cdk/aws-lambda-event-sources/test/sqs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,11 @@ describe('SQSEventSource', () => {
// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
filters: [
lambda.FilterCriteria.filter({
{
body: {
id: lambda.FilterRule.exists(),
},
}),
},
],
}));

Expand Down
19 changes: 19 additions & 0 deletions packages/@aws-cdk/aws-lambda/lib/event-source-filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,29 @@ export class FilterRule {
* Filter criteria for Lambda event filtering
*/
export class FilterCriteria {

/**
* Filter for event source
* @deprecated use `addFilter`
*/
public static filter(filter: {[key:string]: any}): {[key:string]: any} {
return { pattern: JSON.stringify(filter) };
}
/**
* Filter for event source
*/
public static addFilter(filter: {[key:string]: any}): FilterCriteria {
return new FilterCriteria(filter);
}

private constructor(private filterCriteria: {[key:string]: any}) {}

/**
* Returns a pattern to filter criteria
*
* @returns pattern object
*/
public toPattern(): {[key: string]: string} {
return { pattern: JSON.stringify(this.filterCriteria) };
}
}
12 changes: 10 additions & 2 deletions packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as cdk from '@aws-cdk/core';
import { Construct } from 'constructs';
import { IEventSourceDlq } from './dlq';
import { FilterCriteria } from './event-source-filter';
import { IFunction } from './function-base';
import { CfnEventSourceMapping } from './lambda.generated';

Expand Down Expand Up @@ -239,7 +240,7 @@ export interface EventSourceMappingOptions {
*
* @default - none
*/
readonly filters?: Array<{[key: string]: any}>
readonly filters?: Array<{[key: string]: any}>;
}

/**
Expand Down Expand Up @@ -356,6 +357,13 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp

let consumerGroupConfig = props.kafkaConsumerGroupId ? { consumerGroupId: props.kafkaConsumerGroupId } : undefined;

const filterCriteria: Array<{[key: string]: string}> = [];
if (props.filters) {
for (const filter of props.filters) {
filterCriteria.push(FilterCriteria.addFilter(filter).toPattern());
}
}

const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', {
batchSize: props.batchSize,
bisectBatchOnFunctionError: props.bisectBatchOnError,
Expand All @@ -374,7 +382,7 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
tumblingWindowInSeconds: props.tumblingWindow?.toSeconds(),
sourceAccessConfigurations: props.sourceAccessConfigurations?.map((o) => {return { type: o.type.type, uri: o.uri };}),
selfManagedEventSource,
filterCriteria: props.filters ? { filters: props.filters }: undefined,
filterCriteria: props.filters ? { filters: filterCriteria }: undefined,
selfManagedKafkaEventSourceConfig: props.kafkaBootstrapServers ? consumerGroupConfig : undefined,
amazonManagedKafkaEventSourceConfig: props.eventSourceArn ? consumerGroupConfig : undefined,
});
Expand Down
16 changes: 8 additions & 8 deletions packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Match, Template } from '@aws-cdk/assertions';
import * as cdk from '@aws-cdk/core';
import { Code, EventSourceMapping, Function, Runtime, Alias, StartingPosition, FilterRule, FilterCriteria } from '../lib';
import { Code, EventSourceMapping, Function, Runtime, Alias, StartingPosition, FilterRule } from '../lib';

let stack: cdk.Stack;
let fn: Function;
Expand Down Expand Up @@ -212,7 +212,7 @@ describe('event source mapping', () => {
});
});

test('filter with one pattern', () => {
test('filters with one pattern', () => {
const topicNameParam = new cdk.CfnParameter(stack, 'TopicNameParam', {
type: 'String',
});
Expand All @@ -224,9 +224,9 @@ describe('event source mapping', () => {
eventSourceArn: eventSourceArn,
kafkaTopic: topicNameParam.valueAsString,
filters: [
FilterCriteria.filter({
{
numericEquals: FilterRule.isEqual(1),
}),
},
],
});

Expand All @@ -253,13 +253,13 @@ describe('event source mapping', () => {
eventSourceArn: eventSourceArn,
kafkaTopic: topicNameParam.valueAsString,
filters: [
FilterCriteria.filter({
{
orFilter: FilterRule.or('one', 'two'),
stringEquals: FilterRule.isEqual('test'),
}),
FilterCriteria.filter({
},
{
numericEquals: FilterRule.isEqual(1),
}),
},
],
});

Expand Down