From 9b2258bfb87a948ad3efc0950fb5089338f1f66e Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Tue, 8 Oct 2024 15:07:11 -0400 Subject: [PATCH] WIP Parallel --- examples/parallel.asl | 32 ++++++++ lib/floe.rb | 1 + lib/floe/workflow/branch.rb | 8 ++ lib/floe/workflow/states/parallel.rb | 107 +++++++++++++++++++++++++- spec/workflow/states/parallel_spec.rb | 54 +++++++++++++ 5 files changed, 200 insertions(+), 2 deletions(-) create mode 100644 examples/parallel.asl create mode 100644 lib/floe/workflow/branch.rb create mode 100644 spec/workflow/states/parallel_spec.rb diff --git a/examples/parallel.asl b/examples/parallel.asl new file mode 100644 index 00000000..f04444d9 --- /dev/null +++ b/examples/parallel.asl @@ -0,0 +1,32 @@ +{ + "Comment": "Parallel Example.", + "StartAt": "FunWithMath", + "States": { + "FunWithMath": { + "Type": "Parallel", + "End": true, + "Branches": [ + { + "StartAt": "Add", + "States": { + "Add": { + "Type": "Task", + "Resource": "docker://docker.io/agrare/sleep:latest", + "End": true + } + } + }, + { + "StartAt": "Subtract", + "States": { + "Subtract": { + "Type": "Task", + "Resource": "docker://docker.io/agrare/sleep:latest", + "End": true + } + } + } + ] + } + } +} diff --git a/lib/floe.rb b/lib/floe.rb index 007ddb83..5aa1d789 100644 --- a/lib/floe.rb +++ b/lib/floe.rb @@ -12,6 +12,7 @@ require_relative "floe/workflow" # mixins used by workflow components require_relative "floe/workflow/error_matcher_mixin" +require_relative "floe/workflow/branch" require_relative "floe/workflow/catcher" require_relative "floe/workflow/choice_rule" require_relative "floe/workflow/choice_rule/not" diff --git a/lib/floe/workflow/branch.rb b/lib/floe/workflow/branch.rb new file mode 100644 index 00000000..e9b66d2e --- /dev/null +++ b/lib/floe/workflow/branch.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +module Floe + class Workflow + class Branch < Floe::WorkflowBase + end + end +end diff --git a/lib/floe/workflow/states/parallel.rb b/lib/floe/workflow/states/parallel.rb index 997f4e78..98ff65bb 100644 --- a/lib/floe/workflow/states/parallel.rb +++ b/lib/floe/workflow/states/parallel.rb @@ -4,9 +4,112 @@ module Floe class Workflow module States class Parallel < Floe::Workflow::State - def initialize(*) + include InputOutputMixin + include NonTerminalMixin + include RetryCatchMixin + + attr_reader :end, :next, :parameters, :input_path, :output_path, :result_path, + :result_selector, :retry, :catch, :branches + + def initialize(workflow, name, payload) super - raise NotImplementedError + + missing_field_error!("Branches") if payload["Branches"].nil? + + @next = payload["Next"] + @end = !!payload["End"] + @parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"] + @input_path = Path.new(payload.fetch("InputPath", "$")) + @output_path = Path.new(payload.fetch("OutputPath", "$")) + @result_path = ReferencePath.new(payload.fetch("ResultPath", "$")) + @result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"] + @retry = payload["Retry"].to_a.map { |retrier| Retrier.new(retrier) } + @catch = payload["Catch"].to_a.map { |catcher| Catcher.new(catcher) } + @branches = payload["Branches"].map { |branch| Branch.new(branch) } + + validate_state!(workflow) + end + + def start(context) + super + + input = process_input(context) + + context.state["BranchContext"] = branches.map { |_branch| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => input.to_json).to_h } + end + + def finish(context) + if success?(context) + result = each_branch_context(context).map(&:output) + context.output = process_output(context, result) + else + error = parse_error(context) + retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error) + end + + super + end + + def run_nonblock!(context) + start(context) unless context.state_started? + + step_nonblock!(context) while running?(context) + return Errno::EAGAIN unless ready?(context) + + finish(context) if ended?(context) + end + + def end? + @end + end + + def ready?(context) + !context.state_started? || each_branch(context).any? { |branch, ctx| branch.step_nonblock_ready?(ctx) } + end + + def wait_until(context) + each_branch(context).any? { |branch, ctx| branch.wait_until(ctx) }.min + end + + def waiting?(context) + each_item_processor(context).any? { |ctx| item_processor.waiting?(ctx) } + end + + def running?(context) + !ended?(context) + end + + def ended?(context) + each_branch_context(context).all?(&:ended?) + end + + def success?(context) + each_branch_context(context).none?(&:failed?) + end + + private + + def step_nonblock!(context) + each_branch(context).each do |branch, ctx| + branch.run_nonblock(ctx) if branch.step_nonblock_ready?(ctx) + end + end + + def each_branch(context) + branches.filter_map.with_index do |branch, i| + ctx = context.state.dig("BranchContext", i) + next if ctx.nil? + + [branch, Context.new(ctx)] + end + end + + def each_branch_context(context) + context.state["BranchContext"].map { |ctx| Context.new(ctx) } + end + + def validate_state!(workflow) + validate_state_next!(workflow) end end end diff --git a/spec/workflow/states/parallel_spec.rb b/spec/workflow/states/parallel_spec.rb new file mode 100644 index 00000000..8f55e168 --- /dev/null +++ b/spec/workflow/states/parallel_spec.rb @@ -0,0 +1,54 @@ +RSpec.describe Floe::Workflow::States::Parallel do + let(:input) { {} } + let(:ctx) { Floe::Workflow::Context.new(:input => input.to_json) } + let(:state) { workflow.start_workflow.current_state } + let(:workflow) do + payload = { + "FunWithMath" => { + "Type" => "Parallel", + "Branches" => [ + { + "StartAt" => "Add", + "States" => { + "Add" => { + "Type" => "Pass", + "End" => true + } + } + }, + { + "StartAt" => "Subtract", + "States" => { + "Subtract" => { + "Type" => "Pass", + "End" => true + } + } + } + ], + "Next" => "NextState" + }, + "NextState" => {"Type" => "Succeed"} + } + + make_workflow(ctx, payload) + end + + describe "#initialize" do + it "builds the Parallel state object" do + expect { workflow }.not_to raise_error + end + + it "raises an InvalidWorkflowError with a missing Branches parameter" do + payload = { + "FunWithMath" => { + "Type" => "Parallel", + "End" => true + } + } + + expect { make_workflow(ctx, payload) } + .to raise_error(Floe::InvalidWorkflowError, "States.FunWithMath does not have required field \"Branches\"") + end + end +end