Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stepfunctions): distributed map construct #28821

Merged
merged 14 commits into from
Feb 9, 2024
Next Next commit
feat(stepfunctions): add distributed map construct
abdelnn committed Jan 23, 2024
commit 9592e28d4299e42fd6f401d63134de6d86fd15a2

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,338 @@
{
"Resources": {
"Bucket83908E77": {
"Type": "AWS::S3::Bucket",
"Properties": {
"Tags": [
{
"Key": "aws-cdk:auto-delete-objects",
"Value": "true"
}
]
},
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"BucketPolicyE9A3008A": {
"Type": "AWS::S3::BucketPolicy",
"Properties": {
"Bucket": {
"Ref": "Bucket83908E77"
},
"PolicyDocument": {
"Statement": [
{
"Action": [
"s3:DeleteObject*",
"s3:GetBucket*",
"s3:List*",
"s3:PutBucketPolicy"
],
"Effect": "Allow",
"Principal": {
"AWS": {
"Fn::GetAtt": [
"CustomS3AutoDeleteObjectsCustomResourceProviderRole3B1BD092",
"Arn"
]
}
},
"Resource": [
{
"Fn::GetAtt": [
"Bucket83908E77",
"Arn"
]
},
{
"Fn::Join": [
"",
[
{
"Fn::GetAtt": [
"Bucket83908E77",
"Arn"
]
},
"/*"
]
]
}
]
}
],
"Version": "2012-10-17"
}
}
},
"BucketAutoDeleteObjectsCustomResourceBAFD23C2": {
"Type": "Custom::S3AutoDeleteObjects",
"Properties": {
"ServiceToken": {
"Fn::GetAtt": [
"CustomS3AutoDeleteObjectsCustomResourceProviderHandler9D90184F",
"Arn"
]
},
"BucketName": {
"Ref": "Bucket83908E77"
}
},
"DependsOn": [
"BucketPolicyE9A3008A"
],
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"CustomS3AutoDeleteObjectsCustomResourceProviderRole3B1BD092": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
]
},
"ManagedPolicyArns": [
{
"Fn::Sub": "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}
]
}
},
"CustomS3AutoDeleteObjectsCustomResourceProviderHandler9D90184F": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"S3Bucket": {
"Fn::Sub": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}"
},
"S3Key": "2ec8ad9e91dcd6e7ad6a5c84ffc6c9c05c408aca3b26ceb2816d81043e6c4dc3.zip"
},
"Timeout": 900,
"MemorySize": 128,
"Handler": "index.handler",
"Role": {
"Fn::GetAtt": [
"CustomS3AutoDeleteObjectsCustomResourceProviderRole3B1BD092",
"Arn"
]
},
"Runtime": "nodejs18.x",
"Description": {
"Fn::Join": [
"",
[
"Lambda function for auto-deleting objects in ",
{
"Ref": "Bucket83908E77"
},
" S3 bucket."
]
]
}
},
"DependsOn": [
"CustomS3AutoDeleteObjectsCustomResourceProviderRole3B1BD092"
]
},
"StateMachineRoleB840431D": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "states.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
}
},
"StateMachineRoleDefaultPolicyDF1E6607": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"s3:AbortMultipartUpload",
"s3:GetObject",
"s3:ListMultipartUploadParts",
"s3:PutObject"
],
"Effect": "Allow",
"Resource": {
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":s3:::",
{
"Ref": "Bucket83908E77"
},
"/*"
]
]
}
}
],
"Version": "2012-10-17"
},
"PolicyName": "StateMachineRoleDefaultPolicyDF1E6607",
"Roles": [
{
"Ref": "StateMachineRoleB840431D"
}
]
}
},
"StateMachine2E01A3A5": {
"Type": "AWS::StepFunctions::StateMachine",
"Properties": {
"DefinitionString": {
"Fn::Join": [
"",
[
"{\"StartAt\":\"DistributedMap\",\"States\":{\"DistributedMap\":{\"Type\":\"Map\",\"End\":true,\"ItemProcessor\":{\"ProcessorConfig\":{\"Mode\":\"DISTRIBUTED\",\"ExecutionType\":\"STANDARD\"},\"StartAt\":\"Pass\",\"States\":{\"Pass\":{\"Type\":\"Pass\",\"End\":true}}},\"ItemReader\":{\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
":states:::s3:getObject\",\"ReaderConfig\":{\"InputType\":\"CSV\",\"CSVHeaderLocation\":\"FIRST_ROW\"},\"Parameters\":{\"Bucket\":\"",
{
"Ref": "Bucket83908E77"
},
"\",\"Key\":\"my-key.csv\"}},\"ResultWriter\":{\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
":states:::s3:putObject\",\"Parameters\":{\"Bucket\":\"",
{
"Ref": "Bucket83908E77"
},
"\",\"Prefix\":\"my-prefix\"}}}}}"
]
]
},
"RoleArn": {
"Fn::GetAtt": [
"StateMachineRoleB840431D",
"Arn"
]
}
},
"DependsOn": [
"StateMachineRoleDefaultPolicyDF1E6607",
"StateMachineRoleB840431D"
],
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"StateMachineDistributedMapPolicy57C9D8C2": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "states:StartExecution",
"Effect": "Allow",
"Resource": {
"Ref": "StateMachine2E01A3A5"
}
},
{
"Action": [
"states:DescribeExecution",
"states:StopExecution"
],
"Effect": "Allow",
"Resource": {
"Fn::Join": [
"",
[
{
"Ref": "StateMachine2E01A3A5"
},
":*"
]
]
}
}
],
"Version": "2012-10-17"
},
"PolicyName": "StateMachineDistributedMapPolicy57C9D8C2",
"Roles": [
{
"Ref": "StateMachineRoleB840431D"
}
]
}
}
},
"Outputs": {
"ExportsOutputRefStateMachine2E01A3A5BA46F753": {
"Value": {
"Ref": "StateMachine2E01A3A5"
},
"Export": {
"Name": "aws-stepfunctions-map-integ:ExportsOutputRefStateMachine2E01A3A5BA46F753"
}
},
"ExportsOutputRefBucket83908E7781C90AC0": {
"Value": {
"Ref": "Bucket83908E77"
},
"Export": {
"Name": "aws-stepfunctions-map-integ:ExportsOutputRefBucket83908E7781C90AC0"
}
}
},
"Parameters": {
"BootstrapVersion": {
"Type": "AWS::SSM::Parameter::Value<String>",
"Default": "/cdk-bootstrap/hnb659fds/version",
"Description": "Version of the CDK Bootstrap resources in this environment, automatically retrieved from SSM Parameter Store. [cdk:skip]"
}
},
"Rules": {
"CheckBootstrapVersion": {
"Assertions": [
{
"Assert": {
"Fn::Not": [
{
"Fn::Contains": [
[
"1",
"2",
"3",
"4",
"5"
],
{
"Ref": "BootstrapVersion"
}
]
}
]
},
"AssertDescription": "CDK bootstrap stack version 6 required. Please run 'cdk bootstrap' with a recent version of the CDK CLI."
}
]
}
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as cdk from 'aws-cdk-lib/core';
import { ExpectedResult, IntegTest } from '@aws-cdk/integ-tests-alpha';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';

const CSV_KEY = 'my-key.csv';

class DistributedMapStack extends cdk.Stack {
readonly bucket: s3.Bucket;
readonly stateMachine: sfn.StateMachine;

constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
super(scope, id, props);

this.bucket = new s3.Bucket(this, 'Bucket', {
autoDeleteObjects: true,
removalPolicy: cdk.RemovalPolicy.DESTROY,
});

const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', {
itemReader: new sfn.S3CsvItemReader({
bucket: this.bucket,
key: CSV_KEY,
csvHeaders: sfn.CsvHeaders.useFirstRow(),
}),
resultWriter: new sfn.ResultWriter({
bucket: this.bucket,
prefix: 'my-prefix',
}),
});
distributedMap.itemProcessor(new sfn.Pass(this, 'Pass'));

