Skip to content

Commit

Permalink
Run ItemProcessor workflow for Map state
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Jun 13, 2024
1 parent 567cc16 commit da41408
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 10 deletions.
3 changes: 2 additions & 1 deletion lib/floe/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,14 @@ def start_workflow

# NOTE: Expecting the context to be initialized (via start_workflow) before this
def current_state
@states_by_name[context.state_name]
states_by_name[context.state_name]
end

# backwards compatibility. Caller should access directly from context
def credentials
@context.credentials
end

private

def step!
Expand Down
5 changes: 0 additions & 5 deletions lib/floe/workflow/item_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ def initialize(payload, name = nil)
super
@processor_config = payload.fetch("ProcessorConfig", "INLINE")
end

def value(_context, input = {})
# TODO: Run the states to get the output
input
end
end
end
end
6 changes: 5 additions & 1 deletion lib/floe/workflow/states/map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ def process_input(context)

def finish(context)
input = process_input(context)
result = item_processor.value(context, input)
result = input.map do |item|
item_processor_context = Context.new((context.state["ItemProcessorContext"] = {}), :input => item)
item_processor.run(item_processor_context)
item_processor_context.output
end
context.output = process_output(context, result)
super
end
Expand Down
63 changes: 63 additions & 0 deletions lib/floe/workflow_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,68 @@ def initialize(payload, name = nil)
@states = payload["States"].to_a.map { |state_name, state| Floe::Workflow::State.build!(self, state_name, state) }
@states_by_name = @states.to_h { |state| [state.name, state] }
end

def run(context)
run_nonblock(context) until context.ended?
end

def run_nonblock(context)
start_workflow(context)
loop while step_nonblock(context) == 0 && !context.ended?
self
end

def step_nonblock(context)
return Errno::EPERM if context.ended?

result = current_state(context).run_nonblock!(context)
return result if result != 0

context.state_history << context.state
context.next_state ? step!(context) : end_workflow!(context)

result
end

def step_nonblock_ready?(context)
!context.started? || current_state(context).ready?(context)
end

def start_workflow(context)
return if context.state_name

context.state["Name"] = start_at
context.state["Input"] = context.execution["Input"].dup

context.execution["StartTime"] = Time.now.utc.iso8601

self
end

def current_state(context)
states_by_name[context.state_name]
end

private

def step!(context)
next_state = {"Name" => context.next_state}

# if rerunning due to an error (and we are using Retry)
if context.state_name == context.next_state && context.failed? && context.state.key?("Retrier")
next_state.merge!(context.state.slice("RetryCount", "Input", "Retrier"))
else
next_state["Input"] = context.output
end

context.state = next_state
end

# Avoiding State#running? because that is potentially expensive.
# State#run_nonblock! already called running? via State#ready? and
# called State#finished -- which is what Context#state_finished? is detecting
def end_workflow!(context)
context.execution["EndTime"] = context.state["FinishedTime"]
end
end
end
6 changes: 3 additions & 3 deletions spec/workflow/states/map_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
"StartAt" => "Validate",
"States" => {
"Validate" => {
"Type" => "Pass",
"OutputPath" => "$.Payload",
"End" => true
"Type" => "Pass",
"End" => true
}
}
},
Expand Down Expand Up @@ -109,6 +108,7 @@
it "has no next" do
state.run_nonblock!(ctx)
expect(ctx.next_state).to eq(nil)
expect(ctx.output["detail"]["result"]).to eq(input["detail"]["shipped"])
end
end
end

0 comments on commit da41408

Please sign in to comment.