Skip to content

Commit

Permalink
feat(aws-stepfunctions-tasks): support step functions state machine e…
Browse files Browse the repository at this point in the history
…xecution from a task state

Step Functions allows to start state machine executions by calling its own API StartExecution as an integrated service. This commit is made to implement this new feature.

The corresponding IAM poliy has been defined and the service principal in the IAM role has been fixed.

Closes #3521
  • Loading branch information
wqzoww committed Aug 2, 2019
1 parent 8a100e5 commit 2cb50f3
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 8 deletions.
3 changes: 2 additions & 1 deletion packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ export * from './run-ecs-ec2-task';
export * from './run-ecs-fargate-task';
export * from './sagemaker-task-base-types';
export * from './sagemaker-train-task';
export * from './sagemaker-transform-task';
export * from './sagemaker-transform-task';
export * from './start-execution';
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ export interface InvokeActivityProps {
}

/**
* A StepFunctions Task to invoke a Lambda function.
* A Step Functions Task to invoke an Activity worker.
*
* A Function can be used directly as a Resource, but this class mirrors
* integration with other AWS services via a specific class instance.
* An Activity can be used directly as a Resource.
*/
export class InvokeActivity implements sfn.IStepFunctionsTask {
constructor(private readonly activity: sfn.IActivity, private readonly props: InvokeActivityProps = {}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ export interface InvokeFunctionProps {
/**
* The JSON that you want to provide to your Lambda function as input.
*
* This parameter is named as payload to keep consistent with RunLambdaTask class.
*
* @default - The JSON data indicated by the task's InputPath is used as payload
*/
readonly payload?: { [key: string]: any };
}

/**
* A StepFunctions Task to invoke a Lambda function.
* A Step Functions Task to invoke a Lambda function.
*
* The Lambda function Arn is defined as Resource in the state machine definition.
*
* OUTPUT: the output of this task is the return value of the Lambda Function.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export interface PublishToTopicProps {
}

/**
* A StepFunctions Task to invoke a Lambda function.
* A Step Functions Task to publish messages to SNS topic.
*
* A Function can be used directly as a Resource, but this class mirrors
* integration with other AWS services via a specific class instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export interface SendToQueueProps {
}

/**
* A StepFunctions Task to invoke a Lambda function.
* A StepFunctions Task to send messages to SQS queue.
*
* A Function can be used directly as a Resource, but this class mirrors
* integration with other AWS services via a specific class instance.
Expand Down
110 changes: 110 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/start-execution.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import iam = require('@aws-cdk/aws-iam');
import sfn = require('@aws-cdk/aws-stepfunctions');
import { FieldUtils } from '@aws-cdk/aws-stepfunctions/lib/fields';
import { Stack } from '@aws-cdk/core';

/**
* Properties for ExecuteStateMachineProps
*/
export interface ExecuteStateMachineProps {
/**
* The JSON input for the execution, same as that of StartExecution.
* See {@link https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartExecution.html|StartExecution}
*/
readonly input?: { [key: string]: any };

/**
* The name of the execution, same as that of StartExecution.
* See {@link https://docs.aws.amazon.com/step-functions/latest/apireference/API_StartExecution.html|StartExecution}
*/
readonly name?: string;

/**
* Pause the workflow until the task token is returned by SendTaskSuccess or SendTaskFailure.
* See {@link https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token|WaitForCallback}
*/
readonly waitForTaskToken?: boolean;

/**
* Wait for the task/job to complete before processing the next state.
* See {@link https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-sync|Sync}
*/
readonly sync?: boolean;
}

/**
* A Step Functions Task to call StartExecution on another state machine.
*
* It supports three service integration patterns: request response, sync and wait for callback.
*/
export class StartExecution implements sfn.IStepFunctionsTask {
private readonly waitForTaskToken: boolean;
private readonly sync: boolean;

constructor(private readonly stateMachine: sfn.IStateMachine, private readonly props: ExecuteStateMachineProps = {}) {
this.waitForTaskToken = !!props.waitForTaskToken;
this.sync = !!props.sync;

// Users should choose only one of the patterns.
if (this.waitForTaskToken && this.sync) {
throw new Error('Both waitForTaskToken and sync are assigned to true');
}

// When waitForTaskToken is set to true, users must provide task token in the parameter input.
if (this.waitForTaskToken && !FieldUtils.containsTaskToken(props.input)) {
throw new Error('Task Token is missing in input (pass Context.taskToken somewhere in input)');
}
}

public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig {
const resourceArn = 'arn:aws:states:::states:startExecution' +
(this.waitForTaskToken ? '.waitForTaskToken' : '') +
(this.sync ? '.sync' : '');

return {
resourceArn,
policyStatements: this.createScopedAccessPolicy(task),
parameters: {
Input: this.props.input,
StateMachineArn: this.stateMachine.stateMachineArn,
Name: this.props.name
}
};
}

/**
* As StateMachineArn is extracted automatically from the state machine object included in the constructor,
* the scoped access policy should be generated accordingly.
*
* This means the action of StartExecution should be restricted on the given state machine, instead of being granted to all the resources (*).
*/
private createScopedAccessPolicy(task: sfn.Task): iam.PolicyStatement[] {
const stack = Stack.of(task);

const policyStatements = [
new iam.PolicyStatement({
actions: ['states:StartExecution'],
resources: [this.stateMachine.stateMachineArn]
})
];

// Step Functions use Cloud Watch managed rules to deal with synchronous tasks.
if (this.sync) {
policyStatements.push(new iam.PolicyStatement({
actions: ['states:DescribeExecution', 'states:StopExecution'],
resources: ['*']
}));

policyStatements.push(new iam.PolicyStatement({
actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'],
resources: [stack.formatArn({
service: 'events',
resource: 'rule',
resourceName: 'StepFunctionsGetEventsForStepFunctionsExecutionRule'
})]
}));
}

return policyStatements;
}
}
117 changes: 117 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/test/start-execution.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import '@aws-cdk/assert/jest';
import sfn = require('@aws-cdk/aws-stepfunctions');
import { Stack } from '@aws-cdk/core';
import tasks = require('../lib');

let stack: Stack;
let child: sfn.StateMachine;
beforeEach(() => {
stack = new Stack();
child = new sfn.StateMachine(stack, 'ChildStateMachine', {
definition: sfn.Chain.start(new sfn.Pass(stack, 'PassState')),
});
});

test('Execute State Machine - Request Response', () => {
const task = new sfn.Task(stack, 'ChildTask', {
task: new tasks.StartExecution(child, {
input: {
foo: 'bar'
},
name: 'myExecutionName'
})
});

new sfn.StateMachine(stack, 'ParentStateMachine', {
definition: task
});

expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": ["", [
"{\"StartAt\":\"ChildTask\",\"States\":{\"ChildTask\":{\"End\":true,\"Parameters\":{\"Input\":{\"foo\":\"bar\"},\"StateMachineArn\":\"",
{ Ref: "ChildStateMachine9133117F" },
"\",\"Name\":\"myExecutionName\"},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::states:startExecution\"}}}"
]]
},
});
});

