Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stepfunctions): improve Task payload encoding #2706

Merged
merged 2 commits into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,8 @@ import sfn = require('@aws-cdk/aws-stepfunctions');
export interface PublishToTopicProps {
/**
* The text message to send to the topic.
*
* @default - Exactly one of `message` and `messageObject` is required.
*/
readonly message?: string;

/**
* Object to be JSON-encoded and used as message
*
* @default - Exactly one of `message` and `messageObject` is required.
*/
readonly messageObject?: {[key: string]: any};
readonly message: sfn.TaskInput;

/**
* If true, send a different message to every subscription type
Expand Down Expand Up @@ -46,9 +37,6 @@ export interface PublishToTopicProps {
*/
export class PublishToTopic implements sfn.IStepFunctionsTask {
constructor(private readonly topic: sns.ITopic, private readonly props: PublishToTopicProps) {
if ((props.message === undefined) === (props.messageObject === undefined)) {
throw new Error(`Supply exactly one of 'message' or 'messageObject'`);
}
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskProperties {
Expand All @@ -61,7 +49,7 @@ export class PublishToTopic implements sfn.IStepFunctionsTask {
parameters: {
TopicArn: this.topic.topicArn,
...sfn.FieldUtils.renderObject({
Message: this.props.message || this.props.messageObject,
Message: this.props.message.value,
MessageStructure: this.props.messagePerSubscriptionType ? "json" : undefined,
Subject: this.props.subject,
})
Expand Down
18 changes: 3 additions & 15 deletions packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,9 @@ import sfn = require('@aws-cdk/aws-stepfunctions');
*/
export interface SendToQueueProps {
/**
* The text message to send to the topic.
*
* @default - Exactly one of `message` and `messageObject` is required.
*/
readonly message?: string;

/**
* Object to be JSON-encoded and used as message
*
* @default - Exactly one of `message` and `messageObject` is required.
* The text message to send to the queue.
*/
readonly messageObject?: {[key: string]: any};
readonly messageBody: sfn.TaskInput;

/**
* The length of time, in seconds, for which to delay a specific message.
Expand Down Expand Up @@ -55,9 +46,6 @@ export interface SendToQueueProps {
*/
export class SendToQueue implements sfn.IStepFunctionsTask {
constructor(private readonly queue: sqs.IQueue, private readonly props: SendToQueueProps) {
if ((props.message === undefined) === (props.messageObject === undefined)) {
throw new Error(`Supply exactly one of 'message' or 'messageObject'`);
}
}

public bind(_task: sfn.Task): sfn.StepFunctionsTaskProperties {
Expand All @@ -70,7 +58,7 @@ export class SendToQueue implements sfn.IStepFunctionsTask {
parameters: {
QueueUrl: this.queue.queueUrl,
...sfn.FieldUtils.renderObject({
MessageBody: this.props.message || this.props.messageObject,
MessageBody: this.props.messageBody.value,
DelaySeconds: this.props.delaySeconds,
MessageDeduplicationId: this.props.messageDeduplicationId,
MessageGroupId: this.props.messageGroupId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ test('Running a Fargate Task', () => {
{
containerName: 'TheContainer',
environment: [
{name: 'SOME_KEY', value: sfn.DataField.fromStringAt('$.SomeKey')}
{name: 'SOME_KEY', value: sfn.Data.stringAt('$.SomeKey')}
]
}
]
Expand Down Expand Up @@ -161,7 +161,7 @@ test('Running an EC2 Task with bridge network', () => {
{
containerName: 'TheContainer',
environment: [
{name: 'SOME_KEY', value: sfn.DataField.fromStringAt('$.SomeKey')}
{name: 'SOME_KEY', value: sfn.Data.stringAt('$.SomeKey')}
]
}
]
Expand Down Expand Up @@ -295,9 +295,9 @@ test('Running an EC2 Task with overridden number values', () => {
containerOverrides: [
{
containerName: 'TheContainer',
command: sfn.DataField.fromListAt('$.TheCommand'),
command: sfn.Data.listAt('$.TheCommand'),
cpu: 5,
memoryLimit: sfn.DataField.fromNumberAt('$.MemoryLimit'),
memoryLimit: sfn.Data.numberAt('$.MemoryLimit'),
}
]
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const definition = new sfn.Pass(stack, 'Start', {
environment: [
{
name: 'SOME_KEY',
value: sfn.DataField.fromStringAt('$.SomeKey')
value: sfn.Data.stringAt('$.SomeKey')
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const definition = new sfn.Pass(stack, 'Start', {
environment: [
{
name: 'SOME_KEY',
value: sfn.DataField.fromStringAt('$.SomeKey')
value: sfn.Data.stringAt('$.SomeKey')
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ test('publish to SNS', () => {

// WHEN
const pub = new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, {
message: 'Send this message'
message: sfn.TaskInput.fromText('Send this message')
}) });

// THEN
Expand All @@ -32,9 +32,9 @@ test('publish JSON to SNS', () => {

// WHEN
const pub = new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, {
messageObject: {
message: sfn.TaskInput.fromObject({
Input: 'Send this message'
}
})
}) });

// THEN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ beforeEach(() => {
test('publish to queue', () => {
// WHEN
const pub = new sfn.Task(stack, 'Send', { task: new tasks.SendToQueue(queue, {
message: 'Send this message',
messageDeduplicationId: sfn.DataField.fromStringAt('$.deduping'),
messageBody: sfn.TaskInput.fromText('Send this message'),
messageDeduplicationId: sfn.Data.stringAt('$.deduping'),
}) });

// THEN
Expand All @@ -36,7 +36,7 @@ test('message body can come from state', () => {
// WHEN
const pub = new sfn.Task(stack, 'Send', {
task: new tasks.SendToQueue(queue, {
message: sfn.DataField.fromStringAt('$.theMessage')
messageBody: sfn.TaskInput.fromDataAt('$.theMessage')
})
});

Expand All @@ -56,10 +56,10 @@ test('message body can be an object', () => {
// WHEN
const pub = new sfn.Task(stack, 'Send', {
task: new tasks.SendToQueue(queue, {
messageObject: {
messageBody: sfn.TaskInput.fromObject({
literal: 'literal',
SomeInput: sfn.DataField.fromStringAt('$.theMessage')
}
SomeInput: sfn.Data.stringAt('$.theMessage')
})
})
});

Expand All @@ -82,9 +82,9 @@ test('message body object can contain references', () => {
// WHEN
const pub = new sfn.Task(stack, 'Send', {
task: new tasks.SendToQueue(queue, {
messageObject: {
messageBody: sfn.TaskInput.fromObject({
queueArn: queue.queueArn
}
})
})
});

Expand Down
46 changes: 37 additions & 9 deletions packages/@aws-cdk/aws-stepfunctions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ couple of the tasks available are:

Many tasks take parameters. The values for those can either be supplied
directly in the workflow definition (by specifying their values), or at
runtime by passing a value obtained from the static functions on `DataField`,
such as `DataField.fromStringAt()`.
runtime by passing a value obtained from the static functions on `Data`,
such as `Data.stringAt()`.

If so, the value is taken from the indicated location in the state JSON,
similar to (for example) `inputPath`.
Expand Down Expand Up @@ -155,9 +155,22 @@ import sns = require('@aws-cdk/aws-sns');
// ...

const topic = new sns.Topic(this, 'Topic');
const task = new sfn.Task(this, 'Publish', {

// Use a field from the execution data as message.
const task1 = new sfn.Task(this, 'Publish1', {
task: new tasks.PublishToTopic(topic, {
message: TaskInput.fromDataAt('$.state.message'),
})
});

// Combine a field from the execution data with
// a literal object.
const task2 = new sfn.Task(this, 'Publish2', {
task: new tasks.PublishToTopic(topic, {
message: DataField.fromStringAt('$.state.message'),
message: TaskInput.fromObject({
field1: 'somedata',
field2: Data.stringAt('$.field2'),
})
})
});
```
Expand All @@ -170,11 +183,26 @@ import sqs = require('@aws-cdk/aws-sqs');
// ...

const queue = new sns.Queue(this, 'Queue');
const task = new sfn.Task(this, 'Send', {

// Use a field from the execution data as message.
const task1 = new sfn.Task(this, 'Send1', {
task: new tasks.SendToQueue(queue, {
messageBody: TaskInput.fromDataAt('$.message'),
// Only for FIFO queues
messageGroupId: '1234'
})
});

// Combine a field from the execution data with
// a literal object.
const task2 = new sfn.Task(this, 'Send2', {
task: new tasks.SendToQueue(queue, {
messageBody: DataField.fromStringAt('$.message'),
messageBody: TaskInput.fromObject({
field1: 'somedata',
field2: Data.stringAt('$.field2'),
}),
// Only for FIFO queues
messageGroupId: DataField.fromStringAt('$.messageGroupId'),
messageGroupId: '1234'
})
});
```
Expand All @@ -195,7 +223,7 @@ const fargateTask = new ecs.RunEcsFargateTask({
environment: [
{
name: 'CONTAINER_INPUT',
value: DataField.fromStringAt('$.valueFromStateData')
value: Data.stringAt('$.valueFromStateData')
}
]
}
Expand Down Expand Up @@ -464,4 +492,4 @@ Contributions welcome:
- [ ] A single `LambdaTask` class that is both a `Lambda` and a `Task` in one
might make for a nice API.
- [ ] Expression parser for Conditions.
- [ ] Simulate state machines in unit tests.
- [ ] Simulate state machines in unit tests.
20 changes: 10 additions & 10 deletions packages/@aws-cdk/aws-stepfunctions/lib/fields.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,27 @@ import { findReferencedPaths, JsonPathToken, renderObject } from "./json-path";
/**
* Extract a field from the State Machine data that gets passed around between states
*/
export class DataField {
export class Data {
/**
* Instead of using a literal string, get the value from a JSON path
*/
public static fromStringAt(path: string): string {
public static stringAt(path: string): string {
validateDataPath(path);
return new JsonPathToken(path).toString();
}

/**
* Instead of using a literal string list, get the value from a JSON path
*/
public static fromListAt(path: string): string[] {
public static listAt(path: string): string[] {
validateDataPath(path);
return new JsonPathToken(path).toList();
}

/**
* Instead of using a literal number, get the value from a JSON path
*/
public static fromNumberAt(path: string): number {
public static numberAt(path: string): number {
validateDataPath(path);
return new JsonPathToken(path).toNumber();
}
Expand All @@ -47,19 +47,19 @@ export class DataField {
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#wait-token-contextobject
*/
export class ContextField {
export class Context {
/**
* Instead of using a literal string, get the value from a JSON path
*/
public static fromStringAt(path: string): string {
public static stringAt(path: string): string {
validateContextPath(path);
return new JsonPathToken(path).toString();
}

/**
* Instead of using a literal number, get the value from a JSON path
*/
public static fromNumberAt(path: string): number {
public static numberAt(path: string): number {
validateContextPath(path);
return new JsonPathToken(path).toNumber();
}
Expand Down Expand Up @@ -89,7 +89,7 @@ export class ContextField {
}
}

/**gg
/**
* Helper functions to work with structures containing fields
*/
export class FieldUtils {
rix0rrr marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -125,12 +125,12 @@ export class FieldUtils {

function validateDataPath(path: string) {
if (!path.startsWith('$.')) {
throw new Error("DataField JSON path values must start with '$.'");
throw new Error("Data JSON path values must start with '$.'");
}
}

function validateContextPath(path: string) {
if (!path.startsWith('$$.')) {
throw new Error("ContextField JSON path values must start with '$$.'");
throw new Error("Context JSON path values must start with '$$.'");
}
}
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-stepfunctions/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from './fields';
export * from './activity';
export * from './input';
export * from './types';
export * from './condition';
export * from './state-machine';
Expand Down
Loading