Skip to content

Commit

Permalink
refactor(stepfunctions-tasks): make integrationPattern an enum (#3115)
Browse files Browse the repository at this point in the history
* refactor(aws-stepfunctions-tasks): implement service integration patterns for tasks

Step Functions allows users to call different integrated services in different ways. They are also called service integration patterns, including Request Response, Run a Job and Wait for Callback. Users must choose exactly one of them and specify it in the "Resource" field.

This commit introduces a new member variable, serviceIntegrationPattern, in the interface of properties within each existing integrated service. This helps to avoid using multiple boolean variables in the service such as ECS, which supports different service integration patterns. It is also beneficial for code maintenances: if Step Functions adds new integrated services or updates the existing integration patterns in the future, keeping pace with these changes will be simply updating this variable of enum type.

BREAKING CHANGE: To define a callback task, users should specify "serviceIntegrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN" instead of "waitForTaskToken: true". For a sync task, users should use "serviceIntegrationPattern: sfn.ServiceIntegrationPattern.SYNC" in the place of "synchronous: true". In addition, this commit enables users to define callback task with ECS.
**@aws-cdk/aws-stepfunctions-tasks**

Closes #3114

* serviceIntegrationPattern -> integrationPattern
  • Loading branch information
wqzoww authored and mergify[bot] committed Aug 8, 2019
1 parent c95eab6 commit fa48e89
Show file tree
Hide file tree
Showing 20 changed files with 300 additions and 60 deletions.
30 changes: 22 additions & 8 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import iam = require('@aws-cdk/aws-iam');
import sns = require('@aws-cdk/aws-sns');
import sfn = require('@aws-cdk/aws-stepfunctions');
import { resourceArnSuffix } from './resource-arn-suffix';

/**
* Properties for PublishTask
Expand Down Expand Up @@ -29,11 +30,13 @@ export interface PublishToTopicProps {
readonly subject?: string;

/**
* Whether to pause the workflow until a task token is returned
* The service integration pattern indicates different ways to call Publish to SNS.
*
* @default false
* The valid value is either FIRE_AND_FORGET or WAIT_FOR_TASK_TOKEN.
*
* @default FIRE_AND_FORGET
*/
readonly waitForTaskToken?: boolean;
readonly integrationPattern?: sfn.ServiceIntegrationPattern;
}

