Skip to content

Commit

Permalink
feat(aws-stepfunctions-tasks): support step functions state machine e…
Browse files Browse the repository at this point in the history
…xecution from a task state (#3522)

Step Functions allows to start state machine executions by calling its own API StartExecution as an integrated service. This commit is made to implement this new feature.

The corresponding IAM poliy has been defined and the service principal in the IAM role has been fixed.

Closes #3521
  • Loading branch information
wqzoww authored and mergify[bot] committed Aug 9, 2019
1 parent 1a17fc7 commit ac77990
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 7 deletions.
3 changes: 2 additions & 1 deletion packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ export * from './run-ecs-ec2-task';
export * from './run-ecs-fargate-task';
export * from './sagemaker-task-base-types';
export * from './sagemaker-train-task';
export * from './sagemaker-transform-task';
export * from './sagemaker-transform-task';
export * from './start-execution';
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ export interface InvokeActivityProps {
}

/**
* A StepFunctions Task to invoke a Lambda function.
* A Step Functions Task to invoke an Activity worker.
*
* A Function can be used directly as a Resource, but this class mirrors
* integration with other AWS services via a specific class instance.
* An Activity can be used directly as a Resource.
*/
export class InvokeActivity implements sfn.IStepFunctionsTask {
constructor(private readonly activity: sfn.IActivity, private readonly props: InvokeActivityProps = {}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ export interface InvokeFunctionProps {
/**
* The JSON that you want to provide to your Lambda function as input.
*
* This parameter is named as payload to keep consistent with RunLambdaTask class.
*
* @default - The JSON data indicated by the task's InputPath is used as payload
*/
readonly payload?: { [key: string]: any };
}

/**
* A StepFunctions Task to invoke a Lambda function.
* A Step Functions Task to invoke a Lambda function.
*
* The Lambda function Arn is defined as Resource in the state machine definition.
*
* OUTPUT: the output of this task is the return value of the Lambda Function.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export interface PublishToTopicProps {
}

/**
* A StepFunctions Task to invoke a Lambda function.
* A Step Functions Task to publish messages to SNS topic.
*
* A Function can be used directly as a Resource, but this class mirrors
* integration with other AWS services via a specific class instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export interface SendToQueueProps {
}

/**
* A StepFunctions Task to invoke a Lambda function.
* A StepFunctions Task to send messages to SQS queue.
*
* A Function can be used directly as a Resource, but this class mirrors
* integration with other AWS services via a specific class instance.
Expand Down
111 changes: 111 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/start-execution.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import iam = require('@aws-cdk/aws-iam');
import sfn = require('@aws-cdk/aws-stepfunctions');
import { Stack } from '@aws-cdk/core';
import { resourceArnSuffix } from './resource-arn-suffix';

/**
* Properties for StartExecution
*/
export interface StartExecutionProps {
/**
* The JSON input for the execution, same as that of StartExecution.
*
* @see https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartExecution.html
*/
readonly input?: { [key: string]: any };

/**
* The name of the execution, same as that of StartExecution.
*
* @see https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartExecution.html
*/
readonly name?: string;

/**
* The service integration pattern indicates different ways to call StartExecution to Step Functions.
*
* @default FIRE_AND_FORGET
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html
*/
readonly integrationPattern?: sfn.ServiceIntegrationPattern;
}

/**
* A Step Functions Task to call StartExecution on another state machine.
*
* It supports three service integration patterns: FIRE_AND_FORGET, SYNC and WAIT_FOR_TASK_TOKEN.
*/
export class StartExecution implements sfn.IStepFunctionsTask {
private readonly integrationPattern: sfn.ServiceIntegrationPattern;

constructor(private readonly stateMachine: sfn.IStateMachine, private readonly props: StartExecutionProps = {}) {
this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET;

const supportedPatterns = [
sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
sfn.ServiceIntegrationPattern.SYNC,
sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN
];

if (!supportedPatterns.includes(this.integrationPattern)) {
throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call Step Functions.`);
}

if (this.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN
&& !sfn.FieldUtils.containsTaskToken(props.input)) {
throw new Error('Task Token is missing in input (pass Context.taskToken somewhere in input)');
}
}

public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig {
const resourceArn = 'arn:aws:states:::states:startExecution' + resourceArnSuffix.get(this.integrationPattern);

return {
resourceArn,
policyStatements: this.createScopedAccessPolicy(task),
parameters: {
Input: this.props.input,
StateMachineArn: this.stateMachine.stateMachineArn,
Name: this.props.name
}
};
}

/**
* As StateMachineArn is extracted automatically from the state machine object included in the constructor,
*
* the scoped access policy should be generated accordingly.
*
* This means the action of StartExecution should be restricted on the given state machine, instead of being granted to all the resources (*).
*/
private createScopedAccessPolicy(task: sfn.Task): iam.PolicyStatement[] {
const stack = Stack.of(task);

const policyStatements = [
new iam.PolicyStatement({
actions: ['states:StartExecution'],
resources: [this.stateMachine.stateMachineArn]
})
];

// Step Functions use Cloud Watch managed rules to deal with synchronous tasks.
if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) {
policyStatements.push(new iam.PolicyStatement({
actions: ['states:DescribeExecution', 'states:StopExecution'],
resources: ['*']
}));

policyStatements.push(new iam.PolicyStatement({
actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'],
resources: [stack.formatArn({
service: 'events',
resource: 'rule',
resourceName: 'StepFunctionsGetEventsForStepFunctionsExecutionRule'
})]
}));
}

return policyStatements;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import '@aws-cdk/assert/jest';
import sfn = require('@aws-cdk/aws-stepfunctions');
import { Stack } from '@aws-cdk/core';
import tasks = require('../lib');

let stack: Stack;
let child: sfn.StateMachine;
beforeEach(() => {
stack = new Stack();
child = new sfn.StateMachine(stack, 'ChildStateMachine', {
definition: sfn.Chain.start(new sfn.Pass(stack, 'PassState')),
});
});

test('Execute State Machine - Default - Fire and Forget', () => {
const task = new sfn.Task(stack, 'ChildTask', {
task: new tasks.StartExecution(child, {
input: {
foo: 'bar'
},
name: 'myExecutionName'
})
});

new sfn.StateMachine(stack, 'ParentStateMachine', {
definition: task
});

expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": ["", [
"{\"StartAt\":\"ChildTask\",\"States\":{\"ChildTask\":{\"End\":true,\"Parameters\":{\"Input\":{\"foo\":\"bar\"},\"StateMachineArn\":\"",
{ Ref: "ChildStateMachine9133117F" },
"\",\"Name\":\"myExecutionName\"},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::states:startExecution\"}}}"
]]
},
});
});

test('Execute State Machine - Sync', () => {
const task = new sfn.Task(stack, 'ChildTask', {
task: new tasks.StartExecution(child, {
integrationPattern: sfn.ServiceIntegrationPattern.SYNC
})
});

new sfn.StateMachine(stack, 'ParentStateMachine', {
definition: task
});

expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": ["", [
"{\"StartAt\":\"ChildTask\",\"States\":{\"ChildTask\":{\"End\":true,\"Parameters\":{\"StateMachineArn\":\"",
{ Ref: "ChildStateMachine9133117F" },
"\"},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::states:startExecution.sync\"}}}"
]]
},
});
});

test('Execute State Machine - Wait For Task Token', () => {
const task = new sfn.Task(stack, 'ChildTask', {
task: new tasks.StartExecution(child, {
integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
input: {
token: sfn.Context.taskToken
}
})
});

new sfn.StateMachine(stack, 'ParentStateMachine', {
definition: task
});

expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": ["", [
"{\"StartAt\":\"ChildTask\",\"States\":{\"ChildTask\":{\"End\":true,\"Parameters\":{\"Input\":"
+ "{\"token.$\":\"$$.Task.Token\"},\"StateMachineArn\":\"",
{ Ref: "ChildStateMachine9133117F" },
"\"},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::states:startExecution.waitForTaskToken\"}}}"
]]
},
});
});

test('Execute State Machine - Wait For Task Token - Missing Task Token', () => {
expect(() => {
new sfn.Task(stack, 'ChildTask', {
task: new tasks.StartExecution(child, {
integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
})
});
}).toThrow('Task Token is missing in input (pass Context.taskToken somewhere in input');
});
27 changes: 27 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ couple of the tasks available are:
depending on the type of capacity.
* `tasks.SagemakerTrainTask` -- run a SageMaker training job
* `tasks.SagemakerTransformTask` -- run a SageMaker transform job
* `tasks.StartExecution` -- call StartExecution to a state machine of Step Functions

Except `tasks.InvokeActivity` and `tasks.InvokeFunction`, the [service integration
pattern](https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html)
Expand Down Expand Up @@ -302,6 +303,32 @@ const task = new sfn.Task(this, 'Batch Inference', {
});
```

#### Step Functions example

```ts
// Define a state machine with one Pass state
const child = new sfn.StateMachine(stack, 'ChildStateMachine', {
definition: sfn.Chain.start(new sfn.Pass(stack, 'PassState')),
});

// Include the state machine in a Task state with callback pattern
const task = new sfn.Task(stack, 'ChildTask', {
task: new tasks.ExecuteStateMachine(child, {
integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
input: {
token: sfn.Context.taskToken,
foo: 'bar'
},
name: 'MyExecutionName'
})
});

// Define a second state machine with the Task state above
new sfn.StateMachine(stack, 'ParentStateMachine', {
definition: task
});
```

### Pass

A `Pass` state does no work, but it can optionally transform the execution's
Expand Down

0 comments on commit ac77990

Please sign in to comment.