Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Map state #184

Merged
merged 11 commits into from
Oct 4, 2024
41 changes: 41 additions & 0 deletions examples/map.asl
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"Comment": "Using Map state in Inline mode",
"StartAt": "Pass",
"States": {
"Pass": {
"Type": "Pass",
"Next": "Map demo",
"Result": {
"foo": "bar",
"colors": [
"red",
"green",
"blue",
"yellow",
"white"
]
}
},
"Map demo": {
"Type": "Map",
"ItemsPath": "$.colors",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "Generate UUID",
"States": {
"Generate UUID": {
"Type": "Pass",
"End": true,
"Parameters": {
"uuid.$": "States.UUID()"
},
"OutputPath": "$.uuid"
}
}
},
"End": true
}
}
}
1 change: 1 addition & 0 deletions lib/floe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require_relative "floe/workflow/choice_rule/and"
require_relative "floe/workflow/choice_rule/data"
require_relative "floe/workflow/context"
require_relative "floe/workflow/item_processor"
require_relative "floe/workflow/intrinsic_function"
require_relative "floe/workflow/intrinsic_function/parser"
require_relative "floe/workflow/intrinsic_function/transformer"
Expand Down
18 changes: 7 additions & 11 deletions lib/floe/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,8 @@ def wait(workflows, timeout: nil, &block)
event, data = queue.pop
break if event.nil?

_execution_id, runner_context = data.values_at("execution_id", "runner_context")

# If the event is for one of our workflows set the updated runner_context
workflows.each do |workflow|
next unless workflow.context.state.dig("RunnerContext", "container_ref") == runner_context["container_ref"]

workflow.context.state["RunnerContext"] = runner_context
end

break if queue.empty?
# break out of the loop if the event is for one of our workflows
break if queue.empty? || workflows.detect { |wf| wf.execution_id == data["execution_id"] }
end
ensure
sleep_thread&.kill
Expand Down Expand Up @@ -174,14 +166,18 @@ def start_workflow

# NOTE: Expecting the context to be initialized (via start_workflow) before this
def current_state
@states_by_name[context.state_name]
states_by_name[context.state_name]
end

# backwards compatibility. Caller should access directly from context
def credentials
@context.credentials
end

def execution_id
@context.execution["Id"]
end

private

def step!
Expand Down
14 changes: 14 additions & 0 deletions lib/floe/workflow/item_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

module Floe
class Workflow
class ItemProcessor < Floe::WorkflowBase
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very cool 👍... Was wondering how you were going to implement it, since it's like a sub workflow

attr_reader :processor_config

def initialize(payload, name = nil)
super
@processor_config = payload.fetch("ProcessorConfig", "INLINE")
end
end
end
end
108 changes: 106 additions & 2 deletions lib/floe/workflow/states/map.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,116 @@
# frozen_string_literal: true

require_relative "input_output_mixin"
require_relative "non_terminal_mixin"

module Floe
class Workflow
module States
class Map < Floe::Workflow::State
def initialize(*)
include InputOutputMixin
include NonTerminalMixin

attr_reader :end, :next, :parameters, :input_path, :output_path, :result_path,
:result_selector, :retry, :catch, :item_processor, :items_path,
:item_reader, :item_selector, :item_batcher, :result_writer,
:max_concurrency, :tolerated_failure_percentage, :tolerated_failure_count

def initialize(workflow, name, payload)
super
raise NotImplementedError

missing_field_error!("InputProcessor") if payload["ItemProcessor"].nil?

@next = payload["Next"]
@end = !!payload["End"]
@parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"]
@input_path = Path.new(payload.fetch("InputPath", "$"))
@output_path = Path.new(payload.fetch("OutputPath", "$"))
@result_path = ReferencePath.new(payload.fetch("ResultPath", "$"))
@result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"]
@retry = payload["Retry"].to_a.map { |retrier| Retrier.new(retrier) }
@catch = payload["Catch"].to_a.map { |catcher| Catcher.new(catcher) }
@item_processor = ItemProcessor.new(payload["ItemProcessor"], name)
@items_path = ReferencePath.new(payload.fetch("ItemsPath", "$"))
@item_reader = payload["ItemReader"]
@item_selector = payload["ItemSelector"]
@item_batcher = payload["ItemBatcher"]
@result_writer = payload["ResultWriter"]
@max_concurrency = payload["MaxConcurrency"]&.to_i
@tolerated_failure_percentage = payload["ToleratedFailurePercentage"]
@tolerated_failure_count = payload["ToleratedFailureCount"]

validate_state!(workflow)
end

def process_input(context)
input = super
items_path.value(context, input)
end

def start(context)
super

input = process_input(context)

context.state["Iteration"] = 0
context.state["MaxIterations"] = input.count
context.state["ItemProcessorContext"] = input.map { |item| Context.new({"Execution" => {"Id" => context.execution["Id"]}}, :input => item.to_json).to_h }
end

def finish(context)
result = context.state["ItemProcessorContext"].map { |ctx| Context.new(ctx).output }
context.output = process_output(context, result)
super
end

