Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Workflow output should be JSON #228

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/floe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
module Floe
class Error < StandardError; end
class InvalidWorkflowError < Error; end
class InvalidExecutionInput < Error; end

def self.logger
@logger ||= NullLogger.new
Expand Down
4 changes: 3 additions & 1 deletion lib/floe/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ def run(args = ARGV)
# Display status
workflows.each do |workflow|
puts "", "#{workflow.name}#{" (#{workflow.status})" unless workflow.context.success?}", "===" if workflows.size > 1
puts workflow.output.inspect
puts workflow.output
end

workflows.all? { |workflow| workflow.context.success? }
rescue Floe::Error => err
abort(err.message)
end

private
Expand Down
4 changes: 2 additions & 2 deletions lib/floe/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def initialize(payload, context = nil, credentials = nil, name = nil)

@states = payload["States"].to_a.map { |state_name, state| State.build!(self, state_name, state) }
@states_by_name = @states.each_with_object({}) { |state, result| result[state.name] = state }
rescue Floe::InvalidWorkflowError
rescue Floe::Error
raise
rescue => err
raise Floe::InvalidWorkflowError, err.message
Expand Down Expand Up @@ -157,7 +157,7 @@ def status
end

def output
context.output if end?
context.output.to_json if end?
end

def end?
Expand Down
6 changes: 2 additions & 4 deletions lib/floe/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ class Context
# @param input [Hash] (default: {})
def initialize(context = nil, input: nil, credentials: {})
context = JSON.parse(context) if context.kind_of?(String)

input ||= {}
input = JSON.parse(input) if input.kind_of?(String)
input = JSON.parse(input || "{}")

@context = context || {}
self["Execution"] ||= {}
Expand All @@ -23,7 +21,7 @@ def initialize(context = nil, input: nil, credentials: {})

@credentials = credentials || {}
rescue JSON::ParserError => err
raise Floe::InvalidWorkflowError, err.message
raise Floe::InvalidExecutionInput, "Invalid State Machine Execution Input: #{err}: was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')"
end

def execution
Expand Down
14 changes: 7 additions & 7 deletions spec/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("checking 1 workflows...")
expect(lines.last).to eq('{"foo"=>1}')
expect(lines.last).to eq('{"foo":1}')
end

it "with a bare workflow and --input" do
Expand All @@ -48,7 +48,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("checking 1 workflows...")
expect(lines.last).to eq('{"foo"=>1}')
expect(lines.last).to eq('{"foo":1}')
end

it "with --workflow and no input" do
Expand All @@ -66,7 +66,7 @@

lines = output.lines(:chomp => true)
expect(lines.first).to include("checking 1 workflows...")
expect(lines.last).to eq('{"foo"=>1}')
expect(lines.last).to eq('{"foo":1}')
end

it "with a bare workflow and --workflow" do
Expand Down Expand Up @@ -94,11 +94,11 @@
expect(lines.last(7).join("\n")).to eq(<<~OUTPUT.chomp)
workflow
===
{"foo"=>1}
{"foo":1}

workflow
===
{"foo"=>2}
{"foo":2}
OUTPUT
end

Expand All @@ -111,11 +111,11 @@
expect(lines.last(7).join("\n")).to eq(<<~OUTPUT.chomp)
workflow
===
{"foo"=>1}
{"foo":1}

workflow
===
{"foo"=>1}
{"foo":1}
OUTPUT
end

Expand Down
6 changes: 3 additions & 3 deletions spec/workflow/context_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
RSpec.describe Floe::Workflow::Context do
let(:ctx) { described_class.new(:input => input) }
let(:ctx) { described_class.new(:input => input.to_json) }
let(:input) { {"x" => "y"}.freeze }

describe "#new" do
Expand All @@ -9,7 +9,7 @@
end

it "with a context, sets input and keeps context" do
ctx = described_class.new({"Execution" => {"api" => "http://localhost/"}}, :input => input)
ctx = described_class.new({"Execution" => {"api" => "http://localhost/"}}, :input => input.to_json)
expect(ctx.execution["api"]).to eq("http://localhost/")
expect(ctx.state).not_to eq(nil)
end
Expand All @@ -19,7 +19,7 @@
end

context "with a simple string input" do
let(:input) { "\"foo\"" }
let(:input) { "foo" }

it "sets the input" do
expect(ctx.execution["Input"]).to eq("foo")
Expand Down
2 changes: 1 addition & 1 deletion spec/workflow/retrier_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
RSpec.describe Floe::Workflow::Retrier do
let(:input) { {} }
let(:ctx) { Floe::Workflow::Context.new(:input => input) }
let(:ctx) { Floe::Workflow::Context.new(:input => input.to_json) }
let(:resource) { "docker://hello-world:latest" }
let(:workflow) do
make_workflow(
Expand Down
2 changes: 1 addition & 1 deletion spec/workflow/state_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
RSpec.describe Floe::Workflow::State do
let(:input) { {} }
let(:ctx) { Floe::Workflow::Context.new(:input => input) }
let(:ctx) { Floe::Workflow::Context.new(:input => input.to_json) }
let(:state) { workflow.start_workflow.current_state }
# picked a state that doesn't instantly finish
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Seconds" => 1, "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) }
Expand Down
2 changes: 1 addition & 1 deletion spec/workflow/states/choice_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
RSpec.describe Floe::Workflow::States::Choice do
let(:input) { {} }
let(:ctx) { Floe::Workflow::Context.new(:input => input) }
let(:ctx) { Floe::Workflow::Context.new(:input => input.to_json) }
let(:state) { workflow.start_workflow.current_state }
let(:workflow) do
make_workflow(
Expand Down
2 changes: 1 addition & 1 deletion spec/workflow/states/fail_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
RSpec.describe Floe::Workflow::States::Fail do
let(:input) { {} }
let(:ctx) { Floe::Workflow::Context.new(:input => input) }
let(:ctx) { Floe::Workflow::Context.new(:input => input.to_json) }
let(:state) { workflow.start_workflow.current_state }
let(:workflow) do
make_workflow(
Expand Down
2 changes: 1 addition & 1 deletion spec/workflow/states/pass_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
RSpec.describe Floe::Workflow::States::Pass do
let(:input) { {} }
let(:ctx) { Floe::Workflow::Context.new(:input => input) }
let(:ctx) { Floe::Workflow::Context.new(:input => input.to_json) }
let(:state) { workflow.start_workflow.current_state }
let(:workflow) { make_workflow(ctx, payload) }
let(:payload) do
Expand Down
2 changes: 1 addition & 1 deletion spec/workflow/states/succeed_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
RSpec.describe Floe::Workflow::States::Succeed do
let(:input) { {} }
let(:ctx) { Floe::Workflow::Context.new(:input => input) }
let(:ctx) { Floe::Workflow::Context.new(:input => input.to_json) }
let(:state) { workflow.start_workflow.current_state }
let(:payload) { {"SuccessState" => {"Type" => "Succeed"}} }
let(:workflow) { make_workflow(ctx, payload) }
Expand Down
2 changes: 1 addition & 1 deletion spec/workflow/states/task_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
RSpec.describe Floe::Workflow::States::Task do
let(:input) { {"foo" => {"bar" => "baz"}, "bar" => {"baz" => "foo"}} }
let(:ctx) { Floe::Workflow::Context.new(:input => input) }
let(:ctx) { Floe::Workflow::Context.new(:input => input.to_json) }
let(:resource) { "docker://hello-world:latest" }

describe "#run_async!" do
Expand Down
2 changes: 1 addition & 1 deletion spec/workflow/states/wait_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
RSpec.describe Floe::Workflow::States::Pass do
let(:input) { {} }
let(:ctx) { Floe::Workflow::Context.new(:input => input) }
let(:ctx) { Floe::Workflow::Context.new(:input => input.to_json) }
let(:state) { workflow.start_workflow.current_state }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Seconds" => 1, "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) }

