diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts index 011fe6d1c8e4e..c4aa62319df8c 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts @@ -1,6 +1,7 @@ import iam = require('@aws-cdk/aws-iam'); import sns = require('@aws-cdk/aws-sns'); import sfn = require('@aws-cdk/aws-stepfunctions'); +import { resourceArnSuffix } from './resource-arn-suffix'; /** * Properties for PublishTask @@ -29,11 +30,13 @@ export interface PublishToTopicProps { readonly subject?: string; /** - * Whether to pause the workflow until a task token is returned + * The service integration pattern indicates different ways to call Publish to SNS. * - * @default false + * The valid value is either FIRE_AND_FORGET or WAIT_FOR_TASK_TOKEN. + * + * @default FIRE_AND_FORGET */ - readonly waitForTaskToken?: boolean; + readonly integrationPattern?: sfn.ServiceIntegrationPattern; } /** @@ -44,19 +47,30 @@ export interface PublishToTopicProps { */ export class PublishToTopic implements sfn.IStepFunctionsTask { - private readonly waitForTaskToken: boolean; + private readonly integrationPattern: sfn.ServiceIntegrationPattern; constructor(private readonly topic: sns.ITopic, private readonly props: PublishToTopicProps) { - this.waitForTaskToken = props.waitForTaskToken === true; + this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET; + + const supportedPatterns = [ + sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, + sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN + ]; - if (this.waitForTaskToken && !sfn.FieldUtils.containsTaskToken(props.message.value)) { - throw new Error('Task Token is missing in message (pass Context.taskToken somewhere in message)'); + if (!supportedPatterns.includes(this.integrationPattern)) { + throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call SNS.`); + } + + if (this.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN) { + if (!sfn.FieldUtils.containsTaskToken(props.message)) { + throw new Error('Task Token is missing in message (pass Context.taskToken somewhere in message)'); + } } } public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { return { - resourceArn: 'arn:aws:states:::sns:publish' + (this.waitForTaskToken ? '.waitForTaskToken' : ''), + resourceArn: 'arn:aws:states:::sns:publish' + resourceArnSuffix.get(this.integrationPattern), policyStatements: [new iam.PolicyStatement({ actions: ['sns:Publish'], resources: [this.topic.topicArn] diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/resource-arn-suffix.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/resource-arn-suffix.ts new file mode 100644 index 0000000000000..be7d81e8fef78 --- /dev/null +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/resource-arn-suffix.ts @@ -0,0 +1,15 @@ +import sfn = require('@aws-cdk/aws-stepfunctions'); + +/** + * Suffixes corresponding to different service integration patterns + * + * Key is the service integration pattern, value is the resource ARN suffix. + * + * @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html + */ +const resourceArnSuffix = new Map(); +resourceArnSuffix.set(sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, ""); +resourceArnSuffix.set(sfn.ServiceIntegrationPattern.SYNC, ".sync"); +resourceArnSuffix.set(sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, ".waitForTaskToken"); + +export { resourceArnSuffix }; \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts index ca864d2d7cf91..379bd8acfc9da 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts @@ -4,6 +4,7 @@ import iam = require('@aws-cdk/aws-iam'); import sfn = require('@aws-cdk/aws-stepfunctions'); import cdk = require('@aws-cdk/core'); import { Stack } from '@aws-cdk/core'; +import { resourceArnSuffix } from './resource-arn-suffix'; import { ContainerOverride } from './run-ecs-task-base-types'; /** @@ -32,11 +33,13 @@ export interface CommonEcsRunTaskProps { readonly containerOverrides?: ContainerOverride[]; /** - * Whether to wait for the task to complete and return the response + * The service integration pattern indicates different ways to call RunTask in ECS. * - * @default true + * The valid value for Lambda is FIRE_AND_FORGET, SYNC and WAIT_FOR_TASK_TOKEN. + * + * @default FIRE_AND_FORGET */ - readonly synchronous?: boolean; + readonly integrationPattern?: sfn.ServiceIntegrationPattern; } /** @@ -60,10 +63,25 @@ export class EcsRunTaskBase implements ec2.IConnectable, sfn.IStepFunctionsTask private securityGroup?: ec2.ISecurityGroup; private networkConfiguration?: any; - private readonly sync: boolean; + private readonly integrationPattern: sfn.ServiceIntegrationPattern; constructor(private readonly props: EcsRunTaskBaseProps) { - this.sync = props.synchronous !== false; + this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET; + + const supportedPatterns = [ + sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, + sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, + sfn.ServiceIntegrationPattern.SYNC + ]; + + if (!supportedPatterns.includes(this.integrationPattern)) { + throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call ECS.`); + } + + if (this.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN + && !sfn.FieldUtils.containsTaskToken(props.containerOverrides)) { + throw new Error('Task Token is missing in containerOverrides (pass Context.taskToken somewhere in containerOverrides)'); + } for (const override of this.props.containerOverrides || []) { const name = override.containerName; @@ -86,7 +104,7 @@ export class EcsRunTaskBase implements ec2.IConnectable, sfn.IStepFunctionsTask } return { - resourceArn: 'arn:aws:states:::ecs:runTask' + (this.sync ? '.sync' : ''), + resourceArn: 'arn:aws:states:::ecs:runTask' + resourceArnSuffix.get(this.integrationPattern), parameters: { Cluster: this.props.cluster.clusterArn, TaskDefinition: this.props.taskDefinition.taskDefinitionArn, @@ -139,7 +157,7 @@ export class EcsRunTaskBase implements ec2.IConnectable, sfn.IStepFunctionsTask }), ]; - if (this.sync) { + if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) { policyStatements.push(new iam.PolicyStatement({ actions: ["events:PutTargets", "events:PutRule", "events:DescribeRule"], resources: [stack.formatArn({ diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts index e6330e634230c..5d58164a641d8 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts @@ -1,7 +1,7 @@ import iam = require('@aws-cdk/aws-iam'); import lambda = require('@aws-cdk/aws-lambda'); import sfn = require('@aws-cdk/aws-stepfunctions'); -import { FieldUtils } from '../../aws-stepfunctions/lib/fields'; +import { resourceArnSuffix } from './resource-arn-suffix'; /** * Properties for RunLambdaTask @@ -13,15 +13,18 @@ export interface RunLambdaTaskProps { readonly payload?: { [key: string]: any }; /** - * Whether to pause the workflow until a task token is returned + * The service integration pattern indicates different ways to invoke Lambda function. * - * If this is set to true, the Context.taskToken value must be included + * The valid value for Lambda is either FIRE_AND_FORGET or WAIT_FOR_TASK_TOKEN, + * it determines whether to pause the workflow until a task token is returned. + * + * If this is set to WAIT_FOR_TASK_TOKEN, the Context.taskToken value must be included * somewhere in the payload and the Lambda must call * `SendTaskSuccess/SendTaskFailure` using that token. * - * @default false + * @default FIRE_AND_FORGET */ - readonly waitForTaskToken?: boolean; + readonly integrationPattern?: sfn.ServiceIntegrationPattern; /** * Invocation type of the Lambda function @@ -55,18 +58,28 @@ export interface RunLambdaTaskProps { * @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html */ export class RunLambdaTask implements sfn.IStepFunctionsTask { - private readonly waitForTaskToken: boolean; + private readonly integrationPattern: sfn.ServiceIntegrationPattern; constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: RunLambdaTaskProps = {}) { - this.waitForTaskToken = !!props.waitForTaskToken; + this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET; + + const supportedPatterns = [ + sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, + sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN + ]; + + if (!supportedPatterns.includes(this.integrationPattern)) { + throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call Lambda.`); + } - if (this.waitForTaskToken && !FieldUtils.containsTaskToken(props.payload)) { + if (this.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN + && !sfn.FieldUtils.containsTaskToken(props.payload)) { throw new Error('Task Token is missing in payload (pass Context.taskToken somewhere in payload)'); } } public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { - const resourceArn = 'arn:aws:states:::lambda:invoke' + (this.waitForTaskToken ? '.waitForTaskToken' : ''); + const resourceArn = 'arn:aws:states:::lambda:invoke' + resourceArnSuffix.get(this.integrationPattern); return { resourceArn, @@ -106,4 +119,9 @@ export enum InvocationType { * The API response only includes a status code. */ EVENT = 'Event', + + /** + * TValidate parameter values and verify that the user or role has permission to invoke the function. + */ + DRY_RUN = 'DryRun' } \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-train-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-train-task.ts index 61807c1adcab5..f4f062e55db29 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-train-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-train-task.ts @@ -2,6 +2,7 @@ import ec2 = require('@aws-cdk/aws-ec2'); import iam = require('@aws-cdk/aws-iam'); import sfn = require('@aws-cdk/aws-stepfunctions'); import { Construct, Duration, Stack } from '@aws-cdk/core'; +import { resourceArnSuffix } from './resource-arn-suffix'; import { AlgorithmSpecification, Channel, InputMode, OutputDataConfig, ResourceConfig, S3DataType, StoppingCondition, VpcConfig, } from './sagemaker-task-base-types'; @@ -26,11 +27,13 @@ export interface SagemakerTrainTaskProps { readonly role?: iam.IRole; /** - * Specify if the task is synchronous or asychronous. + * The service integration pattern indicates different ways to call SageMaker APIs. * - * @default false + * The valid value is either FIRE_AND_FORGET or SYNC. + * + * @default FIRE_AND_FORGET */ - readonly synchronous?: boolean; + readonly integrationPattern?: sfn.ServiceIntegrationPattern; /** * Identifies the training algorithm to use. @@ -114,7 +117,19 @@ export class SagemakerTrainTask implements iam.IGrantable, ec2.IConnectable, sfn */ private readonly stoppingCondition: StoppingCondition; + private readonly integrationPattern: sfn.ServiceIntegrationPattern; + constructor(scope: Construct, private readonly props: SagemakerTrainTaskProps) { + this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET; + + const supportedPatterns = [ + sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, + sfn.ServiceIntegrationPattern.SYNC + ]; + + if (!supportedPatterns.includes(this.integrationPattern)) { + throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call SageMaker.`); + } // set the default resource config if not defined. this.resourceConfig = props.resourceConfig || { @@ -194,7 +209,7 @@ export class SagemakerTrainTask implements iam.IGrantable, ec2.IConnectable, sfn public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig { return { - resourceArn: 'arn:aws:states:::sagemaker:createTrainingJob' + (this.props.synchronous ? '.sync' : ''), + resourceArn: 'arn:aws:states:::sagemaker:createTrainingJob' + resourceArnSuffix.get(this.integrationPattern), parameters: this.renderParameters(), policyStatements: this.makePolicyStatements(task), }; @@ -322,7 +337,7 @@ export class SagemakerTrainTask implements iam.IGrantable, ec2.IConnectable, sfn }) ]; - if (this.props.synchronous) { + if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) { policyStatements.push(new iam.PolicyStatement({ actions: ["events:PutTargets", "events:PutRule", "events:DescribeRule"], resources: [stack.formatArn({ diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-transform-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-transform-task.ts index ad3663d2687c5..eeabb1db2984c 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-transform-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-transform-task.ts @@ -2,6 +2,7 @@ import ec2 = require('@aws-cdk/aws-ec2'); import iam = require('@aws-cdk/aws-iam'); import sfn = require('@aws-cdk/aws-stepfunctions'); import { Construct, Stack } from '@aws-cdk/core'; +import { resourceArnSuffix } from './resource-arn-suffix'; import { BatchStrategy, S3DataType, TransformInput, TransformOutput, TransformResources } from './sagemaker-task-base-types'; /** @@ -20,9 +21,13 @@ export interface SagemakerTransformProps { readonly role?: iam.IRole; /** - * Specify if the task is synchronous or asychronous. + * The service integration pattern indicates different ways to call SageMaker APIs. + * + * The valid value is either FIRE_AND_FORGET or SYNC. + * + * @default FIRE_AND_FORGET */ - readonly synchronous?: boolean; + readonly integrationPattern?: sfn.ServiceIntegrationPattern; /** * Number of records to include in a mini-batch for an HTTP inference request. @@ -94,7 +99,19 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask { */ private readonly transformResources: TransformResources; + private readonly integrationPattern: sfn.ServiceIntegrationPattern; + constructor(scope: Construct, private readonly props: SagemakerTransformProps) { + this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET; + + const supportedPatterns = [ + sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, + sfn.ServiceIntegrationPattern.SYNC + ]; + + if (!supportedPatterns.includes(this.integrationPattern)) { + throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call SageMaker.`); + } // set the sagemaker role or create new one this.role = props.role || new iam.Role(scope, 'SagemakerRole', { @@ -124,7 +141,7 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask { public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig { return { - resourceArn: 'arn:aws:states:::sagemaker:createTransformJob' + (this.props.synchronous ? '.sync' : ''), + resourceArn: 'arn:aws:states:::sagemaker:createTransformJob' + resourceArnSuffix.get(this.integrationPattern), parameters: this.renderParameters(), policyStatements: this.makePolicyStatements(task), }; @@ -216,7 +233,7 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask { }) ]; - if (this.props.synchronous) { + if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) { policyStatements.push(new iam.PolicyStatement({ actions: ["events:PutTargets", "events:PutRule", "events:DescribeRule"], resources: [stack.formatArn({ diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts index 015a8b783aecb..401515725ec66 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts @@ -2,6 +2,7 @@ import iam = require('@aws-cdk/aws-iam'); import sqs = require('@aws-cdk/aws-sqs'); import sfn = require('@aws-cdk/aws-stepfunctions'); import { Duration } from '@aws-cdk/core'; +import { resourceArnSuffix } from './resource-arn-suffix'; /** * Properties for SendMessageTask @@ -39,11 +40,13 @@ export interface SendToQueueProps { readonly messageGroupId?: string; /** - * Whether to pause the workflow until a task token is returned + * The service integration pattern indicates different ways to call SendMessage to SQS. * - * @default false + * The valid value is either FIRE_AND_FORGET or WAIT_FOR_TASK_TOKEN. + * + * @default FIRE_AND_FORGET */ - readonly waitForTaskToken?: boolean; + readonly integrationPattern?: sfn.ServiceIntegrationPattern; } /** @@ -54,19 +57,30 @@ export interface SendToQueueProps { */ export class SendToQueue implements sfn.IStepFunctionsTask { - private readonly waitForTaskToken: boolean; + private readonly integrationPattern: sfn.ServiceIntegrationPattern; constructor(private readonly queue: sqs.IQueue, private readonly props: SendToQueueProps) { - this.waitForTaskToken = props.waitForTaskToken === true; + this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET; + + const supportedPatterns = [ + sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, + sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN + ]; - if (this.waitForTaskToken && !sfn.FieldUtils.containsTaskToken(props.messageBody.value)) { - throw new Error('Task Token is missing in messageBody (pass Context.taskToken somewhere in messageBody)'); + if (!supportedPatterns.includes(this.integrationPattern)) { + throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call SQS.`); + } + + if (props.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN) { + if (!sfn.FieldUtils.containsTaskToken(props.messageBody)) { + throw new Error('Task Token is missing in messageBody (pass Context.taskToken somewhere in messageBody)'); + } } } public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig { return { - resourceArn: 'arn:aws:states:::sqs:sendMessage' + (this.waitForTaskToken ? '.waitForTaskToken' : ''), + resourceArn: 'arn:aws:states:::sqs:sendMessage' + resourceArnSuffix.get(this.integrationPattern), policyStatements: [new iam.PolicyStatement({ actions: ['sqs:SendMessage'], resources: [this.queue.queueArn] diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/ecs-tasks.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/ecs-tasks.test.ts index d6efeb24c59c6..f3a8669b915bb 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/ecs-tasks.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/ecs-tasks.test.ts @@ -57,6 +57,7 @@ test('Running a Fargate Task', () => { // WHEN const runTask = new sfn.Task(stack, 'RunFargate', { task: new tasks.RunEcsFargateTask({ + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, cluster, taskDefinition, containerOverrides: [ @@ -154,6 +155,7 @@ test('Running an EC2 Task with bridge network', () => { // WHEN const runTask = new sfn.Task(stack, 'Run', { task: new tasks.RunEcsEc2Task({ + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, cluster, taskDefinition, containerOverrides: [ @@ -241,6 +243,7 @@ test('Running an EC2 Task with placement strategies', () => { }); const ec2Task = new tasks.RunEcsEc2Task({ + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, cluster, taskDefinition, placementStrategies: [ @@ -289,6 +292,7 @@ test('Running an EC2 Task with overridden number values', () => { }); const ec2Task = new tasks.RunEcsEc2Task({ + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, cluster, taskDefinition, containerOverrides: [ diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.ec2-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.ec2-task.ts index b512fd605c302..b18ed06e60b56 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.ec2-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.ec2-task.ts @@ -35,7 +35,9 @@ taskDefinition.addContainer('TheContainer', { const definition = new sfn.Pass(stack, 'Start', { result: sfn.Result.fromObject({ SomeKey: 'SomeValue' }) }).next(new sfn.Task(stack, 'Run', { task: new tasks.RunEcsEc2Task({ - cluster, taskDefinition, + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, + cluster, + taskDefinition, containerOverrides: [ { containerName: 'TheContainer', diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.fargate-task.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.fargate-task.ts index 01501a7f3d5e4..29fde815aa1bb 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.fargate-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.fargate-task.ts @@ -34,6 +34,7 @@ taskDefinition.addContainer('TheContainer', { const definition = new sfn.Pass(stack, 'Start', { result: sfn.Result.fromObject({ SomeKey: 'SomeValue' }) }).next(new sfn.Task(stack, 'FargateTask', { task: new tasks.RunEcsFargateTask({ + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, cluster, taskDefinition, assignPublicIp: true, containerOverrides: [ diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.ts index 6d6d69dd892c1..136764cfd9dc5 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.ts @@ -25,7 +25,7 @@ const callbackHandler = new Function(stack, 'CallbackHandler', { const taskTokenHandler = new sfn.Task(stack, 'Invoke Handler with task token', { task: new tasks.RunLambdaTask(callbackHandler, { - waitForTaskToken: true, + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, payload: { token: sfn.Context.taskToken } diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts index 6108f341bdd24..18a0bf421ff28 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts @@ -32,7 +32,7 @@ test('Publish JSON to SNS topic with task token', () => { // WHEN const pub = new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, { - waitForTaskToken: true, + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, message: sfn.TaskInput.fromObject({ Input: 'Publish this message', Token: sfn.Context.taskToken @@ -54,14 +54,14 @@ test('Publish JSON to SNS topic with task token', () => { }); }); -test('Task throws if waitForTaskToken is supplied but task token is not included in message', () => { +test('Task throws if WAIT_FOR_TASK_TOKEN is supplied but task token is not included in message', () => { expect(() => { // GIVEN const stack = new cdk.Stack(); const topic = new sns.Topic(stack, 'Topic'); // WHEN new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, { - waitForTaskToken: true, + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, message: sfn.TaskInput.fromText('Publish this message') }) }); // THEN @@ -89,3 +89,15 @@ test('Publish to topic with ARN from payload', () => { }, }); }); + +test('Task throws if SYNC is supplied as service integration pattern', () => { + expect(() => { + const stack = new cdk.Stack(); + const topic = new sns.Topic(stack, 'Topic'); + + new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, { + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, + message: sfn.TaskInput.fromText('Publish this message') + }) }); + }).toThrow(/Invalid Service Integration Pattern: SYNC is not supported to call SNS./i); +}); diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts index bf49993b28131..1a5d997551bf7 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/run-lambda-task.test.ts @@ -45,7 +45,7 @@ test('Invoke lambda with default magic ARN', () => { test('Lambda function can be used in a Task with Task Token', () => { const task = new sfn.Task(stack, 'Task', { task: new tasks.RunLambdaTask(fn, { - waitForTaskToken: true, + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, payload: { token: sfn.Context.taskToken } @@ -66,12 +66,22 @@ test('Lambda function can be used in a Task with Task Token', () => { }); }); -test('Task throws if waitForTaskToken is supplied but task token is not included in payLoad', () => { +test('Task throws if WAIT_FOR_TASK_TOKEN is supplied but task token is not included in payLoad', () => { expect(() => { new sfn.Task(stack, 'Task', { task: new tasks.RunLambdaTask(fn, { - waitForTaskToken: true + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN }) }); }).toThrow(/Task Token is missing in payload/i); +}); + +test('Task throws if SYNC is supplied as service integration pattern', () => { + expect(() => { + new sfn.Task(stack, 'Task', { + task: new tasks.RunLambdaTask(fn, { + integrationPattern: sfn.ServiceIntegrationPattern.SYNC + }) + }); + }).toThrow(/Invalid Service Integration Pattern: SYNC is not supported to call Lambda./i); }); \ No newline at end of file diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-training-job.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-training-job.test.ts index 43dff4d3f54a4..2c8e66ad20b16 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-training-job.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-training-job.test.ts @@ -79,6 +79,31 @@ test('create basic training job', () => { }); }); +test('Task throws if WAIT_FOR_TASK_TOKEN is supplied as service integration pattern', () => { + expect(() => { + new sfn.Task(stack, 'TrainSagemaker', { task: new tasks.SagemakerTrainTask(stack, { + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, + trainingJobName: "MyTrainJob", + algorithmSpecification: { + algorithmName: "BlazingText", + }, + inputDataConfig: [ + { + channelName: 'train', + dataSource: { + s3DataSource: { + s3Location: S3Location.fromBucket(s3.Bucket.fromBucketName(stack, 'InputBucket', 'mybucket'), 'mytrainpath') + } + } + } + ], + outputDataConfig: { + s3OutputLocation: S3Location.fromBucket(s3.Bucket.fromBucketName(stack, 'OutputBucket', 'mybucket'), 'myoutputpath') + }, + })}); + }).toThrow(/Invalid Service Integration Pattern: WAIT_FOR_TASK_TOKEN is not supported to call SageMaker./i); + }); + test('create complex training job', () => { // WHEN const kmsKey = new kms.Key(stack, 'Key'); @@ -95,7 +120,7 @@ test('create complex training job', () => { const task = new sfn.Task(stack, 'TrainSagemaker', { task: new tasks.SagemakerTrainTask(stack, { trainingJobName: "MyTrainJob", - synchronous: true, + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, role, algorithmSpecification: { algorithmName: "BlazingText", diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-transform-job.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-transform-job.test.ts index 6fd362d026c40..b89f0c90b8152 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-transform-job.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/sagemaker-transform-job.test.ts @@ -65,13 +65,33 @@ test('create basic transform job', () => { }); }); +test('Task throws if WAIT_FOR_TASK_TOKEN is supplied as service integration pattern', () => { + expect(() => { + new sfn.Task(stack, 'TransformTask', { task: new tasks.SagemakerTransformTask(stack, { + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, + transformJobName: "MyTransformJob", + modelName: "MyModelName", + transformInput: { + transformDataSource: { + s3DataSource: { + s3Uri: 's3://inputbucket/prefix', + } + } + }, + transformOutput: { + s3OutputPath: 's3://outputbucket/prefix', + }, + }) }); + }).toThrow(/Invalid Service Integration Pattern: WAIT_FOR_TASK_TOKEN is not supported to call SageMaker./i); + }); + test('create complex transform job', () => { // WHEN const kmsKey = new kms.Key(stack, 'Key'); const task = new sfn.Task(stack, 'TransformTask', { task: new tasks.SagemakerTransformTask(stack, { transformJobName: "MyTransformJob", modelName: "MyModelName", - synchronous: true, + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, role, transformInput: { transformDataSource: { diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/send-to-queue.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/send-to-queue.test.ts index a13439ed2acc2..323d1da5fe7f9 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/send-to-queue.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/send-to-queue.test.ts @@ -35,7 +35,7 @@ test('Send message to queue', () => { test('Send message to SQS queue with task token', () => { // WHEN const task = new sfn.Task(stack, 'Send', { task: new tasks.SendToQueue(queue, { - waitForTaskToken: true, + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, messageBody: sfn.TaskInput.fromObject({ Input: 'Send this message', Token: sfn.Context.taskToken @@ -57,11 +57,11 @@ test('Send message to SQS queue with task token', () => { }); }); -test('Task throws if waitForTaskToken is supplied but task token is not included in messageBody', () => { +test('Task throws if WAIT_FOR_TASK_TOKEN is supplied but task token is not included in messageBody', () => { expect(() => { // WHEN new sfn.Task(stack, 'Send', { task: new tasks.SendToQueue(queue, { - waitForTaskToken: true, + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, messageBody: sfn.TaskInput.fromText('Send this message') }) }); // THEN @@ -136,4 +136,13 @@ test('Message body object can contain references', () => { } }, }); -}); \ No newline at end of file +}); + +test('Task throws if SYNC is supplied as service integration pattern', () => { + expect(() => { + new sfn.Task(stack, 'Send', { task: new tasks.SendToQueue(queue, { + integrationPattern: sfn.ServiceIntegrationPattern.SYNC, + messageBody: sfn.TaskInput.fromText('Send this message') + }) }); + }).toThrow(/Invalid Service Integration Pattern: SYNC is not supported to call SQS./i); +}); diff --git a/packages/@aws-cdk/aws-stepfunctions/README.md b/packages/@aws-cdk/aws-stepfunctions/README.md index da0b7e3d6c083..4df85e18b5747 100644 --- a/packages/@aws-cdk/aws-stepfunctions/README.md +++ b/packages/@aws-cdk/aws-stepfunctions/README.md @@ -122,9 +122,10 @@ done is determine by a class that implements `IStepFunctionsTask`, a collection of which can be found in the `@aws-cdk/aws-stepfunctions-tasks` package. A couple of the tasks available are: -* `tasks.InvokeFunction` -- call a Lambda Function * `tasks.InvokeActivity` -- start an Activity (Activities represent a work queue that you poll on a compute fleet you manage yourself) +* `tasks.InvokeFunction` -- invoke a Lambda function with function ARN +* `tasks.RunLambdaTask` -- call Lambda as integrated service with magic ARN * `tasks.PublishToTopic` -- publish a message to an SNS topic * `tasks.SendToQueue` -- send a message to an SQS queue * `tasks.RunEcsFargateTask`/`ecs.RunEcsEc2Task` -- run a container task, @@ -132,6 +133,11 @@ couple of the tasks available are: * `tasks.SagemakerTrainTask` -- run a SageMaker training job * `tasks.SagemakerTransformTask` -- run a SageMaker transform job +Except `tasks.InvokeActivity` and `tasks.InvokeFunction`, the [service integration +pattern](https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html) +(`integrationPattern`) are supposed to be given as parameter when customers want +to call integrated services within a Task state. The default value is `FIRE_AND_FORGET`. + #### Task parameters from the state json Many tasks take parameters. The values for those can either be supplied @@ -142,10 +148,10 @@ such as `Data.stringAt()`. If so, the value is taken from the indicated location in the state JSON, similar to (for example) `inputPath`. -#### Lambda example +#### Lambda example - InvokeFunction ```ts -const task = new sfn.Task(this, 'Invoke The Lambda', { +const task = new sfn.Task(this, 'Invoke1', { task: new tasks.InvokeFunction(myLambda), inputPath: '$.input', timeout: Duration.minutes(5), @@ -164,6 +170,19 @@ task.addCatch(errorHandlerState); task.next(nextState); ``` +#### Lambda example - RunLambdaTask + +```ts + const task = new sfn.Task(stack, 'Invoke2', { + task: new tasks.RunLambdaTask(myLambda, { + integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, + payload: { + token: sfn.Context.taskToken + } + }) + }); +``` + #### SNS example ```ts @@ -176,6 +195,7 @@ const topic = new sns.Topic(this, 'Topic'); // Use a field from the execution data as message. const task1 = new sfn.Task(this, 'Publish1', { task: new tasks.PublishToTopic(topic, { + integrationPattern: sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, message: TaskInput.fromDataAt('$.state.message'), }) }); diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/state-machine.ts b/packages/@aws-cdk/aws-stepfunctions/lib/state-machine.ts index 019004bc36435..fd855043f2859 100644 --- a/packages/@aws-cdk/aws-stepfunctions/lib/state-machine.ts +++ b/packages/@aws-cdk/aws-stepfunctions/lib/state-machine.ts @@ -92,7 +92,7 @@ export class StateMachine extends StateMachineBase { }); this.role = props.role || new iam.Role(this, 'Role', { - assumedBy: new iam.ServicePrincipal(`states.${Stack.of(this).region}.amazonaws.com`), + assumedBy: new iam.ServicePrincipal('states.amazonaws.com'), }); const graph = new StateGraph(props.definition.startState, `State Machine ${id} definition`); diff --git a/packages/@aws-cdk/aws-stepfunctions/lib/step-functions-task.ts b/packages/@aws-cdk/aws-stepfunctions/lib/step-functions-task.ts index 3b2a72f672377..7637e8c54952b 100644 --- a/packages/@aws-cdk/aws-stepfunctions/lib/step-functions-task.ts +++ b/packages/@aws-cdk/aws-stepfunctions/lib/step-functions-task.ts @@ -78,3 +78,28 @@ export interface StepFunctionsTaskConfig { */ readonly metricDimensions?: cloudwatch.DimensionHash; } + +/** + * Three ways to call an integrated service: Request Response, Run a Job and Wait for a Callback with Task Token. + * @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html + * + * Here, they are named as FIRE_AND_FORGET, SYNC and WAIT_FOR_TASK_TOKEN respectly. + * + * @default FIRE_AND_FORGET + */ +export enum ServiceIntegrationPattern { + /** + * Call a service and progress to the next state immediately after the API call completes + */ + FIRE_AND_FORGET = 'FIRE_AND_FORGET', + + /** + * Call a service and wait for a job to complete. + */ + SYNC = 'SYNC', + + /** + * Call a service with a task token and wait until that token is returned by SendTaskSuccess/SendTaskFailure with paylaod + */ + WAIT_FOR_TASK_TOKEN = 'WAIT_FOR_TASK_TOKEN' +} diff --git a/packages/@aws-cdk/aws-stepfunctions/package.json b/packages/@aws-cdk/aws-stepfunctions/package.json index 4b05e33245462..5a18672a36e96 100644 --- a/packages/@aws-cdk/aws-stepfunctions/package.json +++ b/packages/@aws-cdk/aws-stepfunctions/package.json @@ -89,7 +89,8 @@ "import-props-interface:@aws-cdk/aws-stepfunctions.ActivityImportProps", "export:@aws-cdk/aws-stepfunctions.IActivity", "duration-prop-type:@aws-cdk/aws-stepfunctions.WaitProps.duration", - "duration-prop-type:@aws-cdk/aws-stepfunctions.Errors.TIMEOUT" + "duration-prop-type:@aws-cdk/aws-stepfunctions.Errors.TIMEOUT", + "no-unused-type:@aws-cdk/aws-stepfunctions.ServiceIntegrationPattern" ] }, "stability": "experimental"