diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index d8495d0c..998debab 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -69,13 +69,25 @@ def run_nonblock!(context) loop while step_nonblock!(context) == 0 && running?(context) return Errno::EAGAIN unless ready?(context) - finish(context) + finish(context) if ended?(context) end def end? @end end + def ready?(context) + !context.state_started? || context.state["ItemProcessorContext"].any? { |ctx| item_processor.step_nonblock_ready?(Context.new(ctx)) } + end + + def wait_until(context) + context.state["ItemProcessorContext"].filter_map { |ctx| item_processor.wait_until(Context.new(ctx)) }.min + end + + def waiting?(context) + context.state["ItemProcessorContext"].any? { |ctx| item_processor.waiting?(Context.new(ctx)) } + end + def running?(context) !ended?(context) end diff --git a/lib/floe/workflow_base.rb b/lib/floe/workflow_base.rb index 55c40677..184298c5 100644 --- a/lib/floe/workflow_base.rb +++ b/lib/floe/workflow_base.rb @@ -46,6 +46,14 @@ def step_nonblock_ready?(context) !context.started? || current_state(context).ready?(context) end + def waiting?(context) + current_state(context)&.waiting?(context) + end + + def wait_until(context) + current_state(context)&.wait_until(context) + end + def start_workflow(context) return if context.state_name