Expand Down
24 changes: 12 additions & 12 deletions spec/workflow_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
RSpec.describe Floe::Workflow do
let(:now) { Time.now.utc }
let(:input) { {"input" => "value"}.freeze }
let(:ctx) { Floe::Workflow::Context.new(:input => input) }
let(:ctx) { Floe::Workflow::Context.new(:input => input.to_json) }

describe "#new" do
it "sets initial state" do
Expand Down Expand Up @@ -57,7 +57,7 @@
it "raises an exception for invalid context" do
payload = make_payload({"FirstState" => {"Type" => "Success"}})

expect { described_class.new(payload, "invalid context") }.to raise_error(Floe::InvalidWorkflowError, "unexpected token at 'invalid context'")
expect { described_class.new(payload, "invalid context") }.to raise_error(Floe::InvalidExecutionInput, "Invalid State Machine Execution Input: unexpected token at 'invalid context': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')")
end

it "raises an exception for invalid resource scheme in a Task state" do
Expand Down Expand Up @@ -88,7 +88,7 @@
expect(ctx.ended?).to eq(true)

# final results
expect(workflow.output).to eq(input)
expect(workflow.output).to eq(input.to_json)
expect(workflow.status).to eq("success")
expect(workflow.end?).to eq(true)
end
Expand All @@ -113,7 +113,7 @@
expect(ctx.ended?).to eq(true)

