Skip to content

Commit

Permalink
State-Machine and State Input/Output all JSON
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Jun 21, 2024
1 parent 2d71d49 commit 79657b7
Show file tree
Hide file tree
Showing 15 changed files with 90 additions and 84 deletions.
4 changes: 2 additions & 2 deletions lib/floe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
require_relative "floe/workflow/reference_path"
require_relative "floe/workflow/retrier"
require_relative "floe/workflow/state"
require_relative "floe/workflow/states/input_output_mixin"
require_relative "floe/workflow/states/non_terminal_mixin"
require_relative "floe/workflow/states/choice"
require_relative "floe/workflow/states/fail"
require_relative "floe/workflow/states/input_output_mixin"
require_relative "floe/workflow/states/map"
require_relative "floe/workflow/states/non_terminal_mixin"
require_relative "floe/workflow/states/parallel"
require_relative "floe/workflow/states/pass"
require_relative "floe/workflow/states/succeed"
Expand Down
11 changes: 9 additions & 2 deletions lib/floe/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ class Context
# @param input [Hash] (default: {})
def initialize(context = nil, input: nil, credentials: {})
context = JSON.parse(context) if context.kind_of?(String)
input = JSON.parse(input || "{}")
input ||= "{}"

# Validate that input is valid json
JSON.parse(input)

@context = context || {}
self["Execution"] ||= {}
Expand Down Expand Up @@ -37,7 +40,7 @@ def running?
end

def failed?
(output.kind_of?(Hash) && output.key?("Error")) || false
(parsed_output.kind_of?(Hash) && parsed_output.key?("Error")) || false
end

def ended?
Expand All @@ -56,6 +59,10 @@ def output
state["Output"]
end

def parsed_output
output.kind_of?(String) ? JSON.parse(output) : output
end

def output=(val)
state["Output"] = val
end
Expand Down
5 changes: 3 additions & 2 deletions lib/floe/workflow/states/choice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module Floe
class Workflow
module States
class Choice < Floe::Workflow::State
include InputOutputMixin

attr_reader :choices, :default, :input_path, :output_path

def initialize(workflow, name, payload)
Expand All @@ -19,8 +21,7 @@ def initialize(workflow, name, payload)
end

def finish(context)
input = input_path.value(context, context.input)
output = output_path.value(context, input)
output = process_output(context, process_input(context))
next_state = choices.detect { |choice| choice.true?(context, output) }&.next || default

context.next_state = next_state
Expand Down
2 changes: 1 addition & 1 deletion lib/floe/workflow/states/fail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def finish(context)
context.output = {
"Error" => @error_path ? @error_path.value(context, context.input) : error,
"Cause" => @cause_path ? @cause_path.value(context, context.input) : cause
}.compact
}.compact.to_json
super
end

Expand Down
24 changes: 15 additions & 9 deletions lib/floe/workflow/states/input_output_mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,31 @@ class Workflow
module States
module InputOutputMixin
def process_input(context)
input = input_path.value(context, context.input)
input = parameters.value(context, input) if parameters
input = JSON.parse(context.input)
input = input_path.value(context, input)
input = parameters.value(context, input) if @parameters
input
end

def process_output(context, results)
return context.input.dup if results.nil?
return context.input if results.nil?
return if output_path.nil?

input = JSON.parse(context.input)
results = result_selector.value(context, results) if @result_selector
if result_path.payload.start_with?("$.Credentials")
credentials = result_path.set(context.credentials, results)["Credentials"]
context.credentials.merge!(credentials)
output = context.input.dup
if @result_path
if result_path.payload.start_with?("$.Credentials")
credentials = result_path.set(context.credentials, results)["Credentials"]
context.credentials.merge!(credentials)
output = input
else
output = result_path.set(input, results)
end
else
output = result_path.set(context.input.dup, results)
output = input
end

output_path.value(context, output)
output_path.value(context, output).to_json
end
end
end
Expand Down
5 changes: 3 additions & 2 deletions lib/floe/workflow/states/succeed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ module Floe
class Workflow
module States
class Succeed < Floe::Workflow::State
include InputOutputMixin

