From 6318b74ca4551b98d331f4fe96df8543bc3acce1 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Mon, 20 May 2024 11:29:56 -0400 Subject: [PATCH 01/11] Implement Map state --- lib/floe.rb | 1 + lib/floe/workflow/item_processor.rb | 19 +++++ lib/floe/workflow/states/map.rb | 63 ++++++++++++++- spec/workflow/item_processor_spec.rb | 25 ++++++ spec/workflow/states/map_spec.rb | 114 +++++++++++++++++++++++++++ 5 files changed, 220 insertions(+), 2 deletions(-) create mode 100644 lib/floe/workflow/item_processor.rb create mode 100644 spec/workflow/item_processor_spec.rb create mode 100644 spec/workflow/states/map_spec.rb diff --git a/lib/floe.rb b/lib/floe.rb index 1bce78ea..c4d11f34 100644 --- a/lib/floe.rb +++ b/lib/floe.rb @@ -18,6 +18,7 @@ require_relative "floe/workflow/choice_rule/and" require_relative "floe/workflow/choice_rule/data" require_relative "floe/workflow/context" +require_relative "floe/workflow/item_processor" require_relative "floe/workflow/intrinsic_function" require_relative "floe/workflow/intrinsic_function/parser" require_relative "floe/workflow/intrinsic_function/transformer" diff --git a/lib/floe/workflow/item_processor.rb b/lib/floe/workflow/item_processor.rb new file mode 100644 index 00000000..5daba5da --- /dev/null +++ b/lib/floe/workflow/item_processor.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module Floe + class Workflow + class ItemProcessor < Floe::WorkflowBase + attr_reader :processor_config + + def initialize(payload, name = nil) + super + @processor_config = payload.fetch("ProcessorConfig", "INLINE") + end + + def value(_context, input = {}) + # TODO: Run the states to get the output + input + end + end + end +end diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index 15c1bdfa..90069b8d 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -1,12 +1,71 @@ # frozen_string_literal: true +require_relative "input_output_mixin" +require_relative "non_terminal_mixin" + module Floe class Workflow module States class Map < Floe::Workflow::State - def initialize(*) + include InputOutputMixin + include NonTerminalMixin + + attr_reader :end, :next, :parameters, :input_path, :output_path, :result_path, + :result_selector, :retry, :catch, :item_processor, :items_path, + :item_reader, :item_selector, :item_batcher, :result_writer, + :max_concurrency, :tolerated_failure_percentage, :tolerated_failure_count + + def initialize(workflow, name, payload) + super + + raise Floe::InvalidWorkflowError, "Missing \"InputProcessor\" field in state [#{name.last}]" if payload["ItemProcessor"].nil? + + @next = payload["Next"] + @end = !!payload["End"] + @parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"] + @input_path = Path.new(payload.fetch("InputPath", "$")) + @output_path = Path.new(payload.fetch("OutputPath", "$")) + @result_path = ReferencePath.new(payload.fetch("ResultPath", "$")) + @result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"] + @retry = payload["Retry"].to_a.map { |retrier| Retrier.new(retrier) } + @catch = payload["Catch"].to_a.map { |catcher| Catcher.new(catcher) } + @item_processor = ItemProcessor.new(payload["ItemProcessor"], name) + @items_path = ReferencePath.new(payload.fetch("ItemsPath", "$")) + @item_reader = payload["ItemReader"] + @item_selector = payload["ItemSelector"] + @item_batcher = payload["ItemBatcher"] + @result_writer = payload["ResultWriter"] + @max_concurrency = payload["MaxConcurrency"]&.to_i + @tolerated_failure_percentage = payload["ToleratedFailurePercentage"] + @tolerated_failure_count = payload["ToleratedFailureCount"] + + validate_state!(workflow) + end + + def process_input(context) + input = super + items_path.value(context, input) + end + + def finish(context) + input = process_input(context) + result = item_processor.value(context, input) + context.output = process_output(context, result) super - raise NotImplementedError + end + + def end? + @end + end + + def running?(_) + false + end + + private + + def validate_state!(workflow) + validate_state_next!(workflow) end end end diff --git a/spec/workflow/item_processor_spec.rb b/spec/workflow/item_processor_spec.rb new file mode 100644 index 00000000..0dd52dbc --- /dev/null +++ b/spec/workflow/item_processor_spec.rb @@ -0,0 +1,25 @@ +RSpec.describe Floe::Workflow::ItemProcessor do + it "raises an exception for missing States field" do + payload = {"StartAt" => "Missing"} + expect { described_class.new(payload, ["Map"]) } + .to raise_error(Floe::InvalidWorkflowError, "Map does not have required field \"States\"") + end + + it "raises an exception for missing StartAt field" do + payload = {"States" => {"First" => {"Type" => "Succeed"}}} + expect { described_class.new(payload, ["Map"]) } + .to raise_error(Floe::InvalidWorkflowError, "Map does not have required field \"StartAt\"") + end + + it "raises an exception if StartAt isn't in States" do + payload = {"StartAt" => "First", "States" => {"Second" => {"Type" => "Succeed"}}} + expect { described_class.new(payload, ["Map"]) } + .to raise_error(Floe::InvalidWorkflowError, "Map field \"StartAt\" value \"First\" is not found in \"States\"") + end + + it "raises an exception if a Next state isn't in States" do + payload = {"StartAt" => "First", "States" => {"First" => {"Type" => "Pass", "Next" => "Last"}}} + expect { described_class.new(payload, ["Map"]) } + .to raise_error(Floe::InvalidWorkflowError, "States.First field \"Next\" value \"Last\" is not found in \"States\"") + end +end diff --git a/spec/workflow/states/map_spec.rb b/spec/workflow/states/map_spec.rb new file mode 100644 index 00000000..be0dc7df --- /dev/null +++ b/spec/workflow/states/map_spec.rb @@ -0,0 +1,114 @@ +RSpec.describe Floe::Workflow::States::Map do + let(:input) { {} } + let(:ctx) { Floe::Workflow::Context.new(:input => input.to_json) } + let(:state) { workflow.start_workflow.current_state } + let(:input) do + { + "ship-date" => "2016-03-14T01:59:00Z", + "detail" => { + "delivery-partner" => "UQS", + "shipped" => [ + {"prod" => "R31", "dest-code" => 9511, "quantity" => 1344}, + {"prod" => "S39", "dest-code" => 9511, "quantity" => 40}, + {"prod" => "R31", "dest-code" => 9833, "quantity" => 12}, + {"prod" => "R40", "dest-code" => 9860, "quantity" => 887}, + {"prod" => "R40", "dest-code" => 9511, "quantity" => 1220} + ] + } + } + end + let(:workflow) do + payload = { + "Validate-All" => { + "Type" => "Map", + "InputPath" => "$.detail", + "ItemsPath" => "$.shipped", + "MaxConcurrency" => 0, + "ItemProcessor" => { + "StartAt" => "Validate", + "States" => { + "Validate" => { + "Type" => "Pass", + "OutputPath" => "$.Payload", + "End" => true + } + } + }, + "ResultPath" => "$.detail.result", + "End" => true, + } + } + make_workflow(ctx, payload) + end + + describe "#initialize" do + it "raises an InvalidWorkflowError with a missing ItemProcessor" do + payload = { + "Validate-All" => { + "Type" => "Map", + "End" => true + } + } + + expect { make_workflow(ctx, payload) } + .to raise_error(Floe::InvalidWorkflowError, "Missing \"InputProcessor\" field in state [Validate-All]") + end + + it "raises an InvalidWorkflowError with a missing Next and End" do + payload = { + "Validate-All" => { + "Type" => "Map", + "ItemProcessor" => { + "StartAt" => "Validate", + "States" => {"Validate" => {"Type" => "Succeed"}} + } + } + } + + expect { make_workflow(ctx, payload) } + .to raise_error(Floe::InvalidWorkflowError, "States.Validate-All does not have required field \"Next\"") + end + + it "raises an InvalidWorkflowError if a state in ItemProcessor attempts to transition to a state in the outer workflow" do + payload = { + "StartAt" => "MapState", + "States" => { + "MapState" => { + "Type" => "Map", + "Next" => "PassState", + "ItemProcessor" => { + "StartAt" => "Validate", + "States" => { + "Validate" => { + "Type" => "Pass", + "Next" => "PassState" + } + } + } + }, + "PassState" => { + "Type" => "Pass", + "Next" => "SucceedState" + }, + "SucceedState" => { + "Type" => "Succeed" + } + } + } + + expect { Floe::Workflow.new(payload, ctx) } + .to raise_error(Floe::InvalidWorkflowError, "States.Validate field \"Next\" value \"PassState\" is not found in \"States\"") + end + end + + it "#end?" do + expect(state.end?).to be true + end + + describe "#run_nonblock!" do + it "has no next" do + state.run_nonblock!(ctx) + expect(ctx.next_state).to eq(nil) + end + end +end From 44deddddf6db96f238f4a276a488d51a2affe1ee Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Thu, 13 Jun 2024 16:15:09 -0400 Subject: [PATCH 02/11] Run ItemProcessor workflow for Map state --- lib/floe/workflow.rb | 2 +- lib/floe/workflow/item_processor.rb | 5 --- lib/floe/workflow/states/map.rb | 6 ++- lib/floe/workflow_base.rb | 61 +++++++++++++++++++++++++++++ spec/workflow/states/map_spec.rb | 7 ++-- 5 files changed, 71 insertions(+), 10 deletions(-) diff --git a/lib/floe/workflow.rb b/lib/floe/workflow.rb index 70c78b82..59929bb9 100644 --- a/lib/floe/workflow.rb +++ b/lib/floe/workflow.rb @@ -174,7 +174,7 @@ def start_workflow # NOTE: Expecting the context to be initialized (via start_workflow) before this def current_state - @states_by_name[context.state_name] + states_by_name[context.state_name] end # backwards compatibility. Caller should access directly from context diff --git a/lib/floe/workflow/item_processor.rb b/lib/floe/workflow/item_processor.rb index 5daba5da..e20cee7d 100644 --- a/lib/floe/workflow/item_processor.rb +++ b/lib/floe/workflow/item_processor.rb @@ -9,11 +9,6 @@ def initialize(payload, name = nil) super @processor_config = payload.fetch("ProcessorConfig", "INLINE") end - - def value(_context, input = {}) - # TODO: Run the states to get the output - input - end end end end diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index 90069b8d..f7d05120 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -49,7 +49,11 @@ def process_input(context) def finish(context) input = process_input(context) - result = item_processor.value(context, input) + result = input.map do |item| + item_processor_context = Context.new((context.state["ItemProcessorContext"] = {}), :input => item.to_json) + item_processor.run(item_processor_context) + JSON.parse(item_processor_context.output) + end context.output = process_output(context, result) super end diff --git a/lib/floe/workflow_base.rb b/lib/floe/workflow_base.rb index 7d3371c9..73ce8ed2 100644 --- a/lib/floe/workflow_base.rb +++ b/lib/floe/workflow_base.rb @@ -20,8 +20,69 @@ def initialize(payload, name = nil) validate_workflow! end + def run(context) + run_nonblock(context) until context.ended? + end + + def run_nonblock(context) + start_workflow(context) + loop while step_nonblock(context) == 0 && !context.ended? + self + end + + def step_nonblock(context) + return Errno::EPERM if context.ended? + + result = current_state(context).run_nonblock!(context) + return result if result != 0 + + context.state_history << context.state + context.next_state ? step!(context) : end_workflow!(context) + + result + end + + def step_nonblock_ready?(context) + !context.started? || current_state(context).ready?(context) + end + + def start_workflow(context) + return if context.state_name + + context.state["Name"] = start_at + context.state["Input"] = context.execution["Input"].dup + + context.execution["StartTime"] = Time.now.utc.iso8601 + + self + end + + def current_state(context) + states_by_name[context.state_name] + end + private + def step!(context) + next_state = {"Name" => context.next_state} + + # if rerunning due to an error (and we are using Retry) + if context.state_name == context.next_state && context.failed? && context.state.key?("Retrier") + next_state.merge!(context.state.slice("RetryCount", "Input", "Retrier")) + else + next_state["Input"] = context.output + end + + context.state = next_state + end + + # Avoiding State#running? because that is potentially expensive. + # State#run_nonblock! already called running? via State#ready? and + # called State#finished -- which is what Context#state_finished? is detecting + def end_workflow!(context) + context.execution["EndTime"] = context.state["FinishedTime"] + end + def validate_workflow! missing_field_error!("States") if @states.empty? missing_field_error!("StartAt") if @start_at.nil? diff --git a/spec/workflow/states/map_spec.rb b/spec/workflow/states/map_spec.rb index be0dc7df..8eda168b 100644 --- a/spec/workflow/states/map_spec.rb +++ b/spec/workflow/states/map_spec.rb @@ -28,9 +28,9 @@ "StartAt" => "Validate", "States" => { "Validate" => { - "Type" => "Pass", - "OutputPath" => "$.Payload", - "End" => true + "Type" => "Pass", + "End" => true, + "InputPath" => "$.prod" } } }, @@ -109,6 +109,7 @@ it "has no next" do state.run_nonblock!(ctx) expect(ctx.next_state).to eq(nil) + expect(JSON.parse(ctx.output).dig("detail", "result")).to eq(%w[R31 S39 R31 R40 R40]) end end end From 0207fc1e0fa821873bfe024a983f63663fa9b394 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Tue, 18 Jun 2024 16:56:29 -0400 Subject: [PATCH 03/11] Allow async execution for Map state --- lib/floe/workflow/states/map.rb | 43 ++++++++++++++++++++++++++------ lib/floe/workflow_base.rb | 8 ++++++ spec/workflow/states/map_spec.rb | 37 +++++++++++++++++++++++++-- 3 files changed, 79 insertions(+), 9 deletions(-) diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index f7d05120..729282aa 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -47,13 +47,18 @@ def process_input(context) items_path.value(context, input) end + def start(context) + super + + context.state["Iteration"] = 0 + context.state["MaxIterations"] = process_input(context).count + context.state["Result"] = [] + context.state["ItemProcessorContext"] = {} + step(context) + end + def finish(context) - input = process_input(context) - result = input.map do |item| - item_processor_context = Context.new((context.state["ItemProcessorContext"] = {}), :input => item.to_json) - item_processor.run(item_processor_context) - JSON.parse(item_processor_context.output) - end + result = context.state["Result"] context.output = process_output(context, result) super end @@ -62,12 +67,36 @@ def end? @end end - def running?(_) + def ready?(context) + return true unless context.state_started? + return true unless running?(context) + + step(context) false end + def running?(context) + # TODO: this only works with MaxConcurrency=1 + context.state["Iteration"] < context.state["MaxIterations"] + end + private + def step(context) + input = process_input(context) + item = input[context.state["Iteration"]] + + item_processor_context = Context.new(context.state["ItemProcessorContext"], :input => item.to_json) + item_processor.run_nonblock(item_processor_context) if item_processor.step_nonblock_ready?(item_processor_context) + if item_processor_context.ended? + result = item_processor.output(item_processor_context) + + context.state["Result"] << JSON.parse(result) + context.state["Iteration"] += 1 + context.state["ItemProcessorContext"] = {} + end + end + def validate_state!(workflow) validate_state_next!(workflow) end diff --git a/lib/floe/workflow_base.rb b/lib/floe/workflow_base.rb index 73ce8ed2..55c40677 100644 --- a/lib/floe/workflow_base.rb +++ b/lib/floe/workflow_base.rb @@ -61,6 +61,14 @@ def current_state(context) states_by_name[context.state_name] end + def end?(context) + context.ended? + end + + def output(context) + context.output.to_json if end?(context) + end + private def step!(context) diff --git a/spec/workflow/states/map_spec.rb b/spec/workflow/states/map_spec.rb index 8eda168b..129b0443 100644 --- a/spec/workflow/states/map_spec.rb +++ b/spec/workflow/states/map_spec.rb @@ -107,9 +107,42 @@ describe "#run_nonblock!" do it "has no next" do - state.run_nonblock!(ctx) + loop while state.run_nonblock!(ctx) != 0 expect(ctx.next_state).to eq(nil) - expect(JSON.parse(ctx.output).dig("detail", "result")).to eq(%w[R31 S39 R31 R40 R40]) + end + + it "sets the context output" do + loop while state.run_nonblock!(ctx) != 0 + expect(ctx.output.dig("detail", "result")).to eq(%w[R31 S39 R31 R40 R40]) + end + + context "with simple string inputs" do + let(:input) { {"foo" => "bar", "colors" => ["red", "green", "blue"]} } + let(:workflow) do + payload = { + "Validate-All" => { + "Type" => "Map", + "ItemsPath" => "$.colors", + "MaxConcurrency" => 0, + "ItemProcessor" => { + "StartAt" => "Validate", + "States" => { + "Validate" => { + "Type" => "Pass", + "End" => true + } + } + }, + "End" => true, + } + } + make_workflow(ctx, payload) + end + + it "sets the context output" do + loop while state.run_nonblock!(ctx) != 0 + expect(ctx.output).to eq(["red", "green", "blue"]) + end end end end From 79c2759dbfeebcf8bd92aa5ea909b3a53b1b7272 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Tue, 2 Jul 2024 13:24:20 -0400 Subject: [PATCH 04/11] Add an example map.asl --- examples/map.asl | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 examples/map.asl diff --git a/examples/map.asl b/examples/map.asl new file mode 100644 index 00000000..efa77ad6 --- /dev/null +++ b/examples/map.asl @@ -0,0 +1,41 @@ +{ + "Comment": "Using Map state in Inline mode", + "StartAt": "Pass", + "States": { + "Pass": { + "Type": "Pass", + "Next": "Map demo", + "Result": { + "foo": "bar", + "colors": [ + "red", + "green", + "blue", + "yellow", + "white" + ] + } + }, + "Map demo": { + "Type": "Map", + "ItemsPath": "$.colors", + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": "Generate UUID", + "States": { + "Generate UUID": { + "Type": "Pass", + "End": true, + "Parameters": { + "uuid.$": "States.UUID()" + }, + "OutputPath": "$.uuid" + } + } + }, + "End": true + } + } +} From 80df0f08bb9fe52b36c86fc1d52e9938db1504e1 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Mon, 8 Jul 2024 14:32:54 -0400 Subject: [PATCH 05/11] Continue to Map#step until it would block --- lib/floe/workflow/states/map.rb | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index 729282aa..a50f6baf 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -54,7 +54,6 @@ def start(context) context.state["MaxIterations"] = process_input(context).count context.state["Result"] = [] context.state["ItemProcessorContext"] = {} - step(context) end def finish(context) @@ -63,16 +62,16 @@ def finish(context) super end - def end? - @end - end + def run_nonblock!(context) + start(context) unless context.state_started? + loop while step_nonblock!(context) == 0 && running?(context) + return Errno::EAGAIN unless ready?(context) - def ready?(context) - return true unless context.state_started? - return true unless running?(context) + finish(context) + end - step(context) - false + def end? + @end end def running?(context) @@ -82,7 +81,7 @@ def running?(context) private - def step(context) + def step_nonblock!(context) input = process_input(context) item = input[context.state["Iteration"]] @@ -94,6 +93,9 @@ def step(context) context.state["Result"] << JSON.parse(result) context.state["Iteration"] += 1 context.state["ItemProcessorContext"] = {} + 0 + else + Errno::EAGAIN end end From 80d94685f85308ed27cb463c3e6f014d61032b34 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Mon, 8 Jul 2024 14:39:28 -0400 Subject: [PATCH 06/11] Pre-define all item processor contexts --- lib/floe/workflow/states/map.rb | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index a50f6baf..55f69ee3 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -50,10 +50,12 @@ def process_input(context) def start(context) super + input = process_input(context) + context.state["Iteration"] = 0 - context.state["MaxIterations"] = process_input(context).count + context.state["MaxIterations"] = input.count context.state["Result"] = [] - context.state["ItemProcessorContext"] = {} + context.state["ItemProcessorContext"] = input.map { |item| Context.new(nil, :input => item.to_json).to_h } end def finish(context) @@ -82,17 +84,13 @@ def running?(context) private def step_nonblock!(context) - input = process_input(context) - item = input[context.state["Iteration"]] - - item_processor_context = Context.new(context.state["ItemProcessorContext"], :input => item.to_json) + item_processor_context = Context.new(context.state["ItemProcessorContext"][context.state["Iteration"]]) item_processor.run_nonblock(item_processor_context) if item_processor.step_nonblock_ready?(item_processor_context) if item_processor_context.ended? result = item_processor.output(item_processor_context) context.state["Result"] << JSON.parse(result) context.state["Iteration"] += 1 - context.state["ItemProcessorContext"] = {} 0 else Errno::EAGAIN From 9d785f09e27d0726cc5f2b743becbe779006863c Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Tue, 9 Jul 2024 13:13:15 -0400 Subject: [PATCH 07/11] Use ItemProcessorContext for running and result --- lib/floe/workflow/states/map.rb | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index 55f69ee3..d8495d0c 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -52,20 +52,20 @@ def start(context) input = process_input(context) - context.state["Iteration"] = 0 - context.state["MaxIterations"] = input.count - context.state["Result"] = [] + context.state["Iteration"] = 0 + context.state["MaxIterations"] = input.count context.state["ItemProcessorContext"] = input.map { |item| Context.new(nil, :input => item.to_json).to_h } end def finish(context) - result = context.state["Result"] + result = context.state["ItemProcessorContext"].map { |ctx| Context.new(ctx).output } context.output = process_output(context, result) super end def run_nonblock!(context) start(context) unless context.state_started? + loop while step_nonblock!(context) == 0 && running?(context) return Errno::EAGAIN unless ready?(context) @@ -77,8 +77,11 @@ def end? end def running?(context) - # TODO: this only works with MaxConcurrency=1 - context.state["Iteration"] < context.state["MaxIterations"] + !ended?(context) + end + + def ended?(context) + context.state["ItemProcessorContext"].all? { |ctx| Context.new(ctx).ended? } end private @@ -87,9 +90,6 @@ def step_nonblock!(context) item_processor_context = Context.new(context.state["ItemProcessorContext"][context.state["Iteration"]]) item_processor.run_nonblock(item_processor_context) if item_processor.step_nonblock_ready?(item_processor_context) if item_processor_context.ended? - result = item_processor.output(item_processor_context) - - context.state["Result"] << JSON.parse(result) context.state["Iteration"] += 1 0 else From ed854ce102b31712c2ef221ca1f98915ac98d114 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Wed, 10 Jul 2024 16:19:07 -0400 Subject: [PATCH 08/11] Delegate wait_until/waiting to child workflows --- lib/floe/workflow/states/map.rb | 14 +++++++++++++- lib/floe/workflow_base.rb | 8 ++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index d8495d0c..998debab 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -69,13 +69,25 @@ def run_nonblock!(context) loop while step_nonblock!(context) == 0 && running?(context) return Errno::EAGAIN unless ready?(context) - finish(context) + finish(context) if ended?(context) end def end? @end end + def ready?(context) + !context.state_started? || context.state["ItemProcessorContext"].any? { |ctx| item_processor.step_nonblock_ready?(Context.new(ctx)) } + end + + def wait_until(context) + context.state["ItemProcessorContext"].filter_map { |ctx| item_processor.wait_until(Context.new(ctx)) }.min + end + + def waiting?(context) + context.state["ItemProcessorContext"].any? { |ctx| item_processor.waiting?(Context.new(ctx)) } + end + def running?(context) !ended?(context) end diff --git a/lib/floe/workflow_base.rb b/lib/floe/workflow_base.rb index 55c40677..184298c5 100644 --- a/lib/floe/workflow_base.rb +++ b/lib/floe/workflow_base.rb @@ -46,6 +46,14 @@ def step_nonblock_ready?(context) !context.started? || current_state(context).ready?(context) end + def waiting?(context) + current_state(context)&.waiting?(context) + end + + def wait_until(context) + current_state(context)&.wait_until(context) + end + def start_workflow(context) return if context.state_name From 8bf874c3b8248bfc26c40d3650308aeaaf089c0d Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Fri, 4 Oct 2024 10:23:50 -0400 Subject: [PATCH 09/11] Pass execution_id down to ItemProcessor states --- lib/floe/workflow/states/map.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index 998debab..7f01a553 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -54,7 +54,7 @@ def start(context) context.state["Iteration"] = 0 context.state["MaxIterations"] = input.count - context.state["ItemProcessorContext"] = input.map { |item| Context.new(nil, :input => item.to_json).to_h } + context.state["ItemProcessorContext"] = input.map { |item| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => item.to_json).to_h } end def finish(context) From 9e3d2d6d4d38ec42c99bcdc8f05dcae7ba0ae915 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Fri, 4 Oct 2024 10:32:20 -0400 Subject: [PATCH 10/11] Check event against execution_id --- lib/floe/workflow.rb | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/lib/floe/workflow.rb b/lib/floe/workflow.rb index 59929bb9..452f7513 100644 --- a/lib/floe/workflow.rb +++ b/lib/floe/workflow.rb @@ -65,16 +65,8 @@ def wait(workflows, timeout: nil, &block) event, data = queue.pop break if event.nil? - _execution_id, runner_context = data.values_at("execution_id", "runner_context") - - # If the event is for one of our workflows set the updated runner_context - workflows.each do |workflow| - next unless workflow.context.state.dig("RunnerContext", "container_ref") == runner_context["container_ref"] - - workflow.context.state["RunnerContext"] = runner_context - end - - break if queue.empty? + # break out of the loop if the event is for one of our workflows + break if queue.empty? || workflows.detect { |wf| wf.execution_id == data["execution_id"] } end ensure sleep_thread&.kill @@ -182,6 +174,10 @@ def credentials @context.credentials end + def execution_id + @context.execution["Id"] + end + private def step! From 1ca1251a310ee5cfda9fc76fe3eb7a41aed18bee Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Fri, 4 Oct 2024 10:44:13 -0400 Subject: [PATCH 11/11] Use missing_field_error --- lib/floe/workflow/states/map.rb | 2 +- spec/workflow/states/map_spec.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index 7f01a553..41e6a021 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -18,7 +18,7 @@ class Map < Floe::Workflow::State def initialize(workflow, name, payload) super - raise Floe::InvalidWorkflowError, "Missing \"InputProcessor\" field in state [#{name.last}]" if payload["ItemProcessor"].nil? + missing_field_error!("InputProcessor") if payload["ItemProcessor"].nil? @next = payload["Next"] @end = !!payload["End"] diff --git a/spec/workflow/states/map_spec.rb b/spec/workflow/states/map_spec.rb index 129b0443..8b6b1812 100644 --- a/spec/workflow/states/map_spec.rb +++ b/spec/workflow/states/map_spec.rb @@ -51,7 +51,7 @@ } expect { make_workflow(ctx, payload) } - .to raise_error(Floe::InvalidWorkflowError, "Missing \"InputProcessor\" field in state [Validate-All]") + .to raise_error(Floe::InvalidWorkflowError, "States.Validate-All does not have required field \"InputProcessor\"") end it "raises an InvalidWorkflowError with a missing Next and End" do