Skip to content

Commit

Permalink
Allow async execution for Map state
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Jun 25, 2024
1 parent b3b690c commit ae1edf1
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 9 deletions.
43 changes: 36 additions & 7 deletions lib/floe/workflow/states/map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,18 @@ def process_input(context)
items_path.value(context, input)
end

def start(context)
super

context.state["Iteration"] = 0
context.state["MaxIterations"] = process_input(context).count
context.state["Result"] = []
context.state["ItemProcessorContext"] = {}
step(context)
end

def finish(context)
input = process_input(context)
result = input.map do |item|
item_processor_context = Context.new((context.state["ItemProcessorContext"] = {}), :input => item.to_json)
item_processor.run(item_processor_context)
JSON.parse(item_processor_context.output)
end
result = context.state["Result"]
context.output = process_output(context, result)
super
end
Expand All @@ -62,12 +67,36 @@ def end?
@end
end

def running?(_)
def ready?(context)
return true unless context.state_started?
return true unless running?(context)

step(context)
false
end

def running?(context)
# TODO: this only works with MaxConcurrency=1
context.state["Iteration"] < context.state["MaxIterations"]
end

private

def step(context)
input = process_input(context)
item = input[context.state["Iteration"]]

item_processor_context = Context.new(context.state["ItemProcessorContext"], :input => item.to_json)
item_processor.run_nonblock(item_processor_context) if item_processor.step_nonblock_ready?(item_processor_context)
if item_processor_context.ended?
result = item_processor.output(item_processor_context)

context.state["Result"] << JSON.parse(result)
context.state["Iteration"] += 1
context.state["ItemProcessorContext"] = {}
end
end

def validate_state!(workflow)
validate_state_next!(workflow)
end
Expand Down
8 changes: 8 additions & 0 deletions lib/floe/workflow_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ def current_state(context)
states_by_name[context.state_name]
end

def end?(context)
context.ended?
end

def output(context)
context.output.to_json if end?(context)
end

private

def step!(context)
Expand Down
37 changes: 35 additions & 2 deletions spec/workflow/states/map_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,42 @@

describe "#run_nonblock!" do
it "has no next" do
state.run_nonblock!(ctx)
loop while state.run_nonblock!(ctx) != 0
expect(ctx.next_state).to eq(nil)
expect(JSON.parse(ctx.output).dig("detail", "result")).to eq(%w[R31 S39 R31 R40 R40])
end

it "sets the context output" do
loop while state.run_nonblock!(ctx) != 0
expect(ctx.output.dig("detail", "result")).to eq(%w[R31 S39 R31 R40 R40])
end

context "with simple string inputs" do
let(:input) { {"foo" => "bar", "colors" => ["red", "green", "blue"]} }
let(:workflow) do
payload = {
"Validate-All" => {
"Type" => "Map",
"ItemsPath" => "$.colors",
"MaxConcurrency" => 0,
"ItemProcessor" => {
"StartAt" => "Validate",
"States" => {
"Validate" => {
"Type" => "Pass",
"End" => true
}
}
},
"End" => true,
}
}
make_workflow(ctx, payload)
end

it "sets the context output" do
loop while state.run_nonblock!(ctx) != 0
expect(ctx.output).to eq(["red", "green", "blue"])
end
end
end
end

0 comments on commit ae1edf1

Please sign in to comment.