attr_reader :input_path, :output_path

def initialize(workflow, name, payload)
Expand All @@ -14,8 +16,7 @@ def initialize(workflow, name, payload)
end

def finish(context)
input = input_path.value(context, context.input)
context.output = output_path.value(context, input)
context.output = process_output(context, process_input(context))
context.next_state = nil

super
Expand Down
6 changes: 3 additions & 3 deletions lib/floe/workflow/states/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def retry_state!(context, error)

wait_until!(context, :seconds => retrier.sleep_duration(context["State"]["RetryCount"]))
context.next_state = context.state_name
context.output = error
context.output = error.to_json
logger.info("Running state: [#{long_name}] with input [#{context.input}] got error[#{context.output}]...Retry - delay: #{wait_until(context)}")
true
end
Expand All @@ -115,7 +115,7 @@ def catch_error!(context, error)
return if catcher.nil?

context.next_state = catcher.next
context.output = catcher.result_path.set(context.input, error)
context.output = catcher.result_path.set(JSON.parse(context.input), error).to_json
logger.info("Running state: [#{long_name}] with input [#{context.input}]...CatchError - next state: [#{context.next_state}] output: [#{context.output}]")

true
Expand All @@ -125,7 +125,7 @@ def fail_workflow!(context, error)
# next_state is nil, and will be set to nil again in super
# keeping in here for completeness
context.next_state = nil
context.output = error
context.output = error.to_json
logger.error("Running state: [#{long_name}] with input [#{context.input}]...Complete workflow - output: [#{context.output}]")
end

Expand Down
4 changes: 2 additions & 2 deletions lib/floe/workflow/states/wait.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Floe
class Workflow
module States
class Wait < Floe::Workflow::State
include InputOutputMixin
include NonTerminalMixin

attr_reader :end, :input_path, :next, :seconds, :seconds_path, :timestamp, :timestamp_path, :output_path
Expand Down Expand Up @@ -39,8 +40,7 @@ def start(context)
end

def finish(context)
input = input_path.value(context, context.input)
context.output = output_path.value(context, input)
context.output = process_output(context, process_input(context))
super
end

Expand Down
18 changes: 9 additions & 9 deletions spec/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("checking 1 workflows...")
expect(lines.last).to eq("{}")
expect(lines.last).to eq("\"{}\"")
end

it "with a bare workflow and input" do
Expand All @@ -39,7 +39,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("checking 1 workflows...")
expect(lines.last).to eq('{"foo"=>1}')
expect(lines.last).to eq('"{\"foo\":1}"')
end

it "with a bare workflow and --input" do
Expand All @@ -48,7 +48,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("checking 1 workflows...")
expect(lines.last).to eq('{"foo"=>1}')
expect(lines.last).to eq('"{\"foo\":1}"')
end

it "with --workflow and no input" do
Expand All @@ -57,7 +57,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("checking 1 workflows...")
expect(lines.last).to eq("{}")
expect(lines.last).to eq("\"{}\"")
end

it "with --workflow and --input" do
Expand All @@ -66,7 +66,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("checking 1 workflows...")
expect(lines.last).to eq('{"foo"=>1}')
expect(lines.last).to eq('"{\"foo\":1}"')
end

it "with a bare workflow and --workflow" do
Expand Down Expand Up @@ -94,11 +94,11 @@
expect(lines.last(7).join("\n")).to eq(<<~OUTPUT.chomp)
workflow
===
{"foo"=>1}
"{\\"foo\\":1}"
workflow
===
{"foo"=>2}
"{\\"foo\\":2}"
OUTPUT
end

Expand All @@ -111,11 +111,11 @@
expect(lines.last(7).join("\n")).to eq(<<~OUTPUT.chomp)
workflow
===
{"foo"=>1}
"{\\"foo\\":1}"
workflow
===
{"foo"=>1}
"{\\"foo\\":1}"
OUTPUT
end

Expand Down
4 changes: 2 additions & 2 deletions spec/workflow/context_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