test('Execute State Machine - Sync', () => {
const task = new sfn.Task(stack, 'ChildTask', {
task: new tasks.StartExecution(child, {
sync: true
})
});

new sfn.StateMachine(stack, 'ParentStateMachine', {
definition: task
});

expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": ["", [
"{\"StartAt\":\"ChildTask\",\"States\":{\"ChildTask\":{\"End\":true,\"Parameters\":{\"StateMachineArn\":\"",
{ Ref: "ChildStateMachine9133117F" },
"\"},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::states:startExecution.sync\"}}}"
]]
},
});
});

test('Execute State Machine - Wait For Task Token', () => {
const task = new sfn.Task(stack, 'ChildTask', {
task: new tasks.StartExecution(child, {
waitForTaskToken: true,
input: {
token: sfn.Context.taskToken
}
})
});

new sfn.StateMachine(stack, 'ParentStateMachine', {
definition: task
});

expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
DefinitionString: {
"Fn::Join": ["", [
"{\"StartAt\":\"ChildTask\",\"States\":{\"ChildTask\":{\"End\":true,\"Parameters\":{\"Input\":"
+ "{\"token.$\":\"$$.Task.Token\"},\"StateMachineArn\":\"",
{ Ref: "ChildStateMachine9133117F" },
"\"},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::states:startExecution.waitForTaskToken\"}}}"
]]
},
});
});

test('Execute State Machine - Wait For Task Token - Missing Task Token', () => {
expect(() => {
new sfn.Task(stack, 'ChildTask', {
task: new tasks.StartExecution(child, {
waitForTaskToken: true
})
});
}).toThrow('Task Token is missing in input (pass Context.taskToken somewhere in input');
});

test('Execute State Machine - Wait For Task Token - Missing Task Token', () => {
expect(() => {
new sfn.Task(stack, 'ChildTask', {
task: new tasks.StartExecution(child, {
waitForTaskToken: true
})
});
}).toThrow('Task Token is missing in input (pass Context.taskToken somewhere in input');
});

test('Execute State Machine - Both Service Integration Patterns Are True', () => {
expect(() => {
new sfn.Task(stack, 'ChildTask', {
task: new tasks.StartExecution(child, {
waitForTaskToken: true,
sync: true
})
});
}).toThrow('Both waitForTaskToken and sync are assigned to true');
});
26 changes: 26 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,32 @@ const task = new sfn.Task(this, 'Batch Inference', {
});
```

#### Step Functions example

```ts
// Define a state machine with one Pass state
const child = new sfn.StateMachine(stack, 'ChildStateMachine', {
definition: sfn.Chain.start(new sfn.Pass(stack, 'PassState')),
});

// Include the state machine in a Task state with callback pattern
const task = new sfn.Task(stack, 'ChildTask', {
task: new tasks.ExecuteStateMachine(child, {
waitForTaskToken: true,
input: {
token: sfn.Context.taskToken,
foo: 'bar'
},
name: 'MyExecutionName'
})
});

// Define a second state machine with the Task state above
new sfn.StateMachine(stack, 'ParentStateMachine', {
definition: task
});
```

### Pass

A `Pass` state does no work, but it can optionally transform the execution's
Expand Down
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-stepfunctions/lib/state-machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class StateMachine extends StateMachineBase {
});

this.role = props.role || new iam.Role(this, 'Role', {
assumedBy: new iam.ServicePrincipal(`states.${Stack.of(this).region}.amazonaws.com`),
assumedBy: new iam.ServicePrincipal(`states.amazonaws.com`),
});

const graph = new StateGraph(props.definition.startState, `State Machine ${id} definition`);
Expand Down

0 comments on commit 2cb50f3

Please sign in to comment.