From fa48e89fb217d58cacc9050de73ac42bad8039bc Mon Sep 17 00:00:00 2001 From: Wenqian Wang <51062650+wqzoww@users.noreply.github.com> Date: Thu, 8 Aug 2019 03:56:30 -0700 Subject: [PATCH] refactor(stepfunctions-tasks): make integrationPattern an enum (#3115) * refactor(aws-stepfunctions-tasks): implement service integration patterns for tasks Step Functions allows users to call different integrated services in different ways. They are also called service integration patterns, including Request Response, Run a Job and Wait for Callback. Users must choose exactly one of them and specify it in the "Resource" field. This commit introduces a new member variable, serviceIntegrationPattern, in the interface of properties within each existing integrated service. This helps to avoid using multiple boolean variables in the service such as ECS, which supports different service integration patterns. It is also beneficial for code maintenances: if Step Functions adds new integrated services or updates the existing integration patterns in the future, keeping pace with these changes will be simply updating this variable of enum type. BREAKING CHANGE: To define a callback task, users should specify "serviceIntegrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN" instead of "waitForTaskToken: true". For a sync task, users should use "serviceIntegrationPattern: sfn.ServiceIntegrationPattern.SYNC" in the place of "synchronous: true". In addition, this commit enables users to define callback task with ECS. **@aws-cdk/aws-stepfunctions-tasks** Closes #3114 * serviceIntegrationPattern -> integrationPattern --- .../lib/publish-to-topic.ts | 30 +++++++++++----- .../lib/resource-arn-suffix.ts | 15 ++++++++ .../lib/run-ecs-task-base.ts | 32 +++++++++++++---- .../lib/run-lambda-task.ts | 36 ++++++++++++++----- .../lib/sagemaker-train-task.ts | 25 ++++++++++--- .../lib/sagemaker-transform-task.ts | 25 ++++++++++--- .../lib/send-to-queue.ts | 30 +++++++++++----- .../test/ecs-tasks.test.ts | 4 +++ .../test/integ.ec2-task.ts | 4 ++- .../test/integ.fargate-task.ts | 1 + .../test/integ.invoke-function.ts | 2 +- .../test/publish-to-topic.test.ts | 18 ++++++++-- .../test/run-lambda-task.test.ts | 16 +++++++-- .../test/sagemaker-training-job.test.ts | 27 +++++++++++++- .../test/sagemaker-transform-job.test.ts | 22 +++++++++++- .../test/send-to-queue.test.ts | 17 ++++++--- packages/@aws-cdk/aws-stepfunctions/README.md | 26 ++++++++++++-- .../aws-stepfunctions/lib/state-machine.ts | 2 +- .../lib/step-functions-task.ts | 25 +++++++++++++ .../@aws-cdk/aws-stepfunctions/package.json | 3 +- 20 files changed, 300 insertions(+), 60 deletions(-) create mode 100644 packages/@aws-cdk/aws-stepfunctions-tasks/lib/resource-arn-suffix.ts diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts index 011fe6d1c8e4e..c4aa62319df8c 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts @@ -1,6 +1,7 @@ import iam = require('@aws-cdk/aws-iam'); import sns = require('@aws-cdk/aws-sns'); import sfn = require('@aws-cdk/aws-stepfunctions'); +import { resourceArnSuffix } from './resource-arn-suffix'; /** * Properties for PublishTask @@ -29,11 +30,13 @@ export interface PublishToTopicProps { readonly subject?: string; /** - * Whether to pause the workflow until a task token is returned + * The service integration pattern indicates different ways to call Publish to SNS. * - * @default false + * The valid value is either FIRE_AND_FORGET or WAIT_FOR_TASK_TOKEN. + * + * @default FIRE_AND_FORGET */ - readonly waitForTaskToken?: boolean; + readonly integrationPattern?: sfn.ServiceIntegrationPattern; } /** @@ -44,19 +47,30 @@ export interface PublishToTopicProps { */ export class PublishToTopic implements sfn.IStepFunctionsTask { - private readonly waitForTaskToken: boolean; + private readonly integrationPattern: sfn.ServiceIntegrationPattern; constructor(private readonly topic: sns.ITopic, private readonly props: PublishToTopicProps) { - this.waitForTaskToken = props.waitForTaskToken === true; + this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET; + + const supportedPatterns = [ + sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, + sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN + ]; - if (this.waitForTaskToken && !sfn.FieldUtils.containsTaskToken(props.message.value)) { - throw new Error('Task Token is missing in message (pass Context.taskToken somewhere in message)'); + if (!supportedPatterns.includes(this.integrationPattern)) { + throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call SNS.`); + } + + if (this.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN) { + if (!sfn.FieldUtils.containsTaskToken(props.message)) { + throw new Error('Task Token is missing in message (pass Context.taskToken somewhere in message)'); + } } } public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { return { - resourceArn: 'arn:aws:states:::sns:publish' + (this.waitForTaskToken ? '.waitForTaskToken' : ''), + resourceArn: 'arn:aws:states:::sns:publish' + resourceArnSuffix.get(this.integrationPattern), policyStatements: [new iam.PolicyStatement({ actions: ['sns:Publish'], resources: [this.topic.topicArn] diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/resource-arn-suffix.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/resource-arn-suffix.ts new file mode 100644 index 0000000000000..be7d81e8fef78 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/resource-arn-suffix.ts @@ -0,0 +1,15 @@ +import sfn = require('@aws-cdk/aws-stepfunctions'); + +/** + * Suffixes corresponding to different service integration patterns + * + * Key is the service integration pattern, value is the resource ARN suffix. + * + * @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html + */ +const resourceArnSuffix = new Map(); +resourceArnSuffix.set(sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, ""); +resourceArnSuffix.set(sfn.ServiceIntegrationPattern.SYNC, ".sync"); +resourceArnSuffix.set(sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, ".waitForTaskToken"); + +export { resourceArnSuffix }; \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts index ca864d2d7cf91..379bd8acfc9da 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts @@ -4,6 +4,7 @@ import iam = require('@aws-cdk/aws-iam'); import sfn = require('@aws-cdk/aws-stepfunctions'); import cdk = require('@aws-cdk/core'); import { Stack } from '@aws-cdk/core'; +import { resourceArnSuffix } from './resource-arn-suffix'; import { ContainerOverride } from './run-ecs-task-base-types'; /** @@ -32,11 +33,13 @@ export interface CommonEcsRunTaskProps { readonly containerOverrides?: ContainerOverride[]; /** - * Whether to wait for the task to complete and return the response + * The service integration pattern indicates different ways to call RunTask in ECS. * - * @default true + * The valid value for Lambda is FIRE_AND_FORGET, SYNC and WAIT_FOR_TASK_TOKEN. + * + * @default FIRE_AND_FORGET */ - readonly synchronous?: boolean; + readonly integrationPattern?: sfn.ServiceIntegrationPattern; } /** @@ -60,10 +63,25 @@ export class EcsRunTaskBase implements ec2.IConnectable, sfn.IStepFunctionsTask private securityGroup?: ec2.ISecurityGroup; private networkConfiguration?: any; - private readonly sync: boolean; + private readonly integrationPattern: sfn.ServiceIntegrationPattern; constructor(private readonly props: EcsRunTaskBaseProps) { - this.sync = props.synchronous !== false; + this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET; + + const supportedPatterns = [ + sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, + sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, + sfn.ServiceIntegrationPattern.SYNC + ]; + + if (!supportedPatterns.includes(this.integrationPattern)) { + throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call ECS.`); + } + + if (this.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN + && !sfn.FieldUtils.containsTaskToken(props.containerOverrides)) { + throw new Error('Task Token is missing in containerOverrides (pass Context.taskToken somewhere in containerOverrides)'); + } for (const override of this.props.containerOverrides || []) { const name = override.containerName; @@ -86,7 +104,7 @@ export class EcsRunTaskBase implements ec2.IConnectable, sfn.IStepFunctionsTask } return { - resourceArn: 'arn:aws:states:::ecs:runTask' + (this.sync ? '.sync' : ''), + resourceArn: 'arn:aws:states:::ecs:runTask' + resourceArnSuffix.get(this.integrationPattern), parameters: { Cluster: this.props.cluster.clusterArn, TaskDefinition: this.props.taskDefinition.taskDefinitionArn, @@ -139,7 +157,7 @@ export class EcsRunTaskBase implements ec2.IConnectable, sfn.IStepFunctionsTask }), ]; - if (this.sync) { + if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) { policyStatements.push(new iam.PolicyStatement({ actions: ["events:PutTargets", "events:PutRule", "events:DescribeRule"], resources: [stack.formatArn({ diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts index e6330e634230c..5d58164a641d8 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts @@ -1,7 +1,7 @@ import iam = require('@aws-cdk/aws-iam'); import lambda = require('@aws-cdk/aws-lambda'); import sfn = require('@aws-cdk/aws-stepfunctions'); -import { FieldUtils } from '../../aws-stepfunctions/lib/fields'; +import { resourceArnSuffix } from './resource-arn-suffix'; /** * Properties for RunLambdaTask @@ -13,15 +13,18 @@ export interface RunLambdaTaskProps { readonly payload?: { [key: string]: any }; /** - * Whether to pause the workflow until a task token is returned + * The service integration pattern indicates different ways to invoke Lambda function. * - * If this is set to true, the Context.taskToken value must be included + * The valid value for Lambda is either FIRE_AND_FORGET or WAIT_FOR_TASK_TOKEN, + * it determines whether to pause the workflow until a task token is returned. + * + * If this is set to WAIT_FOR_TASK_TOKEN, the Context.taskToken value must be included * somewhere in the payload and the Lambda must call * `SendTaskSuccess/SendTaskFailure` using that token. * - * @default false + * @default FIRE_AND_FORGET */ - readonly waitForTaskToken?: boolean; + readonly integrationPattern?: sfn.ServiceIntegrationPattern; /** * Invocation type of the Lambda function @@ -55,18 +58,28 @@ export interface RunLambdaTaskProps { * @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html */ export class RunLambdaTask implements sfn.IStepFunctionsTask { - private readonly waitForTaskToken: boolean; + private readonly integrationPattern: sfn.ServiceIntegrationPattern; constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: RunLambdaTaskProps = {}) { - this.waitForTaskToken = !!props.waitForTaskToken; + this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET; + + const supportedPatterns = [ + sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, + sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN + ]; + + if (!supportedPatterns.includes(this.integrationPattern)) { + throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call Lambda.`); + } - if (this.waitForTaskToken && !FieldUtils.containsTaskToken(props.payload)) { + if (this.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN + && !sfn.FieldUtils.containsTaskToken(props.payload)) { throw new Error('Task Token is missing in payload (pass Context.taskToken somewhere in payload)'); } } public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { - const resourceArn = 'arn:aws:states:::lambda:invoke' + (this.waitForTaskToken ? '.waitForTaskToken' : ''); + const resourceArn = 'arn:aws:states:::lambda:invoke' + resourceArnSuffix.get(this.integrationPattern); return { resourceArn, @@ -106,4 +119,9 @@ export enum InvocationType { * The API response only includes a status code. */ EVENT = 'Event', + + /** + * TValidate parameter values and verify that the user or role has permission to invoke the function. + */ + DRY_RUN = 'DryRun' } \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-train-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-train-task.ts index 61807c1adcab5..f4f062e55db29 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-train-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-train-task.ts @@ -2,6 +2,7 @@ import ec2 = require('@aws-cdk/aws-ec2'); import iam = require('@aws-cdk/aws-iam'); import sfn = require('@aws-cdk/aws-stepfunctions'); import { Construct, Duration, Stack } from '@aws-cdk/core'; +import { resourceArnSuffix } from './resource-arn-suffix'; import { AlgorithmSpecification, Channel, InputMode, OutputDataConfig, ResourceConfig, S3DataType, StoppingCondition, VpcConfig, } from './sagemaker-task-base-types'; @@ -26,11 +27,13 @@ export interface SagemakerTrainTaskProps { readonly role?: iam.IRole; /** - * Specify if the task is synchronous or asychronous. + * The service integration pattern indicates different ways to call SageMaker APIs. * - * @default false + * The valid value is either FIRE_AND_FORGET or SYNC. + * + * @default FIRE_AND_FORGET */ - readonly synchronous?: boolean; + readonly integrationPattern?: sfn.ServiceIntegrationPattern; /** * Identifies the training algorithm to use. @@ -114,7 +117,19 @@ export class SagemakerTrainTask implements iam.IGrantable, ec2.IConnectable, sfn */ private readonly stoppingCondition: StoppingCondition; + private readonly integrationPattern: sfn.ServiceIntegrationPattern; + constructor(scope: Construct, private readonly props: SagemakerTrainTaskProps) { + this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET; + + const supportedPatterns = [ + sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, + sfn.ServiceIntegrationPattern.SYNC + ]; + + if (!supportedPatterns.includes(this.integrationPattern)) { + throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call SageMaker.`); + } // set the default resource config if not defined. this.resourceConfig = props.resourceConfig || { @@ -194,7 +209,7 @@ export class SagemakerTrainTask implements iam.IGrantable, ec2.IConnectable, sfn public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig { return { - resourceArn: 'arn:aws:states:::sagemaker:createTrainingJob' + (this.props.synchronous ? '.sync' : ''), + resourceArn: 'arn:aws:states:::sagemaker:createTrainingJob' + resourceArnSuffix.get(this.integrationPattern), parameters: this.renderParameters(), policyStatements: this.makePolicyStatements(task), }; @@ -322,7 +337,7 @@ export class SagemakerTrainTask implements iam.IGrantable, ec2.IConnectable, sfn }) ]; - if (this.props.synchronous) { + if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) { policyStatements.push(new iam.PolicyStatement({ actions: ["events:PutTargets", "events:PutRule", "events:DescribeRule"], resources: [stack.formatArn({ diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-transform-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-transform-task.ts index ad3663d2687c5..eeabb1db2984c 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-transform-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-transform-task.ts @@ -2,6 +2,7 @@ import ec2 = require('@aws-cdk/aws-ec2'); import iam = require('@aws-cdk/aws-iam'); import sfn = require('@aws-cdk/aws-stepfunctions'); import { Construct, Stack } from '@aws-cdk/core'; +import { resourceArnSuffix } from './resource-arn-suffix'; import { BatchStrategy, S3DataType, TransformInput, TransformOutput, TransformResources } from './sagemaker-task-base-types'; /** @@ -20,9 +21,13 @@ export interface SagemakerTransformProps { readonly role?: iam.IRole; /** - * Specify if the task is synchronous or asychronous. + * The service integration pattern indicates different ways to call SageMaker APIs. + * + * The valid value is either FIRE_AND_FORGET or SYNC. + * + * @default FIRE_AND_FORGET */ - readonly synchronous?: boolean; + readonly integrationPattern?: sfn.ServiceIntegrationPattern; /** * Number of records to include in a mini-batch for an HTTP inference request. @@ -94,7 +99,19 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask { */ private readonly transformResources: TransformResources; + private readonly integrationPattern: sfn.ServiceIntegrationPattern; + constructor(scope: Construct, private readonly props: SagemakerTransformProps) { + this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET; + + const supportedPatterns = [ + sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, + sfn.ServiceIntegrationPattern.SYNC + ]; + + if (!supportedPatterns.includes(this.integrationPattern)) { + throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call SageMaker.`); + } // set the sagemaker role or create new one this.role = props.role || new iam.Role(scope, 'SagemakerRole', { @@ -124,7 +141,7 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask { public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig { return { - resourceArn: 'arn:aws:states:::sagemaker:createTransformJob' + (this.props.synchronous ? '.sync' : ''), + resourceArn: 'arn:aws:states:::sagemaker:createTransformJob' + resourceArnSuffix.get(this.integrationPattern), parameters: this.renderParameters(), policyStatements: this.makePolicyStatements(task), }; @@ -216,7 +233,7 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask { }) ]; - if (this.props.synchronous) { + if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) { policyStatements.push(new iam.PolicyStatement({ actions: ["events:PutTargets", "events:PutRule", "events:DescribeRule"], resources: [stack.formatArn({ diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts index 015a8b783aecb..401515725ec66 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts @@ -2,6 +2,7 @@ import iam = require('@aws-cdk/aws-iam'); import sqs = require('@aws-cdk/aws-sqs'); import sfn = require('@aws-cdk/aws-stepfunctions'); import { Duration } from '@aws-cdk/core'; +import { resourceArnSuffix } from './resource-arn-suffix'; /** * Properties for SendMessageTask @@ -39,11 +40,13 @@ export interface SendToQueueProps { readonly messageGroupId?: string; /** - * Whether to pause the workflow until a task token is returned + * The service integration pattern indicates different ways to call SendMessage to SQS. * - * @default false + * The valid value is either FIRE_AND_FORGET or WAIT_FOR_TASK_TOKEN. + * + * @default FIRE_AND_FORGET */ - readonly waitForTaskToken?: boolean; + readonly integrationPattern?: sfn.ServiceIntegrationPattern; } /** @@ -54,19 +57,30 @@ export interface SendToQueueProps { */ export class SendToQueue implements sfn.IStepFunctionsTask { - private readonly waitForTaskToken: boolean; + private readonly integrationPattern: sfn.ServiceIntegrationPattern; constructor(private readonly queue: sqs.IQueue, private readonly props: SendToQueueProps) { - this.waitForTaskToken = props.waitForTaskToken === true; + this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET; + + const supportedPatterns = [ + sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, + sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN + ]; - if (this.waitForTaskToken && !sfn.FieldUtils.containsTaskToken(props.messageBody.value)) { - throw new Error('Task Token is missing in messageBody (pass Context.taskToken somewhere in messageBody)'); + if (!supportedPatterns.includes(this.integrationPattern)) { + throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call SQS.`); + } + + if (props.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN) { + if (!sfn.FieldUtils.containsTaskToken(props.messageBody)) { + throw new Error('Task Token is missing in messageBody (pass Context.taskToken somewhere in messageBody)'); + } } } public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { return { - resourceArn: 'arn:aws:states:::sqs:sendMessage' + (this.waitForTaskToken ? '.waitForTaskToken' : ''), + resourceArn: 'arn:aws:states:::sqs:sendMessage' + resourceArnSuffix.get(this.integrationPattern), policyStatements: [new iam.PolicyStatement({ actions: ['sqs:SendMessage'], resources: [this.queue.queueArn] diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/ecs-tasks.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/ecs-tasks.test.ts index d6efeb24c59c6..f3a8669b915bb 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/ecs-tasks.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/ecs-tasks.test.ts @@ -57,6 +57,7 @@ test('Running a Fargate Task', () => { // WHEN const runTask = new sfn.Task(stack, 'RunFargate', { task: new tasks.RunEcsFargateTask({ + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, cluster, taskDefinition, containerOverrides: [ @@ -154,6 +155,7 @@ test('Running an EC2 Task with bridge network', () => { // WHEN const runTask = new sfn.Task(stack, 'Run', { task: new tasks.RunEcsEc2Task({ + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, cluster, taskDefinition, containerOverrides: [ @@ -241,6 +243,7 @@ test('Running an EC2 Task with placement strategies', () => { }); const ec2Task = new tasks.RunEcsEc2Task({ + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, cluster, taskDefinition, placementStrategies: [ @@ -289,6 +292,7 @@ test('Running an EC2 Task with overridden number values', () => { }); const ec2Task = new tasks.RunEcsEc2Task({ + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, cluster, taskDefinition, containerOverrides: [ diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.ec2-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.ec2-task.ts index b512fd605c302..b18ed06e60b56 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.ec2-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.ec2-task.ts @@ -35,7 +35,9 @@ taskDefinition.addContainer('TheContainer', { const definition = new sfn.Pass(stack, 'Start', { result: sfn.Result.fromObject({ SomeKey: 'SomeValue' }) }).next(new sfn.Task(stack, 'Run', { task: new tasks.RunEcsEc2Task({ - cluster, taskDefinition, + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, + cluster, + taskDefinition, containerOverrides: [ { containerName: 'TheContainer', diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.fargate-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.fargate-task.ts index 01501a7f3d5e4..29fde815aa1bb 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.fargate-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.fargate-task.ts @@ -34,6 +34,7 @@ taskDefinition.addContainer('TheContainer', { const definition = new sfn.Pass(stack, 'Start', { result: sfn.Result.fromObject({ SomeKey: 'SomeValue' }) }).next(new sfn.Task(stack, 'FargateTask', { task: new tasks.RunEcsFargateTask({ + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, cluster, taskDefinition, assignPublicIp: true, containerOverrides: [ diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.ts index 6d6d69dd892c1..136764cfd9dc5 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.ts @@ -25,7 +25,7 @@ const callbackHandler = new Function(stack, 'CallbackHandler', { const taskTokenHandler = new sfn.Task(stack, 'Invoke Handler with task token', { task: new tasks.RunLambdaTask(callbackHandler, { - waitForTaskToken: true, + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, payload: { token: sfn.Context.taskToken } diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts index 6108f341bdd24..18a0bf421ff28 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts @@ -32,7 +32,7 @@ test('Publish JSON to SNS topic with task token', () => { // WHEN const pub = new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, { - waitForTaskToken: true, + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, message: sfn.TaskInput.fromObject({ Input: 'Publish this message', Token: sfn.Context.taskToken @@ -54,14 +54,14 @@ test('Publish JSON to SNS topic with task token', () => { }); }); -test('Task throws if waitForTaskToken is supplied but task token is not included in message', () => { +test('Task throws if WAIT_FOR_TASK_TOKEN is supplied but task token is not included in message', () => { expect(() => { // GIVEN const stack = new cdk.Stack(); const topic = new sns.Topic(stack, 'Topic'); // WHEN new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, { - waitForTaskToken: true, + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, message: sfn.TaskInput.fromText('Publish this message') }) }); // THEN @@ -89,3 +89,15 @@ test('Publish to topic with ARN from payload', () => { }, }); }); + +test('Task throws if SYNC is supplied as service integration pattern', () => { + expect(() => { + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic'); + + new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, { + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, + message: sfn.TaskInput.fromText('Publish this message') + }) }); + }).toThrow(/Invalid Service Integration Pattern: SYNC is not supported to call SNS./i); +}); diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts index bf49993b28131..1a5d997551bf7 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts @@ -45,7 +45,7 @@ test('Invoke lambda with default magic ARN', () => { test('Lambda function can be used in a Task with Task Token', () => { const task = new sfn.Task(stack, 'Task', { task: new tasks.RunLambdaTask(fn, { - waitForTaskToken: true, + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, payload: { token: sfn.Context.taskToken } @@ -66,12 +66,22 @@ test('Lambda function can be used in a Task with Task Token', () => { }); }); -test('Task throws if waitForTaskToken is supplied but task token is not included in payLoad', () => { +test('Task throws if WAIT_FOR_TASK_TOKEN is supplied but task token is not included in payLoad', () => { expect(() => { new sfn.Task(stack, 'Task', { task: new tasks.RunLambdaTask(fn, { - waitForTaskToken: true + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN }) }); }).toThrow(/Task Token is missing in payload/i); +}); + +test('Task throws if SYNC is supplied as service integration pattern', () => { + expect(() => { + new sfn.Task(stack, 'Task', { + task: new tasks.RunLambdaTask(fn, { + integrationPattern: sfn.ServiceIntegrationPattern.SYNC + }) + }); + }).toThrow(/Invalid Service Integration Pattern: SYNC is not supported to call Lambda./i); }); \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-training-job.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-training-job.test.ts index 43dff4d3f54a4..2c8e66ad20b16 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-training-job.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-training-job.test.ts @@ -79,6 +79,31 @@ test('create basic training job', () => { }); }); +test('Task throws if WAIT_FOR_TASK_TOKEN is supplied as service integration pattern', () => { + expect(() => { + new sfn.Task(stack, 'TrainSagemaker', { task: new tasks.SagemakerTrainTask(stack, { + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, + trainingJobName: "MyTrainJob", + algorithmSpecification: { + algorithmName: "BlazingText", + }, + inputDataConfig: [ + { + channelName: 'train', + dataSource: { + s3DataSource: { + s3Location: S3Location.fromBucket(s3.Bucket.fromBucketName(stack, 'InputBucket', 'mybucket'), 'mytrainpath') + } + } + } + ], + outputDataConfig: { + s3OutputLocation: S3Location.fromBucket(s3.Bucket.fromBucketName(stack, 'OutputBucket', 'mybucket'), 'myoutputpath') + }, + })}); + }).toThrow(/Invalid Service Integration Pattern: WAIT_FOR_TASK_TOKEN is not supported to call SageMaker./i); + }); + test('create complex training job', () => { // WHEN const kmsKey = new kms.Key(stack, 'Key'); @@ -95,7 +120,7 @@ test('create complex training job', () => { const task = new sfn.Task(stack, 'TrainSagemaker', { task: new tasks.SagemakerTrainTask(stack, { trainingJobName: "MyTrainJob", - synchronous: true, + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, role, algorithmSpecification: { algorithmName: "BlazingText", diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-transform-job.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-transform-job.test.ts index 6fd362d026c40..b89f0c90b8152 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-transform-job.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-transform-job.test.ts @@ -65,13 +65,33 @@ test('create basic transform job', () => { }); }); +test('Task throws if WAIT_FOR_TASK_TOKEN is supplied as service integration pattern', () => { + expect(() => { + new sfn.Task(stack, 'TransformTask', { task: new tasks.SagemakerTransformTask(stack, { + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, + transformJobName: "MyTransformJob", + modelName: "MyModelName", + transformInput: { + transformDataSource: { + s3DataSource: { + s3Uri: 's3://inputbucket/prefix', + } + } + }, + transformOutput: { + s3OutputPath: 's3://outputbucket/prefix', + }, + }) }); + }).toThrow(/Invalid Service Integration Pattern: WAIT_FOR_TASK_TOKEN is not supported to call SageMaker./i); + }); + test('create complex transform job', () => { // WHEN const kmsKey = new kms.Key(stack, 'Key'); const task = new sfn.Task(stack, 'TransformTask', { task: new tasks.SagemakerTransformTask(stack, { transformJobName: "MyTransformJob", modelName: "MyModelName", - synchronous: true, + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, role, transformInput: { transformDataSource: { diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/send-to-queue.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/send-to-queue.test.ts index a13439ed2acc2..323d1da5fe7f9 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/send-to-queue.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/send-to-queue.test.ts @@ -35,7 +35,7 @@ test('Send message to queue', () => { test('Send message to SQS queue with task token', () => { // WHEN const task = new sfn.Task(stack, 'Send', { task: new tasks.SendToQueue(queue, { - waitForTaskToken: true, + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, messageBody: sfn.TaskInput.fromObject({ Input: 'Send this message', Token: sfn.Context.taskToken @@ -57,11 +57,11 @@ test('Send message to SQS queue with task token', () => { }); }); -test('Task throws if waitForTaskToken is supplied but task token is not included in messageBody', () => { +test('Task throws if WAIT_FOR_TASK_TOKEN is supplied but task token is not included in messageBody', () => { expect(() => { // WHEN new sfn.Task(stack, 'Send', { task: new tasks.SendToQueue(queue, { - waitForTaskToken: true, + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, messageBody: sfn.TaskInput.fromText('Send this message') }) }); // THEN @@ -136,4 +136,13 @@ test('Message body object can contain references', () => { } }, }); -}); \ No newline at end of file +}); + +test('Task throws if SYNC is supplied as service integration pattern', () => { + expect(() => { + new sfn.Task(stack, 'Send', { task: new tasks.SendToQueue(queue, { + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, + messageBody: sfn.TaskInput.fromText('Send this message') + }) }); + }).toThrow(/Invalid Service Integration Pattern: SYNC is not supported to call SQS./i); +}); diff --git a/packages/@aws-cdk/aws-stepfunctions/README.md b/packages/@aws-cdk/aws-stepfunctions/README.md index da0b7e3d6c083..4df85e18b5747 100644 --- a/packages/@aws-cdk/aws-stepfunctions/README.md +++ b/packages/@aws-cdk/aws-stepfunctions/README.md @@ -122,9 +122,10 @@ done is determine by a class that implements `IStepFunctionsTask`, a collection of which can be found in the `@aws-cdk/aws-stepfunctions-tasks` package. A couple of the tasks available are: -* `tasks.InvokeFunction` -- call a Lambda Function * `tasks.InvokeActivity` -- start an Activity (Activities represent a work queue that you poll on a compute fleet you manage yourself) +* `tasks.InvokeFunction` -- invoke a Lambda function with function ARN +* `tasks.RunLambdaTask` -- call Lambda as integrated service with magic ARN * `tasks.PublishToTopic` -- publish a message to an SNS topic * `tasks.SendToQueue` -- send a message to an SQS queue * `tasks.RunEcsFargateTask`/`ecs.RunEcsEc2Task` -- run a container task, @@ -132,6 +133,11 @@ couple of the tasks available are: * `tasks.SagemakerTrainTask` -- run a SageMaker training job * `tasks.SagemakerTransformTask` -- run a SageMaker transform job +Except `tasks.InvokeActivity` and `tasks.InvokeFunction`, the [service integration +pattern](https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html) +(`integrationPattern`) are supposed to be given as parameter when customers want +to call integrated services within a Task state. The default value is `FIRE_AND_FORGET`. + #### Task parameters from the state json Many tasks take parameters. The values for those can either be supplied @@ -142,10 +148,10 @@ such as `Data.stringAt()`. If so, the value is taken from the indicated location in the state JSON, similar to (for example) `inputPath`. -#### Lambda example +#### Lambda example - InvokeFunction ```ts -const task = new sfn.Task(this, 'Invoke The Lambda', { +const task = new sfn.Task(this, 'Invoke1', { task: new tasks.InvokeFunction(myLambda), inputPath: '$.input', timeout: Duration.minutes(5), @@ -164,6 +170,19 @@ task.addCatch(errorHandlerState); task.next(nextState); ``` +#### Lambda example - RunLambdaTask + +```ts + const task = new sfn.Task(stack, 'Invoke2', { + task: new tasks.RunLambdaTask(myLambda, { + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, + payload: { + token: sfn.Context.taskToken + } + }) + }); +``` + #### SNS example ```ts @@ -176,6 +195,7 @@ const topic = new sns.Topic(this, 'Topic'); // Use a field from the execution data as message. const task1 = new sfn.Task(this, 'Publish1', { task: new tasks.PublishToTopic(topic, { + integrationPattern: sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, message: TaskInput.fromDataAt('$.state.message'), }) }); diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/state-machine.ts b/packages/@aws-cdk/aws-stepfunctions/lib/state-machine.ts index 019004bc36435..fd855043f2859 100644 --- a/packages/@aws-cdk/aws-stepfunctions/lib/state-machine.ts +++ b/packages/@aws-cdk/aws-stepfunctions/lib/state-machine.ts @@ -92,7 +92,7 @@ export class StateMachine extends StateMachineBase { }); this.role = props.role || new iam.Role(this, 'Role', { - assumedBy: new iam.ServicePrincipal(`states.${Stack.of(this).region}.amazonaws.com`), + assumedBy: new iam.ServicePrincipal('states.amazonaws.com'), }); const graph = new StateGraph(props.definition.startState, `State Machine ${id} definition`); diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/step-functions-task.ts b/packages/@aws-cdk/aws-stepfunctions/lib/step-functions-task.ts index 3b2a72f672377..7637e8c54952b 100644 --- a/packages/@aws-cdk/aws-stepfunctions/lib/step-functions-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions/lib/step-functions-task.ts @@ -78,3 +78,28 @@ export interface StepFunctionsTaskConfig { */ readonly metricDimensions?: cloudwatch.DimensionHash; } + +/** + * Three ways to call an integrated service: Request Response, Run a Job and Wait for a Callback with Task Token. + * @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html + * + * Here, they are named as FIRE_AND_FORGET, SYNC and WAIT_FOR_TASK_TOKEN respectly. + * + * @default FIRE_AND_FORGET + */ +export enum ServiceIntegrationPattern { + /** + * Call a service and progress to the next state immediately after the API call completes + */ + FIRE_AND_FORGET = 'FIRE_AND_FORGET', + + /** + * Call a service and wait for a job to complete. + */ + SYNC = 'SYNC', + + /** + * Call a service with a task token and wait until that token is returned by SendTaskSuccess/SendTaskFailure with paylaod + */ + WAIT_FOR_TASK_TOKEN = 'WAIT_FOR_TASK_TOKEN' +} diff --git a/packages/@aws-cdk/aws-stepfunctions/package.json b/packages/@aws-cdk/aws-stepfunctions/package.json index 4b05e33245462..5a18672a36e96 100644 --- a/packages/@aws-cdk/aws-stepfunctions/package.json +++ b/packages/@aws-cdk/aws-stepfunctions/package.json @@ -89,7 +89,8 @@ "import-props-interface:@aws-cdk/aws-stepfunctions.ActivityImportProps", "export:@aws-cdk/aws-stepfunctions.IActivity", "duration-prop-type:@aws-cdk/aws-stepfunctions.WaitProps.duration", - "duration-prop-type:@aws-cdk/aws-stepfunctions.Errors.TIMEOUT" + "duration-prop-type:@aws-cdk/aws-stepfunctions.Errors.TIMEOUT", + "no-unused-type:@aws-cdk/aws-stepfunctions.ServiceIntegrationPattern" ] }, "stability": "experimental"