describe "#new" do
it "with an empty context, sets input" do
expect(ctx.execution["Input"]).to eq(input)
expect(ctx.execution["Input"]).to eq("{\"x\":\"y\"}")
expect(ctx.state).not_to eq(nil)
end

Expand All @@ -22,7 +22,7 @@
let(:input) { "foo" }

it "sets the input" do
expect(ctx.execution["Input"]).to eq("foo")
expect(ctx.execution["Input"]).to eq("\"foo\"")
expect(ctx.state).not_to eq(nil)
end
end
Expand Down
4 changes: 2 additions & 2 deletions spec/workflow/states/fail_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
it "populates static values" do
state.run_nonblock!(ctx)
expect(ctx.next_state).to eq(nil)
expect(ctx.output).to eq("Error" => "FailStateError", "Cause" => "No Matches!")
expect(ctx.output).to eq({"Error" => "FailStateError", "Cause" => "No Matches!"}.to_json)
end

context "with dynamic error text" do
Expand All @@ -42,7 +42,7 @@
it "populates dynamic values" do
state.run_nonblock!(ctx)
expect(ctx.next_state).to eq(nil)
expect(ctx.output).to eq("Error" => "DynamicError", "Cause" => "DynamicCause")
expect(ctx.output).to eq({"Error" => "DynamicError", "Cause" => "DynamicCause"}.to_json)
end
end
end
Expand Down
18 changes: 10 additions & 8 deletions spec/workflow/states/pass_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
describe "#run_nonblock!" do
it "sets the result to the result path" do
state.run_nonblock!(ctx)
expect(ctx.output["result"]).to include({"foo" => "bar", "bar" => "baz"})
expect(ctx.output).to eq({"result" => {"foo" => "bar", "bar" => "baz"}}.to_json)
expect(ctx.next_state).to eq("SuccessState")
end

Expand Down Expand Up @@ -83,9 +83,11 @@
it "Uses raw input" do
workflow.run_nonblock
expect(ctx.output).to eq(
"title" => "Numbers to add",
"numbers" => {"val1" => 3, "val2" => 4},
"sum" => 7
{
"title" => "Numbers to add",
"numbers" => {"val1" => 3, "val2" => 4},
"sum" => 7
}.to_json
)
end
end
Expand All @@ -98,7 +100,7 @@

it "passes output through to input" do
workflow.run_nonblock
expect(ctx.output).to eq(input)
expect(ctx.output).to eq("{\"color\":\"red\"}")
end

context "with InputPath" do
Expand All @@ -108,7 +110,7 @@

it "Uses InputPath to select color" do
workflow.run_nonblock
expect(ctx.output).to eq("red")
expect(ctx.output).to eq("\"red\"")
end
end

Expand All @@ -118,7 +120,7 @@

it "Uses OutputPath to drop other keys" do
workflow.run_nonblock
expect(ctx.output).to eq("red")
expect(ctx.output).to eq("\"red\"")
end
end

Expand All @@ -128,7 +130,7 @@

it "Uses OutputPath to drop other keys" do
workflow.run_nonblock
expect(ctx.output).to eq("red")
expect(ctx.output).to eq("\"red\"")
end
end
end
Expand Down
6 changes: 3 additions & 3 deletions spec/workflow/states/succeed_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@

it "sets output to input" do
state.run_nonblock!(ctx)
expect(ctx.output).to eq(input)
expect(ctx.output).to eq("{\"color\":\"red\"}")
end

context "with InputPath" do
let(:payload) { {"SuccessState" => {"Type" => "Succeed", "InputPath" => "$.color"}} }

it "sets the output to the selected input path" do
state.run_nonblock!(ctx)
expect(ctx.output).to eq(input["color"])
expect(ctx.output).to eq("{\"color\":\"red\"}")
end
end

Expand All @@ -38,7 +38,7 @@

it "sets the output to the selected input path" do
state.run_nonblock!(ctx)
expect(ctx.output).to eq(input["color"])
expect(ctx.output).to eq("\"red\"")
end
end
end
Expand Down
Loading

0 comments on commit 79657b7

Please sign in to comment.