-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathevent-source.ts
80 lines (73 loc) · 2.66 KB
/
event-source.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import { Duration } from "aws-cdk-lib";
import type { IEventSourceDlq } from "aws-cdk-lib/aws-lambda";
import type { KinesisEventSourceProps } from "aws-cdk-lib/aws-lambda-event-sources";
/**
 * The Kinesis event source props with the four error-handling options
 * (`bisectBatchOnError`, `maxRecordAge`, `onFailure`, `retryAttempts`) removed.
 * Those four options are instead produced from a `StreamErrorHandlingProps`
 * value by `toAwsErrorHandlingProps`.
 */
export type StreamProcessingProps = Omit<
KinesisEventSourceProps,
"bisectBatchOnError" | "maxRecordAge" | "onFailure" | "retryAttempts"
>;
/**
 * In order to prevent your lambda from continuously retrying if it encounters errors
 * whilst processing a batch, use `retryBehaviour` to give up on a record once it reaches a certain age,
 * or after a specified number of attempts. See [[`StreamRetry`]] for more details.
 *
 * In order to isolate bad records as part of the retry process, you may also want to bisect problematic batches
 * using `bisectBatchOnError`. For example:
 *
 * ```typescript
 * const errorHandlingProps: StreamErrorHandlingProps = {
 *   bisectBatchOnError: true,
 *   retryBehaviour: StreamRetry.maxAttempts(5),
 * }
 * ```
 *
 * Records which could not be processed successfully can (optionally) be sent to a dead letter queue
 * via `deadLetterQueueForSkippedRecords`.
 */
export interface StreamErrorHandlingProps {
/** When to give up on a failing record: after a number of attempts, or once it reaches a given age. */
retryBehaviour: StreamRetry;
/** Whether a failing batch should be bisected, to help isolate bad records during retries. */
bisectBatchOnError: boolean;
/** Optional destination for records that could not be processed; forwarded to AWS as `onFailure`. */
deadLetterQueueForSkippedRecords?: IEventSourceDlq;
/**
 * May only be omitted or set to the literal `false`.
 * NOTE(review): presumably a discriminant against a separate props shape that
 * permits indefinite retries - confirm against callers. It is not read by
 * `toAwsErrorHandlingProps`.
 */
blockProcessingAndRetryIndefinitely?: false;
}
/**
 * The subset of the Kinesis event source props that controls error handling.
 * This is the return type of `toAwsErrorHandlingProps`, and covers exactly the
 * keys that `StreamProcessingProps` omits.
 */
type AwsErrorHandlingProps = Pick<
KinesisEventSourceProps,
"bisectBatchOnError" | "maxRecordAge" | "onFailure" | "retryAttempts"
>;
/**
 * Translates the error-handling props into the equivalent Kinesis event source props.
 *
 * @param errorHandlingProps the error-handling configuration to translate
 * @returns an object carrying `bisectBatchOnError`, `onFailure` (the optional dead
 * letter queue, `undefined` when none was supplied) and whichever retry limit
 * `retryBehaviour.toAwsProp()` yields
 */
export function toAwsErrorHandlingProps(errorHandlingProps: StreamErrorHandlingProps): AwsErrorHandlingProps {
  const { bisectBatchOnError, deadLetterQueueForSkippedRecords, retryBehaviour } = errorHandlingProps;
  const retryLimit = retryBehaviour.toAwsProp();

  // The retry limit is spread last, matching the original key order.
  return {
    bisectBatchOnError,
    onFailure: deadLetterQueueForSkippedRecords,
    ...retryLimit,
  };
}
/** The two retry-limit keys of the AWS error-handling props; the return type of `StreamRetry.toAwsProp`. */
type AwsRetryProp = Pick<AwsErrorHandlingProps, "maxRecordAge" | "retryAttempts">;
/** Tags which kind of limit a `StreamRetry` holds: a number of attempts or a record age. */
type RetryType = "attempts" | "recordAge";
/**
 * Describes the point at which retrying a failing record should stop.
 *
 * To limit by number of attempts, use:
 * ```typescript
 * StreamRetry.maxAttempts(5)
 * ```
 * To limit by the age of a record, use:
 * ```typescript
 * StreamRetry.maxAge(Duration.minutes(5))
 * ```
 *
 * The constructor is private, so instances are created through the static factories.
 */
export class StreamRetry {
  /**
   * @param amount the limit itself: a number of attempts, or a record age in
   * seconds, depending on `retryType`
   * @param retryType which kind of limit `amount` expresses
   */
  private constructor(
    private readonly amount: number,
    readonly retryType: RetryType,
  ) {}

  /** Creates a retry limit based on a number of attempts. */
  public static maxAttempts(amount: number): StreamRetry {
    return new StreamRetry(amount, "attempts");
  }

  /** Creates a retry limit based on record age; the duration is stored as seconds. */
  public static maxAge(duration: Duration): StreamRetry {
    const ageInSeconds = duration.toSeconds();
    return new StreamRetry(ageInSeconds, "recordAge");
  }

  /**
   * Converts this limit into the matching AWS prop.
   *
   * @returns `{ retryAttempts }` for an attempts-based limit, otherwise
   * `{ maxRecordAge }` rebuilt as a `Duration` from the stored seconds
   */
  public toAwsProp(): AwsRetryProp {
    if (this.retryType === "attempts") {
      return { retryAttempts: this.amount };
    }
    return { maxRecordAge: Duration.seconds(this.amount) };
  }
}