/**
Expand All @@ -44,19 +47,30 @@ export interface PublishToTopicProps {
*/
export class PublishToTopic implements sfn.IStepFunctionsTask {

private readonly waitForTaskToken: boolean;
private readonly integrationPattern: sfn.ServiceIntegrationPattern;

constructor(private readonly topic: sns.ITopic, private readonly props: PublishToTopicProps) {
this.waitForTaskToken = props.waitForTaskToken === true;
this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET;

const supportedPatterns = [
sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN
];

if (this.waitForTaskToken && !sfn.FieldUtils.containsTaskToken(props.message.value)) {
throw new Error('Task Token is missing in message (pass Context.taskToken somewhere in message)');
if (!supportedPatterns.includes(this.integrationPattern)) {
throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call SNS.`);
}

if (this.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN) {
if (!sfn.FieldUtils.containsTaskToken(props.message)) {
throw new Error('Task Token is missing in message (pass Context.taskToken somewhere in message)');
}
}
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
return {
resourceArn: 'arn:aws:states:::sns:publish' + (this.waitForTaskToken ? '.waitForTaskToken' : ''),
resourceArn: 'arn:aws:states:::sns:publish' + resourceArnSuffix.get(this.integrationPattern),
policyStatements: [new iam.PolicyStatement({
actions: ['sns:Publish'],
resources: [this.topic.topicArn]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import sfn = require('@aws-cdk/aws-stepfunctions');

/**
* Suffixes corresponding to different service integration patterns
*
* Key is the service integration pattern, value is the resource ARN suffix.
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html
*/
const resourceArnSuffix = new Map<sfn.ServiceIntegrationPattern, string>();
resourceArnSuffix.set(sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, "");
resourceArnSuffix.set(sfn.ServiceIntegrationPattern.SYNC, ".sync");
resourceArnSuffix.set(sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, ".waitForTaskToken");

export { resourceArnSuffix };
32 changes: 25 additions & 7 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import iam = require('@aws-cdk/aws-iam');
import sfn = require('@aws-cdk/aws-stepfunctions');
import cdk = require('@aws-cdk/core');
import { Stack } from '@aws-cdk/core';
import { resourceArnSuffix } from './resource-arn-suffix';
import { ContainerOverride } from './run-ecs-task-base-types';

/**
Expand Down Expand Up @@ -32,11 +33,13 @@ export interface CommonEcsRunTaskProps {
readonly containerOverrides?: ContainerOverride[];

/**
* Whether to wait for the task to complete and return the response
* The service integration pattern indicates different ways to call RunTask in ECS.
*
* @default true
* The valid value for Lambda is FIRE_AND_FORGET, SYNC and WAIT_FOR_TASK_TOKEN.
*
* @default FIRE_AND_FORGET
*/
readonly synchronous?: boolean;
readonly integrationPattern?: sfn.ServiceIntegrationPattern;
}

/**
Expand All @@ -60,10 +63,25 @@ export class EcsRunTaskBase implements ec2.IConnectable, sfn.IStepFunctionsTask

private securityGroup?: ec2.ISecurityGroup;
private networkConfiguration?: any;
private readonly sync: boolean;
private readonly integrationPattern: sfn.ServiceIntegrationPattern;

constructor(private readonly props: EcsRunTaskBaseProps) {
this.sync = props.synchronous !== false;
this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET;

const supportedPatterns = [
sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
sfn.ServiceIntegrationPattern.SYNC
];

if (!supportedPatterns.includes(this.integrationPattern)) {
throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call ECS.`);
}

if (this.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN
&& !sfn.FieldUtils.containsTaskToken(props.containerOverrides)) {
throw new Error('Task Token is missing in containerOverrides (pass Context.taskToken somewhere in containerOverrides)');
}

for (const override of this.props.containerOverrides || []) {
const name = override.containerName;
Expand All @@ -86,7 +104,7 @@ export class EcsRunTaskBase implements ec2.IConnectable, sfn.IStepFunctionsTask
}

return {
resourceArn: 'arn:aws:states:::ecs:runTask' + (this.sync ? '.sync' : ''),
resourceArn: 'arn:aws:states:::ecs:runTask' + resourceArnSuffix.get(this.integrationPattern),
parameters: {
Cluster: this.props.cluster.clusterArn,
TaskDefinition: this.props.taskDefinition.taskDefinitionArn,
Expand Down Expand Up @@ -139,7 +157,7 @@ export class EcsRunTaskBase implements ec2.IConnectable, sfn.IStepFunctionsTask
}),
];

if (this.sync) {
if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) {
policyStatements.push(new iam.PolicyStatement({
actions: ["events:PutTargets", "events:PutRule", "events:DescribeRule"],
resources: [stack.formatArn({
Expand Down
36 changes: 27 additions & 9 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import iam = require('@aws-cdk/aws-iam');
import lambda = require('@aws-cdk/aws-lambda');
import sfn = require('@aws-cdk/aws-stepfunctions');
import { FieldUtils } from '../../aws-stepfunctions/lib/fields';
import { resourceArnSuffix } from './resource-arn-suffix';

/**
* Properties for RunLambdaTask
Expand All @@ -13,15 +13,18 @@ export interface RunLambdaTaskProps {
readonly payload?: { [key: string]: any };

/**
* Whether to pause the workflow until a task token is returned
* The service integration pattern indicates different ways to invoke Lambda function.
*
* If this is set to true, the Context.taskToken value must be included
* The valid value for Lambda is either FIRE_AND_FORGET or WAIT_FOR_TASK_TOKEN,
* it determines whether to pause the workflow until a task token is returned.
*
* If this is set to WAIT_FOR_TASK_TOKEN, the Context.taskToken value must be included
* somewhere in the payload and the Lambda must call
* `SendTaskSuccess/SendTaskFailure` using that token.
*
* @default false
* @default FIRE_AND_FORGET
*/
readonly waitForTaskToken?: boolean;
readonly integrationPattern?: sfn.ServiceIntegrationPattern;

/**
* Invocation type of the Lambda function
Expand Down Expand Up @@ -55,18 +58,28 @@ export interface RunLambdaTaskProps {
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html
*/
export class RunLambdaTask implements sfn.IStepFunctionsTask {
private readonly waitForTaskToken: boolean;
private readonly integrationPattern: sfn.ServiceIntegrationPattern;

constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: RunLambdaTaskProps = {}) {
this.waitForTaskToken = !!props.waitForTaskToken;
this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET;

const supportedPatterns = [
sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN
];

if (!supportedPatterns.includes(this.integrationPattern)) {
throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call Lambda.`);
}

if (this.waitForTaskToken && !FieldUtils.containsTaskToken(props.payload)) {
if (this.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN
&& !sfn.FieldUtils.containsTaskToken(props.payload)) {
throw new Error('Task Token is missing in payload (pass Context.taskToken somewhere in payload)');
}
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
const resourceArn = 'arn:aws:states:::lambda:invoke' + (this.waitForTaskToken ? '.waitForTaskToken' : '');
const resourceArn = 'arn:aws:states:::lambda:invoke' + resourceArnSuffix.get(this.integrationPattern);

return {
resourceArn,
Expand Down Expand Up @@ -106,4 +119,9 @@ export enum InvocationType {
* The API response only includes a status code.
*/
EVENT = 'Event',

/**
* TValidate parameter values and verify that the user or role has permission to invoke the function.
*/
DRY_RUN = 'DryRun'
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import ec2 = require('@aws-cdk/aws-ec2');
import iam = require('@aws-cdk/aws-iam');
import sfn = require('@aws-cdk/aws-stepfunctions');
import { Construct, Duration, Stack } from '@aws-cdk/core';
import { resourceArnSuffix } from './resource-arn-suffix';
import { AlgorithmSpecification, Channel, InputMode, OutputDataConfig, ResourceConfig,
S3DataType, StoppingCondition, VpcConfig, } from './sagemaker-task-base-types';

Expand All @@ -26,11 +27,13 @@ export interface SagemakerTrainTaskProps {
readonly role?: iam.IRole;

/**
* Specify if the task is synchronous or asychronous.
* The service integration pattern indicates different ways to call SageMaker APIs.
*
* @default false
* The valid value is either FIRE_AND_FORGET or SYNC.
*
* @default FIRE_AND_FORGET
*/
readonly synchronous?: boolean;
readonly integrationPattern?: sfn.ServiceIntegrationPattern;

/**
* Identifies the training algorithm to use.
Expand Down Expand Up @@ -114,7 +117,19 @@ export class SagemakerTrainTask implements iam.IGrantable, ec2.IConnectable, sfn
*/
private readonly stoppingCondition: StoppingCondition;

private readonly integrationPattern: sfn.ServiceIntegrationPattern;

constructor(scope: Construct, private readonly props: SagemakerTrainTaskProps) {
this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET;

const supportedPatterns = [
sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
sfn.ServiceIntegrationPattern.SYNC
];

if (!supportedPatterns.includes(this.integrationPattern)) {
throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call SageMaker.`);
}

// set the default resource config if not defined.
this.resourceConfig = props.resourceConfig || {
Expand Down Expand Up @@ -194,7 +209,7 @@ export class SagemakerTrainTask implements iam.IGrantable, ec2.IConnectable, sfn

public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig {
return {
resourceArn: 'arn:aws:states:::sagemaker:createTrainingJob' + (this.props.synchronous ? '.sync' : ''),
resourceArn: 'arn:aws:states:::sagemaker:createTrainingJob' + resourceArnSuffix.get(this.integrationPattern),
parameters: this.renderParameters(),
policyStatements: this.makePolicyStatements(task),
};
Expand Down Expand Up @@ -322,7 +337,7 @@ export class SagemakerTrainTask implements iam.IGrantable, ec2.IConnectable, sfn
})
];

if (this.props.synchronous) {
if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) {
policyStatements.push(new iam.PolicyStatement({
actions: ["events:PutTargets", "events:PutRule", "events:DescribeRule"],
resources: [stack.formatArn({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import ec2 = require('@aws-cdk/aws-ec2');
import iam = require('@aws-cdk/aws-iam');
import sfn = require('@aws-cdk/aws-stepfunctions');
import { Construct, Stack } from '@aws-cdk/core';
import { resourceArnSuffix } from './resource-arn-suffix';
import { BatchStrategy, S3DataType, TransformInput, TransformOutput, TransformResources } from './sagemaker-task-base-types';

/**
Expand All @@ -20,9 +21,13 @@ export interface SagemakerTransformProps {
readonly role?: iam.IRole;

/**
* Specify if the task is synchronous or asychronous.
* The service integration pattern indicates different ways to call SageMaker APIs.
*
* The valid value is either FIRE_AND_FORGET or SYNC.
*
* @default FIRE_AND_FORGET
*/
readonly synchronous?: boolean;
readonly integrationPattern?: sfn.ServiceIntegrationPattern;

/**
* Number of records to include in a mini-batch for an HTTP inference request.
Expand Down Expand Up @@ -94,7 +99,19 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask {
*/
private readonly transformResources: TransformResources;

private readonly integrationPattern: sfn.ServiceIntegrationPattern;

constructor(scope: Construct, private readonly props: SagemakerTransformProps) {
this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET;

const supportedPatterns = [
sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
sfn.ServiceIntegrationPattern.SYNC
];

if (!supportedPatterns.includes(this.integrationPattern)) {
throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call SageMaker.`);
}

// set the sagemaker role or create new one
this.role = props.role || new iam.Role(scope, 'SagemakerRole', {
Expand Down Expand Up @@ -124,7 +141,7 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask {

public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig {
return {
resourceArn: 'arn:aws:states:::sagemaker:createTransformJob' + (this.props.synchronous ? '.sync' : ''),
resourceArn: 'arn:aws:states:::sagemaker:createTransformJob' + resourceArnSuffix.get(this.integrationPattern),
parameters: this.renderParameters(),
policyStatements: this.makePolicyStatements(task),
};
Expand Down Expand Up @@ -216,7 +233,7 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask {
})
];

if (this.props.synchronous) {
if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) {
policyStatements.push(new iam.PolicyStatement({
actions: ["events:PutTargets", "events:PutRule", "events:DescribeRule"],
resources: [stack.formatArn({
Expand Down
Loading

0 comments on commit fa48e89

Please sign in to comment.