From ae1edf1c50bf5d3ce4d52aee452662b141e4a2b2 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Tue, 18 Jun 2024 16:56:29 -0400 Subject: [PATCH] Allow async execution for Map state --- lib/floe/workflow/states/map.rb | 43 ++++++++++++++++++++++++++------ lib/floe/workflow_base.rb | 8 ++++++ spec/workflow/states/map_spec.rb | 37 +++++++++++++++++++++++++-- 3 files changed, 79 insertions(+), 9 deletions(-) diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index fd811991..5a7b0f81 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -47,13 +47,18 @@ def process_input(context) items_path.value(context, input) end + def start(context) + super + + context.state["Iteration"] = 0 + context.state["MaxIterations"] = process_input(context).count + context.state["Result"] = [] + context.state["ItemProcessorContext"] = {} + step(context) + end + def finish(context) - input = process_input(context) - result = input.map do |item| - item_processor_context = Context.new((context.state["ItemProcessorContext"] = {}), :input => item.to_json) - item_processor.run(item_processor_context) - JSON.parse(item_processor_context.output) - end + result = context.state["Result"] context.output = process_output(context, result) super end @@ -62,12 +67,36 @@ def end? @end end - def running?(_) + def ready?(context) + return true unless context.state_started? + return true unless running?(context) + + step(context) false end + def running?(context) + # TODO: this only works with MaxConcurrency=1 + context.state["Iteration"] < context.state["MaxIterations"] + end + private + def step(context) + input = process_input(context) + item = input[context.state["Iteration"]] + + item_processor_context = Context.new(context.state["ItemProcessorContext"], :input => item.to_json) + item_processor.run_nonblock(item_processor_context) if item_processor.step_nonblock_ready?(item_processor_context) + if item_processor_context.ended? + result = item_processor.output(item_processor_context) + + context.state["Result"] << JSON.parse(result) + context.state["Iteration"] += 1 + context.state["ItemProcessorContext"] = {} + end + end + def validate_state!(workflow) validate_state_next!(workflow) end diff --git a/lib/floe/workflow_base.rb b/lib/floe/workflow_base.rb index f956cfbf..94bcd526 100644 --- a/lib/floe/workflow_base.rb +++ b/lib/floe/workflow_base.rb @@ -57,6 +57,14 @@ def current_state(context) states_by_name[context.state_name] end + def end?(context) + context.ended? + end + + def output(context) + context.output.to_json if end?(context) + end + private def step!(context) diff --git a/spec/workflow/states/map_spec.rb b/spec/workflow/states/map_spec.rb index 634eb7c7..19727143 100644 --- a/spec/workflow/states/map_spec.rb +++ b/spec/workflow/states/map_spec.rb @@ -107,9 +107,42 @@ describe "#run_nonblock!" do it "has no next" do - state.run_nonblock!(ctx) + loop while state.run_nonblock!(ctx) != 0 expect(ctx.next_state).to eq(nil) - expect(JSON.parse(ctx.output).dig("detail", "result")).to eq(%w[R31 S39 R31 R40 R40]) + end + + it "sets the context output" do + loop while state.run_nonblock!(ctx) != 0 + expect(ctx.output.dig("detail", "result")).to eq(%w[R31 S39 R31 R40 R40]) + end + + context "with simple string inputs" do + let(:input) { {"foo" => "bar", "colors" => ["red", "green", "blue"]} } + let(:workflow) do + payload = { + "Validate-All" => { + "Type" => "Map", + "ItemsPath" => "$.colors", + "MaxConcurrency" => 0, + "ItemProcessor" => { + "StartAt" => "Validate", + "States" => { + "Validate" => { + "Type" => "Pass", + "End" => true + } + } + }, + "End" => true, + } + } + make_workflow(ctx, payload) + end + + it "sets the context output" do + loop while state.run_nonblock!(ctx) != 0 + expect(ctx.output).to eq(["red", "green", "blue"]) + end end end end