# final results
expect(workflow.output).to eq({"Cause" => "Bad Stuff", "Error" => "Issue"})
expect(workflow.output).to eq({"Error" => "Issue", "Cause" => "Bad Stuff"}.to_json)
expect(workflow.status).to eq("failure")
expect(workflow.end?).to eq(true)
end
Expand Down Expand Up @@ -148,7 +148,7 @@
workflow.start_workflow
workflow.step_nonblock

expect(workflow.output).to eq(input)
expect(workflow.output).to eq(input.to_json)
expect(workflow.status).to eq("success")
expect(workflow.end?).to eq(true)
expect(ctx.output).to eq(input)
Expand Down Expand Up @@ -187,7 +187,7 @@
# step_nonblock should return 0 and mark the workflow as completed
expect(workflow.step_nonblock).to eq(0)

expect(workflow.output).to eq(input)
expect(workflow.output).to eq(input.to_json)
expect(workflow.status).to eq("success")
expect(workflow.end?).to eq(true)
expect(ctx.output).to eq(input)
Expand Down Expand Up @@ -243,7 +243,7 @@
expect(ctx.running?).to eq(false)
expect(ctx.ended?).to eq(true)

expect(workflow.output).to eq(input)
expect(workflow.output).to eq(input.to_json)
end
end

Expand Down Expand Up @@ -334,25 +334,25 @@

describe ".wait" do
context "with two ready workflows" do
let(:workflow_1) { make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Succeed"}}) }
let(:workflow_2) { make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Succeed"}}) }
let(:workflow_1) { make_workflow(ctx, {"FirstState" => {"Type" => "Succeed"}}) }
let(:workflow_2) { make_workflow(ctx, {"FirstState" => {"Type" => "Succeed"}}) }

it "returns both workflows as ready to step" do
expect(described_class.wait([workflow_1, workflow_2], :timeout => 0)).to include(workflow_1, workflow_2)
end
end

context "with one ready workflow and one that would block" do
let(:workflow_1) { make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Succeed"}}) }
let(:workflow_2) { make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Wait", "Seconds" => 10, "End" => true}}).start_workflow.tap(&:step_nonblock) }
let(:workflow_1) { make_workflow(ctx, {"FirstState" => {"Type" => "Succeed"}}) }
let(:workflow_2) { make_workflow(ctx, {"FirstState" => {"Type" => "Wait", "Seconds" => 10, "End" => true}}).start_workflow.tap(&:step_nonblock) }

it "returns only the first workflow as ready to step" do
expect(described_class.wait([workflow_1, workflow_2], :timeout => 0)).to eq([workflow_1])
end
end

context "with a workflow that would block for 10 seconds" do
let(:workflow) { make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Wait", "Seconds" => 10, "End" => true}}).start_workflow.tap(&:step_nonblock) }
let(:workflow) { make_workflow(ctx, {"FirstState" => {"Type" => "Wait", "Seconds" => 10, "End" => true}}).start_workflow.tap(&:step_nonblock) }

it "returns no ready workflows with :timeout => 0" do
expect(described_class.wait(workflow, :timeout => 0)).to be_empty
Expand Down