From f236c44676fc9b2eb5baf987f1ffeefb18449369 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Wed, 10 Jul 2024 16:19:07 -0400 Subject: [PATCH] Delegate wait_until/waiting to child workflows --- lib/floe/workflow/states/map.rb | 14 +++++++++++++- lib/floe/workflow_base.rb | 8 ++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index d8495d0c..998debab 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -69,13 +69,25 @@ def run_nonblock!(context) loop while step_nonblock!(context) == 0 && running?(context) return Errno::EAGAIN unless ready?(context) - finish(context) + finish(context) if ended?(context) end def end? @end end + def ready?(context) + !context.state_started? || context.state["ItemProcessorContext"].any? { |ctx| item_processor.step_nonblock_ready?(Context.new(ctx)) } + end + + def wait_until(context) + context.state["ItemProcessorContext"].filter_map { |ctx| item_processor.wait_until(Context.new(ctx)) }.min + end + + def waiting?(context) + context.state["ItemProcessorContext"].any? { |ctx| item_processor.waiting?(Context.new(ctx)) } + end + def running?(context) !ended?(context) end diff --git a/lib/floe/workflow_base.rb b/lib/floe/workflow_base.rb index 55c40677..184298c5 100644 --- a/lib/floe/workflow_base.rb +++ b/lib/floe/workflow_base.rb @@ -46,6 +46,14 @@ def step_nonblock_ready?(context) !context.started? || current_state(context).ready?(context) end + def waiting?(context) + current_state(context)&.waiting?(context) + end + + def wait_until(context) + current_state(context)&.wait_until(context) + end + def start_workflow(context) return if context.state_name