-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
19 changed files
with
34,409 additions
and
44 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
158 changes: 158 additions & 0 deletions
158
packages/@aws-cdk/aws-pipes-sources-alpha/lib/dynamodb.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
import { IPipe, ISource, SourceConfig } from '@aws-cdk/aws-pipes-alpha'; | ||
import { Duration } from 'aws-cdk-lib'; | ||
import { IRole } from 'aws-cdk-lib/aws-iam'; | ||
import { ITableV2 } from 'aws-cdk-lib/aws-dynamodb'; | ||
import { DeadLetterConfigParameters } from './deadLetterConfig'; | ||
import { DynamoDBStartingPosition, OnPartialBatchItemFailure } from './enums'; | ||
|
||
/** | ||
* Parameters for the DynamoDB source. | ||
*/ | ||
export interface DynamoDBSourceParameters { | ||
/** | ||
* The maximum number of records to include in each batch. | ||
* | ||
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-batchsize | ||
* @default 1 | ||
*/ | ||
readonly batchSize?: number; | ||
|
||
/** | ||
* Define the target queue to send dead-letter queue events to. | ||
* | ||
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-deadletterconfig | ||
* @default no dead letter queue | ||
*/ | ||
readonly deadLetterConfig?: DeadLetterConfigParameters; | ||
|
||
/** | ||
* The maximum length of a time to wait for events. | ||
* | ||
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-maximumbatchingwindowinseconds | ||
* @default no batching window | ||
*/ | ||
readonly maximumBatchingWindow?: Duration; | ||
|
||
/** | ||
* (Streams only) Discard records older than the specified age. The default value is -1, which sets the maximum age to infinite. When the value is set to infinite, EventBridge never discards old records. | ||
* | ||
* Leave undefined to set the maximum record age to infinite. | ||
* | ||
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-maximumrecordageinseconds | ||
* @default -1 (infinite) | ||
*/ | ||
readonly maximumRecordAge?: Duration; | ||
|
||
/** | ||
* (Streams only) Discard records after the specified number of retries. The default value is -1, which sets the maximum number of retries to infinite. When MaximumRetryAttempts is infinite, EventBridge retries failed records until the record expires in the event source. | ||
* | ||
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-maximumretryattempts | ||
* @default -1 (infinite) | ||
*/ | ||
readonly maximumRetryAttempts?: number; | ||
|
||
/** | ||
* (Streams only) Define how to handle item process failures. AUTOMATIC_BISECT halves each batch and retry each half until all the records are processed or there is one failed message left in the batch. | ||
* | ||
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-onpartialbatchitemfailure | ||
* @default off | ||
*/ | ||
readonly onPartialBatchItemFailure?: OnPartialBatchItemFailure; | ||
|
||
/** | ||
* (Streams only) The number of batches to process concurrently from each shard. The default value is 1. | ||
* | ||
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-parallelizationfactor | ||
* @default 1 | ||
*/ | ||
readonly parallelizationFactor?: number; | ||
|
||
/** | ||
* (Streams only) The position in a stream from which to start reading. | ||
* | ||
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-startingposition | ||
*/ | ||
readonly startingPosition: DynamoDBStartingPosition; | ||
} | ||
|
||
/** | ||
* A source that reads from an DynamoDB stream. | ||
*/ | ||
export class DynamoDBSource implements ISource { | ||
private readonly table: ITableV2; | ||
readonly sourceArn; | ||
private sourceParameters; | ||
|
||
private batchSize; | ||
private maximumBatchingWindowInSeconds; | ||
private maximumRecordAgeInSeconds; | ||
private maximumRetryAttempts; | ||
private parallelizationFactor; | ||
|
||
constructor(table: ITableV2, parameters: DynamoDBSourceParameters) { | ||
this.table = table; | ||
|
||
if (table.tableStreamArn === undefined) { | ||
throw new Error('Table does not have a stream defined, cannot create pipes source'); | ||
} | ||
|
||
this.sourceArn = table.tableStreamArn; | ||
this.sourceParameters = parameters; | ||
|
||
this.batchSize = this.sourceParameters.batchSize; | ||
this.maximumBatchingWindowInSeconds = this.sourceParameters.maximumBatchingWindow?.toSeconds(); | ||
this.maximumRecordAgeInSeconds = this.sourceParameters.maximumRecordAge?.toSeconds(); | ||
this.maximumRetryAttempts = this.sourceParameters.maximumRetryAttempts; | ||
this.parallelizationFactor = this.sourceParameters.parallelizationFactor; | ||
|
||
if (this.batchSize !== undefined) { | ||
if (this.batchSize < 1 || this.batchSize > 10000) { | ||
throw new Error(`Batch size must be between 1 and 10000, received ${this.batchSize}`); | ||
} | ||
} | ||
if (this.maximumBatchingWindowInSeconds !== undefined) { | ||
// only need to check upper bound since Duration amounts cannot be negative | ||
if (this.maximumBatchingWindowInSeconds > 300) { | ||
throw new Error(`Maximum batching window must be between 0 and 300, received ${this.maximumBatchingWindowInSeconds}`); | ||
} | ||
} | ||
if (this.maximumRecordAgeInSeconds !== undefined) { | ||
// only need to check upper bound since Duration amounts cannot be negative | ||
if (this.maximumRecordAgeInSeconds > 604800) { | ||
throw new Error(`Maximum record age in seconds must be between -1 and 604800, received ${this.maximumRecordAgeInSeconds}`); | ||
} | ||
} | ||
if (this.maximumRetryAttempts !== undefined) { | ||
if (this.maximumRetryAttempts < -1 || this.maximumRetryAttempts > 10000) { | ||
throw new Error(`Maximum retry attempts must be between -1 and 10000, received ${this.maximumRetryAttempts}`); | ||
} | ||
} | ||
if (this.parallelizationFactor !== undefined) { | ||
if (this.parallelizationFactor < 1 || this.parallelizationFactor > 10) { | ||
throw new Error(`Parallelization factor must be between 1 and 10, received ${this.parallelizationFactor}`); | ||
} | ||
} | ||
} | ||
|
||
bind(_pipe: IPipe): SourceConfig { | ||
return { | ||
sourceParameters: { | ||
dynamoDbStreamParameters: { | ||
batchSize: this.batchSize, | ||
deadLetterConfig: this.sourceParameters.deadLetterConfig, | ||
maximumBatchingWindowInSeconds: this.maximumBatchingWindowInSeconds, | ||
maximumRecordAgeInSeconds: this.maximumRecordAgeInSeconds, | ||
maximumRetryAttempts: this.maximumRetryAttempts, | ||
onPartialBatchItemFailure: this.sourceParameters.onPartialBatchItemFailure, | ||
parallelizationFactor: this.sourceParameters.parallelizationFactor, | ||
startingPosition: this.sourceParameters.startingPosition, | ||
}, | ||
}, | ||
}; | ||
} | ||
|
||
grantRead(grantee: IRole): void { | ||
this.table.grantStreamRead(grantee); | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/** | ||
* Define how to handle item process failures. | ||
*/ | ||
export enum OnPartialBatchItemFailure { | ||
/** | ||
* AUTOMATIC_BISECT | ||
*/ | ||
AUTOMATIC_BISECT = 'AUTOMATIC_BISECT', | ||
} | ||
|
||
/** | ||
* The position in a Kinesis stream from which to start reading. | ||
*/ | ||
export enum KinesisStartingPosition { | ||
/** | ||
* TRIM_HORIZON | ||
*/ | ||
TRIM_HORIZON = 'TRIM_HORIZON', | ||
/** | ||
* LATEST | ||
*/ | ||
LATEST = 'LATEST', | ||
/** | ||
* AT_TIMESTAMP | ||
*/ | ||
AT_TIMESTAMP = 'AT_TIMESTAMP', | ||
} | ||
|
||
/** | ||
* The position in a DynamoDB stream from which to start reading. | ||
*/ | ||
export enum DynamoDBStartingPosition { | ||
/** | ||
* TRIM_HORIZON | ||
*/ | ||
TRIM_HORIZON = 'TRIM_HORIZON', | ||
/** | ||
* LATEST | ||
*/ | ||
LATEST = 'LATEST', | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
export * from './sqs'; | ||
export * from './kinesis'; | ||
export * from './dynamodb'; | ||
export * from './enums'; | ||
export * from './deadLetterConfig'; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
68 changes: 68 additions & 0 deletions
68
packages/@aws-cdk/aws-pipes-sources-alpha/test/__snapshots__/dynamodb.test.ts.snap
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
// Jest Snapshot v1, https://goo.gl/fbAQLP | ||
|
||
exports[`dynamodb source should grant pipe role read access 1`] = ` | ||
{ | ||
"MyPipeRoleCBC8E9AB": { | ||
"Properties": { | ||
"AssumeRolePolicyDocument": { | ||
"Statement": [ | ||
{ | ||
"Action": "sts:AssumeRole", | ||
"Effect": "Allow", | ||
"Principal": { | ||
"Service": "pipes.amazonaws.com", | ||
}, | ||
}, | ||
], | ||
"Version": "2012-10-17", | ||
}, | ||
}, | ||
"Type": "AWS::IAM::Role", | ||
}, | ||
} | ||
`; | ||
|
||
exports[`dynamodb source should grant pipe role read access 2`] = ` | ||
{ | ||
"MyPipeRoleDefaultPolicy31387C20": { | ||
"Properties": { | ||
"PolicyDocument": { | ||
"Statement": [ | ||
{ | ||
"Action": "dynamodb:ListStreams", | ||
"Effect": "Allow", | ||
"Resource": { | ||
"Fn::GetAtt": [ | ||
"MyTable794EDED1", | ||
"StreamArn", | ||
], | ||
}, | ||
}, | ||
{ | ||
"Action": [ | ||
"dynamodb:DescribeStream", | ||
"dynamodb:GetRecords", | ||
"dynamodb:GetShardIterator", | ||
], | ||
"Effect": "Allow", | ||
"Resource": { | ||
"Fn::GetAtt": [ | ||
"MyTable794EDED1", | ||
"StreamArn", | ||
], | ||
}, | ||
}, | ||
], | ||
"Version": "2012-10-17", | ||
}, | ||
"PolicyName": "MyPipeRoleDefaultPolicy31387C20", | ||
"Roles": [ | ||
{ | ||
"Ref": "MyPipeRoleCBC8E9AB", | ||
}, | ||
], | ||
}, | ||
"Type": "AWS::IAM::Policy", | ||
}, | ||
} | ||
`; |
Oops, something went wrong.