this.stateMachine = new sfn.StateMachine(this, 'StateMachine', {
definition: distributedMap,
});

}
}

const app = new cdk.App();
const stack = new DistributedMapStack(app, 'aws-stepfunctions-map-integ');

const testCase = new IntegTest(app, 'DistributedMap', {
testCases: [stack],
});

testCase.assertions
.awsApiCall('StepFunctions', 'describeStateMachine', {
stateMachineArn: stack.stateMachine.stateMachineArn,
})
.expect(ExpectedResult.objectLike({ status: 'ACTIVE' }));

// Put an object in the bucket
const putObject = testCase.assertions.awsApiCall('S3', 'putObject', {
Bucket: stack.bucket.bucketName,
Key: CSV_KEY,
Body: 'a,b,c\n1,2,3\n4,5,6',
});

// Start an execution
const start = testCase.assertions.awsApiCall('StepFunctions', 'startExecution', {
stateMachineArn: stack.stateMachine.stateMachineArn,
});
putObject.next(start);

// describe the results of the execution
const describe = testCase.assertions.awsApiCall('StepFunctions', 'describeExecution', {
executionArn: start.getAttString('executionArn'),
});
start.next(describe);

// assert the results
describe.expect(ExpectedResult.objectLike({
status: 'SUCCEEDED',
}));

