Skip to content

Commit

Permalink
Delegate wait_until/waiting to child workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Jul 10, 2024
1 parent 55b6fe4 commit f236c44
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
14 changes: 13 additions & 1 deletion lib/floe/workflow/states/map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,25 @@ def run_nonblock!(context)
loop while step_nonblock!(context) == 0 && running?(context)
return Errno::EAGAIN unless ready?(context)

finish(context)
finish(context) if ended?(context)
end

def end?
@end
end

def ready?(context)
!context.state_started? || context.state["ItemProcessorContext"].any? { |ctx| item_processor.step_nonblock_ready?(Context.new(ctx)) }
end

def wait_until(context)
context.state["ItemProcessorContext"].filter_map { |ctx| item_processor.wait_until(Context.new(ctx)) }.min
end

def waiting?(context)
context.state["ItemProcessorContext"].any? { |ctx| item_processor.waiting?(Context.new(ctx)) }
end

def running?(context)
!ended?(context)
end
Expand Down
8 changes: 8 additions & 0 deletions lib/floe/workflow_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ def step_nonblock_ready?(context)
!context.started? || current_state(context).ready?(context)
end

def waiting?(context)
current_state(context)&.waiting?(context)
end

def wait_until(context)
current_state(context)&.wait_until(context)
end

def start_workflow(context)
return if context.state_name

Expand Down

0 comments on commit f236c44

Please sign in to comment.