-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathkinesis-lambda.ts
117 lines (108 loc) · 5.29 KB
/
kinesis-lambda.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import { Stream, StreamEncryption } from "aws-cdk-lib/aws-kinesis";
import type { IStream, StreamProps } from "aws-cdk-lib/aws-kinesis";
import { StartingPosition } from "aws-cdk-lib/aws-lambda";
import { KinesisEventSource } from "aws-cdk-lib/aws-lambda-event-sources";
import type { KinesisEventSourceProps } from "aws-cdk-lib/aws-lambda-event-sources";
import type { GuLambdaErrorPercentageMonitoringProps, NoMonitoring } from "../../constructs/cloudwatch";
import { AppIdentity } from "../../constructs/core";
import type { GuStack } from "../../constructs/core";
import { GuKinesisStream } from "../../constructs/kinesis";
import { GuLambdaFunction } from "../../constructs/lambda";
import type { GuFunctionProps } from "../../constructs/lambda";
import { toAwsErrorHandlingProps } from "../../utils/lambda";
import type { StreamErrorHandlingProps, StreamProcessingProps } from "../../utils/lambda";
/**
 * Describes a Kinesis stream that already exists, so that the
 * [[`GuKinesisLambdaExperimental`]] pattern can attach to it rather than creating a new one.
 *
 * Use this when the stream is owned by a different stack, or was created outside of version control.
 */
export interface ExistingKinesisStream {
  /**
   * Name (not ARN) of the existing stream. The pattern builds the stream ARN from this name
   * together with the region and account of the stack it is instantiated in.
   */
  externalKinesisStreamName: string;
}
/**
 * Configuration options for the [[`GuKinesisLambdaExperimental`]] pattern.
 *
 * All lambda function options from [[`GuFunctionProps`]] are accepted, with the exception of
 * `errorPercentageMonitoring`, which is replaced here by `monitoringConfiguration`.
 *
 * Stream selection and configuration:
 * - Set `existingKinesisStream` to reference a Kinesis stream created outside of this pattern
 *   (i.e. via CloudFormation, or via a different `cdk` pattern, or stack).
 *   For more details see [[`ExistingKinesisStream`]].
 * - Set `kinesisStreamProps` if you have specific stream configuration requirements
 *   (e.g. data retention period).
 *
 * Stream processing:
 * - Pass [[`StreamProcessingProps`]] via `processingProps` to override the default stream processing
 *   options (e.g. batch size and parallelization).
 * - `errorHandlingConfiguration` is mandatory. Retry conditions can be configured via
 *   [[`StreamErrorHandlingProps`]].
 *
 * Monitoring:
 * It is advisable to configure an alarm based on the lambda's error percentage.
 * To do this, add the `monitoringConfiguration` property. The required properties for this are:
 *
 * ```typescript
 * monitoringConfiguration: {
 *   toleratedErrorPercentage: <sensible_error_percentage_threshold>,
 *   snsTopicName: "my-topic-for-cloudwatch-alerts",
 * }
 * ```
 * Other alarm properties (e.g. alarm name and description) will be pre-populated with sensible defaults.
 * For a full list of optional properties, see [[`GuLambdaErrorPercentageMonitoringProps`]].
 *
 * If your team do not use CloudWatch, it's possible to opt-out with the following configuration:
 * ```typescript
 * monitoringConfiguration: { noMonitoring: true } as NoMonitoring
 * ```
 */
export interface GuKinesisLambdaProps extends Omit<GuFunctionProps, "errorPercentageMonitoring"> {
  /** Required. How failures while processing stream records are handled; see [[`StreamErrorHandlingProps`]]. */
  errorHandlingConfiguration: StreamErrorHandlingProps;

  /** Required. Either error-percentage alarm settings, or an explicit opt-out via [[`NoMonitoring`]]. */
  monitoringConfiguration: NoMonitoring | GuLambdaErrorPercentageMonitoringProps;

  /** When provided, the lambda is attached to this existing stream and no new stream is created. */
  existingKinesisStream?: ExistingKinesisStream;

  /** Settings for the stream created by the pattern; only read when `existingKinesisStream` is not provided. */
  kinesisStreamProps?: StreamProps;

  /** Overrides for the event source's stream processing options. */
  processingProps?: StreamProcessingProps;
}
/**
 * Pattern which wires a lambda function to a Kinesis stream, so that the function is invoked
 * whenever a record is put onto the stream.
 *
 * By default a new Kinesis stream is created. If you are migrating a stack from CloudFormation,
 * you will need to opt-out of this behaviour by pointing the pattern at the existing stream.
 * For information on overriding the default behaviour, see [[`GuKinesisLambdaProps`]].
 *
 * The Kinesis stream is stateful, and is accessible via `kinesisStream`.
 * @see https://github.com/guardian/cdk/blob/main/docs/stateful-resources.md
 *
 * @experimental This pattern is in early development. The API is likely to change in future releases.
 */
export class GuKinesisLambdaExperimental extends GuLambdaFunction {
  /** The stream feeding the lambda: either the referenced existing stream or the one created here. */
  public readonly kinesisStream: IStream;

  /**
   * @param scope - the stack the resources are added to; its region and account are used to build
   *   the ARN of an existing stream.
   * @param id - construct id, passed through to the underlying lambda function.
   * @param props - lambda, stream, processing, error handling and monitoring configuration.
   */
  constructor(scope: GuStack, id: string, props: GuKinesisLambdaProps) {
    super(scope, id, {
      ...props,
      // Opting out of monitoring translates to "no error percentage alarm" on the lambda.
      errorPercentageMonitoring: props.monitoringConfiguration.noMonitoring ? undefined : props.monitoringConfiguration,
    });

    const existingStream = props.existingKinesisStream;

    if (existingStream) {
      // Reference the stream by ARN, assuming it lives in the same region and account as this stack.
      const streamName = existingStream.externalKinesisStreamName;
      this.kinesisStream = Stream.fromStreamArn(
        scope,
        streamName,
        `arn:aws:kinesis:${scope.region}:${scope.account}:stream/${streamName}`,
      );
    } else {
      // Encryption defaults to MANAGED; anything in `kinesisStreamProps` takes precedence over it.
      const createdStream = new GuKinesisStream(scope, "KinesisStream", {
        encryption: StreamEncryption.MANAGED,
        ...props.kinesisStreamProps,
      });
      this.kinesisStream = AppIdentity.taggedConstruct(props, createdStream);
    }

    const awsErrorHandlingProps = toAwsErrorHandlingProps(props.errorHandlingConfiguration);

    // Later spreads win: processing props may override the default starting position,
    // and the error handling props are applied last.
    const kinesisSourceProps: KinesisEventSourceProps = {
      startingPosition: StartingPosition.LATEST,
      ...props.processingProps,
      ...awsErrorHandlingProps,
    };

    // If we have an alias, use this to ensure that all events are sent to a published Lambda version.
    // Otherwise, send all events to the latest unpublished version ($LATEST)
    (this.alias ?? this).addEventSource(new KinesisEventSource(this.kinesisStream, kinesisSourceProps));
  }
}