app.synth();
48 changes: 48 additions & 0 deletions packages/aws-cdk-lib/aws-stepfunctions/README.md
Original file line number Diff line number Diff line change
@@ -244,6 +244,7 @@ are supported:
* [`Succeed`](#succeed)
* [`Fail`](#fail)
* [`Map`](#map)
* [`Distributed Map`](#distributed-map)
* [`Custom State`](#custom-state)

An arbitrary JSON object (specified at execution start) is passed from state to
@@ -542,6 +543,53 @@ map.itemProcessor(new sfn.Pass(this, 'Pass State'), {

> Visit [Using Map state in Distributed mode to orchestrate large-scale parallel workloads](https://docs.aws.amazon.com/step-functions/latest/dg/use-dist-map-orchestrate-large-scale-parallel-workloads.html) for more details.
### Distributed Map

Step Functions provides a high-concurrency mode for the Map state known as Distributed mode. In this mode, the Map state can accept input from large-scale Amazon S3 data sources. For example, your input can be a JSON or CSV file stored in an Amazon S3 bucket, or a JSON array passed from a previous step in the workflow. A Map state set to Distributed is known as a Distributed Map state. In this mode, the Map state runs each iteration as a child workflow execution, which enables high concurrency of up to 10,000 parallel child workflow executions. Each child workflow execution has its own, separate execution history from that of the parent workflow.

Use the Map state in Distributed mode when you need to orchestrate large-scale parallel workloads that meet any combination of the following conditions:

* The size of your dataset exceeds 256 KB.
* The workflow's execution event history exceeds 25,000 entries.
* You need a concurrency of more than 40 parallel iterations.

A `DistributedMap` state can be used to run a set of steps for each element of an input array with high concurrency.
A `DistributedMap` state will execute the same steps for multiple entries of an array in the state input or from S3 objects.

```ts
const distributedMap = new sfn.DistributedMap(this, 'Distributed Map State', {
maxConcurrency: 1,
itemsPath: sfn.JsonPath.stringAt('$.inputForMap'),
});
map.itemProcessor(new sfn.Pass(this, 'Pass State'));
```

Map states in Distributed mode support multiple sources for an array to iterate:

* JSON array from the state input payload
* objects in an S3 bucket and optional prefix
* JSON array in a JSON file stored in S3
* CSV file stored in S3
* S3 inventory manifest stored in S3

There are multiple classes that implement `IItemReader` that can be used to configure the iterator source. These can be provided via the optional `itemReader` property. The default behavior if `itemReader` is omitted is to use the input payload.

Map states in Distributed mode also support writing results of the iterator to an S3 bucket and optional prefix. Use a `ResultWriter` object provided via the optional `resultWriter` property to configure which S3 location iterator results will be written. The default behavior id `resultWriter` is omitted is to use the state output payload. However, if the iterator results are larger than the 256 kb limit for Step Functions payloads then the State Machine will fail.

```ts
const distributedMap = new sfn.DistributedMap(this, 'Distributed Map State', {
itemReader: new sfn.S3JsonItemReader({
bucket: 'my-bucket',
key: 'my-key.json',
}),
resultWriter: new sfn.ResultWriter({
bucket: 'my-bucket',
prefix: 'my-prefix',
})
});
distributedMap.itemProcessor(new sfn.Pass(this, 'Pass State'));
```

### Custom State

It's possible that the high-level constructs for the states or `stepfunctions-tasks` do not have
2 changes: 2 additions & 0 deletions packages/aws-cdk-lib/aws-stepfunctions/lib/index.ts
Original file line number Diff line number Diff line change
@@ -19,8 +19,10 @@ export * from './states/succeed';
export * from './states/task';
export * from './states/wait';
export * from './states/map';
export * from './states/distributed-map';
export * from './states/custom-state';

export * from './states/map-base';
export * from './states/task-base';
export * from './task-credentials';

29 changes: 29 additions & 0 deletions packages/aws-cdk-lib/aws-stepfunctions/lib/state-graph.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { StateMachine } from './state-machine';
import { DistributedMap } from './states/distributed-map';
import { State } from './states/state';
import * as iam from '../../aws-iam';
import { Duration } from '../../core';
@@ -159,4 +161,31 @@ export class StateGraph {
}
}

/**
* Binds this StateGraph to the StateMachine it defines and updates state machine permissions
*/
public bind(stateMachine: StateMachine) {

for (const state of this.allStates) {
if (DistributedMap.isDistributedMap(state)) {

stateMachine.role.attachInlinePolicy(new iam.Policy(stateMachine, 'DistributedMapPolicy', {
document: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
actions: ['states:StartExecution'],
resources: [stateMachine.stateMachineArn],
}),
new iam.PolicyStatement({
actions: ['states:DescribeExecution', 'states:StopExecution'],
resources: [`${stateMachine.stateMachineArn}:*`],
}),
],
}),
}));

break;
}
}
}
}
31 changes: 20 additions & 11 deletions packages/aws-cdk-lib/aws-stepfunctions/lib/state-machine.ts
Original file line number Diff line number Diff line change
@@ -444,13 +444,22 @@ export class StateMachine extends StateMachineBase {

this.stateMachineType = props.stateMachineType ?? StateMachineType.STANDARD;

let graph;
if (definitionBody instanceof ChainDefinitionBody) {
graph = new StateGraph(definitionBody.chainable.startState, 'State Machine definition');
graph.timeout = props.timeout;
for (const statement of graph.policyStatements) {
this.addToRolePolicy(statement);
Copy link
Contributor

@orekav orekav Feb 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abdelnn is it possible that this is adding the role policies on the wrong place?

#29203

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As soon as I remove this for and put it back where it was before (Only the for).
The bug disappears.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you're right, this should be

this.role.addToPrincipalPolicy(statement)

}
}

const resource = new CfnStateMachine(this, 'Resource', {
stateMachineName: this.physicalName,
stateMachineType: props.stateMachineType ?? undefined,
roleArn: this.role.roleArn,
loggingConfiguration: props.logs ? this.buildLoggingConfiguration(props.logs) : undefined,
tracingConfiguration: props.tracingEnabled ? this.buildTracingConfiguration() : undefined,
...definitionBody.bind(this, this.role, props),
...definitionBody.bind(this, props, graph),
definitionSubstitutions: props.definitionSubstitutions,
});
resource.applyRemovalPolicy(props.removalPolicy, { default: RemovalPolicy.DESTROY });
@@ -464,6 +473,11 @@ export class StateMachine extends StateMachineBase {
resourceName: this.physicalName,
arnFormat: ArnFormat.COLON_RESOURCE_NAME,
});

if (definitionBody instanceof ChainDefinitionBody) {
graph!.bind(this);
}

this.stateMachineRevisionId = resource.attrStateMachineRevisionId;
}

