Skip to content

Commit

Permalink
WIP Parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Oct 22, 2024
1 parent 6c070d2 commit 9b2258b
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 2 deletions.
32 changes: 32 additions & 0 deletions examples/parallel.asl
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"Comment": "Parallel Example.",
"StartAt": "FunWithMath",
"States": {
"FunWithMath": {
"Type": "Parallel",
"End": true,
"Branches": [
{
"StartAt": "Add",
"States": {
"Add": {
"Type": "Task",
"Resource": "docker://docker.io/agrare/sleep:latest",
"End": true
}
}
},
{
"StartAt": "Subtract",
"States": {
"Subtract": {
"Type": "Task",
"Resource": "docker://docker.io/agrare/sleep:latest",
"End": true
}
}
}
]
}
}
}
1 change: 1 addition & 0 deletions lib/floe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
require_relative "floe/workflow"
# mixins used by workflow components
require_relative "floe/workflow/error_matcher_mixin"
require_relative "floe/workflow/branch"
require_relative "floe/workflow/catcher"
require_relative "floe/workflow/choice_rule"
require_relative "floe/workflow/choice_rule/not"
Expand Down
8 changes: 8 additions & 0 deletions lib/floe/workflow/branch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

module Floe
class Workflow
class Branch < Floe::WorkflowBase
end
end
end
107 changes: 105 additions & 2 deletions lib/floe/workflow/states/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,112 @@ module Floe
class Workflow
module States
class Parallel < Floe::Workflow::State
def initialize(*)
include InputOutputMixin
include NonTerminalMixin
include RetryCatchMixin

attr_reader :end, :next, :parameters, :input_path, :output_path, :result_path,
:result_selector, :retry, :catch, :branches

def initialize(workflow, name, payload)
super
raise NotImplementedError

missing_field_error!("Branches") if payload["Branches"].nil?

@next = payload["Next"]
@end = !!payload["End"]
@parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"]
@input_path = Path.new(payload.fetch("InputPath", "$"))
@output_path = Path.new(payload.fetch("OutputPath", "$"))
@result_path = ReferencePath.new(payload.fetch("ResultPath", "$"))
@result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"]
@retry = payload["Retry"].to_a.map { |retrier| Retrier.new(retrier) }
@catch = payload["Catch"].to_a.map { |catcher| Catcher.new(catcher) }
@branches = payload["Branches"].map { |branch| Branch.new(branch) }

validate_state!(workflow)
end

def start(context)
super

input = process_input(context)

context.state["BranchContext"] = branches.map { |_branch| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => input.to_json).to_h }
end

def finish(context)
if success?(context)
result = each_branch_context(context).map(&:output)
context.output = process_output(context, result)
else
error = parse_error(context)
retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error)
end

super
end

def run_nonblock!(context)
start(context) unless context.state_started?

step_nonblock!(context) while running?(context)
return Errno::EAGAIN unless ready?(context)

finish(context) if ended?(context)
end

def end?
@end
end

def ready?(context)
!context.state_started? || each_branch(context).any? { |branch, ctx| branch.step_nonblock_ready?(ctx) }
end

def wait_until(context)
each_branch(context).any? { |branch, ctx| branch.wait_until(ctx) }.min
end

def waiting?(context)
each_item_processor(context).any? { |ctx| item_processor.waiting?(ctx) }
end

def running?(context)
!ended?(context)
end

def ended?(context)
each_branch_context(context).all?(&:ended?)
end

def success?(context)
each_branch_context(context).none?(&:failed?)
end

private

def step_nonblock!(context)
each_branch(context).each do |branch, ctx|
branch.run_nonblock(ctx) if branch.step_nonblock_ready?(ctx)
end
end

def each_branch(context)
branches.filter_map.with_index do |branch, i|
ctx = context.state.dig("BranchContext", i)
next if ctx.nil?

[branch, Context.new(ctx)]
end
end

def each_branch_context(context)
context.state["BranchContext"].map { |ctx| Context.new(ctx) }
end

def validate_state!(workflow)
validate_state_next!(workflow)
end
end
end
Expand Down
54 changes: 54 additions & 0 deletions spec/workflow/states/parallel_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
RSpec.describe Floe::Workflow::States::Parallel do
let(:input) { {} }
let(:ctx) { Floe::Workflow::Context.new(:input => input.to_json) }
let(:state) { workflow.start_workflow.current_state }
let(:workflow) do
payload = {
"FunWithMath" => {
"Type" => "Parallel",
"Branches" => [
{
"StartAt" => "Add",
"States" => {
"Add" => {
"Type" => "Pass",
"End" => true
}
}
},
{
"StartAt" => "Subtract",
"States" => {
"Subtract" => {
"Type" => "Pass",
"End" => true
}
}
}
],
"Next" => "NextState"
},
"NextState" => {"Type" => "Succeed"}
}

make_workflow(ctx, payload)
end

describe "#initialize" do
it "builds the Parallel state object" do
expect { workflow }.not_to raise_error
end

it "raises an InvalidWorkflowError with a missing Branches parameter" do
payload = {
"FunWithMath" => {
"Type" => "Parallel",
"End" => true
}
}

expect { make_workflow(ctx, payload) }
.to raise_error(Floe::InvalidWorkflowError, "States.FunWithMath does not have required field \"Branches\"")
end
end
end

0 comments on commit 9b2258b

Please sign in to comment.