def run_nonblock!(context)
start(context) unless context.state_started?

loop while step_nonblock!(context) == 0 && running?(context)
return Errno::EAGAIN unless ready?(context)

finish(context) if ended?(context)
end

def end?
@end
end

def ready?(context)
!context.state_started? || context.state["ItemProcessorContext"].any? { |ctx| item_processor.step_nonblock_ready?(Context.new(ctx)) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will context.state_started? properly detect that all the child states have started?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No only that the current Map state has started

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had deleted context.state_started? from Workflow and I think that is working. It had not before. Not sure if I don't have tests that push this anymore or if we moved stuff around.

So you can probably drop it?

end

def wait_until(context)
context.state["ItemProcessorContext"].filter_map { |ctx| item_processor.wait_until(Context.new(ctx)) }.min
end

def waiting?(context)
context.state["ItemProcessorContext"].any? { |ctx| item_processor.waiting?(Context.new(ctx)) }
end

def running?(context)
!ended?(context)
end

def ended?(context)
context.state["ItemProcessorContext"].all? { |ctx| Context.new(ctx).ended? }
end

private

def step_nonblock!(context)
item_processor_context = Context.new(context.state["ItemProcessorContext"][context.state["Iteration"]])
item_processor.run_nonblock(item_processor_context) if item_processor.step_nonblock_ready?(item_processor_context)
if item_processor_context.ended?
context.state["Iteration"] += 1
0
else
Errno::EAGAIN
end
Comment on lines +102 to +109
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is the loop thing again

context.state["ItemProcessorContext"].each do |ctx_hash|
  ctx = Context.new(ctx_hash) 
  item_processor.run_nonblock(ctx) if item_processor.step_nonblock_ready?(ctx) }

end

def validate_state!(workflow)
validate_state_next!(workflow)
end
end
end
Expand Down
77 changes: 77 additions & 0 deletions lib/floe/workflow_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,85 @@ def initialize(payload, name = nil)
validate_workflow!
end

def run(context)
run_nonblock(context) until context.ended?
end

def run_nonblock(context)
start_workflow(context)
loop while step_nonblock(context) == 0 && !context.ended?
self
end

def step_nonblock(context)
return Errno::EPERM if context.ended?

result = current_state(context).run_nonblock!(context)
return result if result != 0

context.state_history << context.state
context.next_state ? step!(context) : end_workflow!(context)

result
end

def step_nonblock_ready?(context)
!context.started? || current_state(context).ready?(context)
end

def waiting?(context)
current_state(context)&.waiting?(context)
end

def wait_until(context)
current_state(context)&.wait_until(context)
end

def start_workflow(context)
return if context.state_name

context.state["Name"] = start_at
context.state["Input"] = context.execution["Input"].dup

context.execution["StartTime"] = Time.now.utc.iso8601

self
end

def current_state(context)
states_by_name[context.state_name]
end

def end?(context)
context.ended?
end

def output(context)
context.output.to_json if end?(context)
end

private

def step!(context)
next_state = {"Name" => context.next_state}

# if rerunning due to an error (and we are using Retry)
if context.state_name == context.next_state && context.failed? && context.state.key?("Retrier")
next_state.merge!(context.state.slice("RetryCount", "Input", "Retrier"))
else
next_state["Input"] = context.output
end

context.state = next_state
end

# Avoiding State#running? because that is potentially expensive.
# State#run_nonblock! already called running? via State#ready? and
# called State#finished -- which is what Context#state_finished? is detecting
def end_workflow!(context)
context.execution["EndTime"] = context.state["FinishedTime"]
end

def validate_workflow!
missing_field_error!("States") if @states.empty?
missing_field_error!("StartAt") if @start_at.nil?
Expand Down
25 changes: 25 additions & 0 deletions spec/workflow/item_processor_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
RSpec.describe Floe::Workflow::ItemProcessor do
it "raises an exception for missing States field" do
payload = {"StartAt" => "Missing"}
expect { described_class.new(payload, ["Map"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map does not have required field \"States\"")
end

it "raises an exception for missing StartAt field" do
payload = {"States" => {"First" => {"Type" => "Succeed"}}}
expect { described_class.new(payload, ["Map"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map does not have required field \"StartAt\"")
end

it "raises an exception if StartAt isn't in States" do
payload = {"StartAt" => "First", "States" => {"Second" => {"Type" => "Succeed"}}}
expect { described_class.new(payload, ["Map"]) }
.to raise_error(Floe::InvalidWorkflowError, "Map field \"StartAt\" value \"First\" is not found in \"States\"")
end

it "raises an exception if a Next state isn't in States" do
payload = {"StartAt" => "First", "States" => {"First" => {"Type" => "Pass", "Next" => "Last"}}}
expect { described_class.new(payload, ["Map"]) }
.to raise_error(Floe::InvalidWorkflowError, "States.First field \"Next\" value \"Last\" is not found in \"States\"")
end
end
Loading