@@ -673,15 +687,15 @@ export abstract class DefinitionBody {
return new ChainDefinitionBody(chainable);
}

public abstract bind(scope: Construct, sfnPrincipal: iam.IPrincipal, sfnProps: StateMachineProps): DefinitionConfig;
public abstract bind(scope: Construct, sfnProps: StateMachineProps, graph?: StateGraph): DefinitionConfig;
}

export class FileDefinitionBody extends DefinitionBody {
constructor(public readonly path: string, private readonly options: s3_assets.AssetOptions = {}) {
super();
}

public bind(scope: Construct, _sfnPrincipal: iam.IPrincipal, _sfnProps: StateMachineProps): DefinitionConfig {
public bind(scope: Construct, _sfnProps: StateMachineProps, _graph?: StateGraph): DefinitionConfig {
const asset = new s3_assets.Asset(scope, 'DefinitionBody', {
path: this.path,
...this.options,
@@ -700,7 +714,7 @@ export class StringDefinitionBody extends DefinitionBody {
super();
}

public bind(_scope: Construct, _sfnPrincipal: iam.IPrincipal, _sfnProps: StateMachineProps): DefinitionConfig {
public bind(_scope: Construct, _sfnProps: StateMachineProps, _graph?: StateGraph): DefinitionConfig {
return {
definitionString: this.body,
};
@@ -712,13 +726,8 @@ export class ChainDefinitionBody extends DefinitionBody {
super();
}

public bind(scope: Construct, sfnPrincipal: iam.IPrincipal, sfnProps: StateMachineProps): DefinitionConfig {
const graph = new StateGraph(this.chainable.startState, 'State Machine definition');
graph.timeout = sfnProps.timeout;
for (const statement of graph.policyStatements) {
sfnPrincipal.addToPrincipalPolicy(statement);
}
const graphJson = graph.toGraphJson();
public bind(scope: Construct, sfnProps: StateMachineProps, graph?: StateGraph): DefinitionConfig {
const graphJson = graph!.toGraphJson();
return {
definitionString: Stack.of(scope).toJsonString({ ...graphJson, Comment: sfnProps.comment }),
};
716 changes: 716 additions & 0 deletions packages/aws-cdk-lib/aws-stepfunctions/lib/states/distributed-map.ts

Large diffs are not rendered by default.

231 changes: 231 additions & 0 deletions packages/aws-cdk-lib/aws-stepfunctions/lib/states/map-base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
import { Construct } from 'constructs';
import { StateType } from './private/state-type';
import { renderJsonPath, State } from './state';
import { Token } from '../../../core';
import { Chain } from '../chain';
import { FieldUtils } from '../fields';
import { StateGraph } from '../state-graph';
import { CatchProps, IChainable, INextable, ProcessorConfig, ProcessorMode, RetryProps } from '../types';

/**
* Properties for defining a Map state
*/
export interface BaseMapProps {
/**
* Optional name for this state
*
* @default - The construct ID will be used as state name
*/
readonly stateName?: string;

/**
* An optional description for this state
*
* @default No comment
*/
readonly comment?: string;

/**
* JSONPath expression to select part of the state to be the input to this state.
*
* May also be the special value JsonPath.DISCARD, which will cause the effective
* input to be the empty object {}.
*
* @default $
*/
readonly inputPath?: string;

/**
* JSONPath expression to select part of the state to be the output to this state.
*
* May also be the special value JsonPath.DISCARD, which will cause the effective
* output to be the empty object {}.
*
* @default $
*/
readonly outputPath?: string;

/**
* JSONPath expression to indicate where to inject the state's output
*
* May also be the special value JsonPath.DISCARD, which will cause the state's
* input to become its output.
*
* @default $
*/
readonly resultPath?: string;

/**
* JSONPath expression to select the array to iterate over
*
* @default $
*/
readonly itemsPath?: string;

/**
* The JSON that you want to override your default iteration input (mutually exclusive with `parameters`).
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/input-output-itemselector.html
*
* @default $
*/
readonly itemSelector?: { [key: string]: any };

/**
* The JSON that will replace the state's raw result and become the effective
* result before ResultPath is applied.
*
* You can use ResultSelector to create a payload with values that are static
* or selected from the state's raw result.
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector
*
* @default - None
*/
readonly resultSelector?: { [key: string]: any };

/**
* MaxConcurrency
*
* An upper bound on the number of iterations you want running at once.
*
* @default - full concurrency
*/
readonly maxConcurrency?: number;
}

/**
* Returns true if the value passed is a positive integer
* @param value the value ti validate
abdelnn marked this conversation as resolved.
Show resolved Hide resolved
*/

export const isPositiveInteger = (value: number) => {
const isFloat = Math.floor(value) !== value;

const isNotPositiveInteger = value < 0 || value > Number.MAX_SAFE_INTEGER;

return !isFloat && !isNotPositiveInteger;
};

/**
* Define a Map state in the state machine
*
* A `Map` state can be used to run a set of steps for each element of an input array.
* A Map state will execute the same steps for multiple entries of an array in the state input.
*
* While the Parallel state executes multiple branches of steps using the same input, a Map state
* will execute the same steps for multiple entries of an array in the state input.
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-map-state.html
*/
export abstract class MapBase extends State implements INextable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whats the motivation behind moving MapBase out of Map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this PR, I wanted to do two things:

  • Implement a new Distributed Maps construct
  • Ensure that this new construct does not support deprecated ASL fields.

The way I decided to go about this was to abstract the common properties of both distributed and inline maps, and only support deprecated ASL fields (parameters and iterator) in the existing Map construct for backwards compatibility concerns.

Two other alternatives that I briefly considered but ultimately rejected:

  • Adding to the existing Map construct the required Distributed Map fields. This was rejected as having fields be conditionally available based on what kind of map it is seemed unnecessarily convoluted
  • Extending the existing Map construct to create the Distributed Map one. This was rejected as the new construct would contain deprecated ASL fields

public readonly endStates: INextable[];

private readonly maxConcurrency: number | undefined;
abdelnn marked this conversation as resolved.
Show resolved Hide resolved
protected readonly itemsPath?: string;
protected readonly itemSelector?: { [key: string]: any };

constructor(scope: Construct, id: string, props: BaseMapProps = {}) {
abdelnn marked this conversation as resolved.
Show resolved Hide resolved
super(scope, id, props);
this.endStates = [this];
this.maxConcurrency = props.maxConcurrency;
this.itemsPath = props.itemsPath;
this.itemSelector = props.itemSelector;
}

/**
* Add retry configuration for this state
*
* This controls if and how the execution will be retried if a particular
* error occurs.
*/
public addRetry(props: RetryProps = {}): MapBase {
super._addRetry(props);
return this;
}

/**
* Add a recovery handler for this state
*
* When a particular error occurs, execution will continue at the error
* handler instead of failing the state machine execution.
*/
public addCatch(handler: IChainable, props: CatchProps = {}): MapBase {
super._addCatch(handler.startState, props);
return this;
}

/**
* Continue normal execution with the given state
*/
public next(next: IChainable): Chain {
super.makeNext(next.startState);
return Chain.sequence(this, next);
}

/**
* Define item processor in Map.
*
* A Map must either have a non-empty iterator or a non-empty item processor (mutually exclusive with `iterator`).
*/
public itemProcessor(processor: IChainable, config: ProcessorConfig = {}): MapBase {
const name = `Map ${this.stateId} Item Processor`;
const stateGraph = new StateGraph(processor.startState, name);
super.addItemProcessor(stateGraph, config);
return this;
}

/**
* Return the Amazon States Language object for this state
*/
public toStateJson(): object {
return {
Type: StateType.MAP,
Comment: this.comment,
ResultPath: renderJsonPath(this.resultPath),
...this.renderNextEnd(),
...this.renderInputOutput(),
...this.renderResultSelector(),
...this.renderRetryCatch(),
...this.renderItemsPath(),
...this.renderItemSelector(),
...this.renderItemProcessor(),
MaxConcurrency: this.maxConcurrency,
};
}

/**
* Validate this state
*/
protected validateState(): string[] {
const errors: string[] = [];

if (this.processorConfig?.mode === ProcessorMode.DISTRIBUTED && !this.processorConfig?.executionType) {
errors.push('You must specify an execution type for the distributed Map workflow');
}

if (this.maxConcurrency && !Token.isUnresolved(this.maxConcurrency) && !isPositiveInteger(this.maxConcurrency)) {
errors.push('maxConcurrency has to be a positive integer');
}

return errors;
}

private renderItemsPath(): any {
return {
ItemsPath: renderJsonPath(this.itemsPath),
};
}

/**
* Render ItemSelector in ASL JSON format
*/
private renderItemSelector(): any {
if (!this.itemSelector) return undefined;
return FieldUtils.renderObject({
ItemSelector: this.itemSelector,
});
}
}
201 changes: 7 additions & 194 deletions packages/aws-cdk-lib/aws-stepfunctions/lib/states/map.ts
Original file line number Diff line number Diff line change
@@ -1,67 +1,13 @@
import { Construct } from 'constructs';
import { StateType } from './private/state-type';
import { renderJsonPath, State } from './state';
import { Token } from '../../../core';
import { Chain } from '../chain';
import { MapBase, BaseMapProps } from './map-base';
import { FieldUtils } from '../fields';
import { StateGraph } from '../state-graph';
import { CatchProps, IChainable, INextable, ProcessorConfig, ProcessorMode, RetryProps } from '../types';
import { IChainable, INextable, ProcessorMode } from '../types';

/**
* Properties for defining a Map state
*/
export interface MapProps {
/**
* Optional name for this state
*
* @default - The construct ID will be used as state name
*/
readonly stateName?: string;

/**
* An optional description for this state
*
* @default No comment
*/
readonly comment?: string;

/**
* JSONPath expression to select part of the state to be the input to this state.
*
* May also be the special value JsonPath.DISCARD, which will cause the effective
* input to be the empty object {}.
*
* @default $
*/
readonly inputPath?: string;

/**
* JSONPath expression to select part of the state to be the output to this state.
*
* May also be the special value JsonPath.DISCARD, which will cause the effective
* output to be the empty object {}.
*
* @default $
*/
readonly outputPath?: string;

/**
* JSONPath expression to indicate where to inject the state's output
*
* May also be the special value JsonPath.DISCARD, which will cause the state's
* input to become its output.
*
* @default $
*/
readonly resultPath?: string;

/**
* JSONPath expression to select the array to iterate over
*
* @default $
*/
readonly itemsPath?: string;

export interface MapProps extends BaseMapProps {
/**
* The JSON that you want to override your default iteration input (mutually exclusive with `itemSelector`).
*
@@ -74,54 +20,8 @@ export interface MapProps {
* @default $
*/
readonly parameters?: { [key: string]: any };

/**
* The JSON that you want to override your default iteration input (mutually exclusive with `parameters`).
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/input-output-itemselector.html
*
* @default $
*/
readonly itemSelector?: { [key: string]: any };

/**
* The JSON that will replace the state's raw result and become the effective
* result before ResultPath is applied.
*
* You can use ResultSelector to create a payload with values that are static
* or selected from the state's raw result.
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html#input-output-resultselector
*
* @default - None
*/
readonly resultSelector?: { [key: string]: any };

/**
* MaxConcurrency
*
* An upper bound on the number of iterations you want running at once.
*
* @default - full concurrency
*/
readonly maxConcurrency?: number;
}

/**
* Returns true if the value passed is a positive integer
* @param value the value ti validate
*/

export const isPositiveInteger = (value: number) => {
const isFloat = Math.floor(value) !== value;

const isNotPositiveInteger = value < 0 || value > Number.MAX_SAFE_INTEGER;

return !isFloat && !isNotPositiveInteger;
};

/**
* Define a Map state in the state machine
*
@@ -133,49 +33,10 @@ export const isPositiveInteger = (value: number) => {
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-map-state.html
*/
export class Map extends State implements INextable {
public readonly endStates: INextable[];

private readonly maxConcurrency: number | undefined;
private readonly itemsPath?: string;
private readonly itemSelector?: { [key: string]: any };

export class Map extends MapBase implements INextable {
constructor(scope: Construct, id: string, props: MapProps = {}) {
super(scope, id, props);
this.endStates = [this];
this.maxConcurrency = props.maxConcurrency;
this.itemsPath = props.itemsPath;
this.itemSelector = props.itemSelector;
}

/**
* Add retry configuration for this state
*
* This controls if and how the execution will be retried if a particular
* error occurs.
*/
public addRetry(props: RetryProps = {}): Map {
super._addRetry(props);
return this;
}

/**
* Add a recovery handler for this state
*
* When a particular error occurs, execution will continue at the error
* handler instead of failing the state machine execution.
*/
public addCatch(handler: IChainable, props: CatchProps = {}): Map {
super._addCatch(handler.startState, props);
return this;
}

/**
* Continue normal execution with the given state
*/
public next(next: IChainable): Chain {
super.makeNext(next.startState);
return Chain.sequence(this, next);
this.processorMode = ProcessorMode.INLINE;
}

/**
@@ -191,44 +52,22 @@ export class Map extends State implements INextable {
return this;
}

/**
* Define item processor in Map.
*
* A Map must either have a non-empty iterator or a non-empty item processor (mutually exclusive with `iterator`).
*/
public itemProcessor(processor: IChainable, config: ProcessorConfig = {}): Map {
const name = `Map ${this.stateId} Item Processor`;
const stateGraph = new StateGraph(processor.startState, name);
super.addItemProcessor(stateGraph, config);
return this;
}

/**
* Return the Amazon States Language object for this state
*/
public toStateJson(): object {
return {
Type: StateType.MAP,
Comment: this.comment,
ResultPath: renderJsonPath(this.resultPath),
...this.renderNextEnd(),
...this.renderInputOutput(),
...super.toStateJson(),
...this.renderParameters(),
...this.renderResultSelector(),
...this.renderRetryCatch(),
...this.renderIterator(),
...this.renderItemsPath(),
...this.renderItemSelector(),
...this.renderItemProcessor(),
MaxConcurrency: this.maxConcurrency,
};
}

/**
* Validate this state
*/
protected validateState(): string[] {
const errors: string[] = [];
const errors = super.validateState();

if (!this.iteration && !this.processor) {
errors.push('Map state must either have a non-empty iterator or a non-empty item processor');
@@ -242,23 +81,9 @@ export class Map extends State implements INextable {
errors.push('Map state cannot have both parameters and an item selector');
}

if (this.processorConfig?.mode === ProcessorMode.DISTRIBUTED && !this.processorConfig?.executionType) {
errors.push('You must specify an execution type for the distributed Map workflow');
}

if (this.maxConcurrency && !Token.isUnresolved(this.maxConcurrency) && !isPositiveInteger(this.maxConcurrency)) {
errors.push('maxConcurrency has to be a positive integer');
}

return errors;
}

private renderItemsPath(): any {
return {
ItemsPath: renderJsonPath(this.itemsPath),
};
}

/**
* Render Parameters in ASL JSON format
*/
@@ -270,16 +95,4 @@ export class Map extends State implements INextable {
Parameters: this.parameters,
});
}

/**
* Render ItemSelector in ASL JSON format
*/
private renderItemSelector(): any {
if (!this.itemSelector) {
return undefined;
}
return FieldUtils.renderObject({
ItemSelector: this.itemSelector,
});
}
}
3 changes: 2 additions & 1 deletion packages/aws-cdk-lib/aws-stepfunctions/lib/states/state.ts
abdelnn marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -171,6 +171,7 @@ export abstract class State extends Construct implements IChainable {
protected readonly resultSelector?: object;
protected readonly branches: StateGraph[] = [];
protected iteration?: StateGraph;
protected processorMode?: ProcessorMode = ProcessorMode.INLINE;
protected processor?: StateGraph;
protected processorConfig?: ProcessorConfig;
protected defaultChoice?: State;
@@ -469,7 +470,7 @@ export abstract class State extends Construct implements IChainable {
* Render ProcessorConfig in ASL JSON format
*/
private renderProcessorConfig() {
const mode = this.processorConfig?.mode?.toString() ?? ProcessorMode.INLINE;
const mode = this.processorConfig?.mode?.toString() ?? this.processorMode;
if (mode === ProcessorMode.INLINE) {
return {
ProcessorConfig: {
776 changes: 776 additions & 0 deletions packages/aws-cdk-lib/aws-stepfunctions/test/distributed-map.test.ts

Large diffs are not rendered by default.

44 changes: 44 additions & 0 deletions packages/aws-cdk-lib/aws-stepfunctions/test/state-machine.test.ts
Original file line number Diff line number Diff line change
@@ -145,6 +145,50 @@ describe('State Machine', () => {

}),

test('Instantiate State Machine With Distributed Map State', () => {
// GIVEN
const stack = new cdk.Stack();

// WHEN
const map = new sfn.DistributedMap(stack, 'Map State');
map.itemProcessor(new sfn.Pass(stack, 'Pass'));
new sfn.StateMachine(stack, 'MyStateMachine', {
stateMachineName: 'MyStateMachine',
definition: map,
});

// THEN
stack;
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
PolicyDocument: {
Statement: [
{
Action: 'states:StartExecution',
Effect: 'Allow',
Resource: { Ref: 'MyStateMachine6C968CA5' },
},
{
Action: [
'states:DescribeExecution',
'states:StopExecution',
],
Effect: 'Allow',
Resource: {
'Fn::Join': ['', [{ Ref: 'MyStateMachine6C968CA5' }, ':*']],
},
},
],
Version: '2012-10-17',
},
PolicyName: 'MyStateMachineDistributedMapPolicy11E47E72',
Roles: [
{
Ref: 'MyStateMachineRoleD59FFEBC',
},
],
});
}),

test('State Machine with invalid name', () => {
// GIVEN
const stack = new cdk.Stack();