Skip to content

Commit

Permalink
Use event driven Runner#wait for Workflow.wait
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Feb 13, 2024
1 parent 45e3def commit 9369eef
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 16 deletions.
70 changes: 58 additions & 12 deletions lib/floe/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,67 @@ def wait(workflows, timeout: nil, &block)
workflows = [workflows] if workflows.kind_of?(self)
logger.info("checking #{workflows.count} workflows...")

start = Time.now.utc
ready = []
run_until = Time.now.utc + timeout if timeout.to_i > 0
ready = []
queue = Queue.new
wait_thread = Thread.new do
loop do
Runner.for_resource("docker").wait do |event, runner_context|
queue.push([event, runner_context])
end
end
end

loop do
ready = workflows.select(&:step_nonblock_ready?)
break if timeout&.zero?
break if timeout && Time.now.utc - start > timeout

unless ready.empty?
if block
ready.each(&block)
else
break
break if block.nil? && !ready.empty?

ready.each(&block)

# Break if all workflows are completed or we've exceeded the
# requested timeout
break if workflows.all?(&:end?)
break if timeout && (timeout.zero? || Time.now.utc > run_until)

# Find the earliest time that we should wakeup if no container events
# are caught, either a workflow in a Wait or Retry state or we've
# exceeded the requested timeout
wait_until = workflows.map(&:wait_until)
.unshift(run_until)
.compact
.min

# If a workflow is in a waiting state wakeup the main thread when
# it will be done sleeping
if wait_until
sleep_thread = Thread.new do
sleep wait_until - Time.now.utc
queue.push(nil)
end
end

break if workflows.all?(&:end?)
loop do
# Block until an event is raised
event, runner_context = queue.pop
break if event.nil?

# If the event is for one of our workflows set the updated runner_context
workflows.each do |workflow|
next unless workflow.context.state.dig("RunnerContext", "container_ref") == runner_context["container_ref"]

sleep(1)
workflow.context.state["RunnerContext"] = runner_context
end

break if queue.empty?
end
ensure
sleep_thread&.kill
end

logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready")
ready
ensure
wait_thread&.kill
end
end

Expand Down Expand Up @@ -94,6 +132,14 @@ def step_nonblock_ready?
current_state.ready?
end

def waiting?
current_state.waiting?
end

def wait_until
current_state.wait_until
end

def status
context.status
end
Expand Down
12 changes: 8 additions & 4 deletions lib/floe/workflow/state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ def finished?
context.state.key?("FinishedTime")
end

def waiting?
context.state["WaitUntil"] && Time.now.utc <= Time.parse(context.state["WaitUntil"])
end

def wait_until
context.state["WaitUntil"] && Time.parse(context.state["WaitUntil"])
end

private

def wait_until!(seconds: nil, time: nil)
Expand All @@ -105,10 +113,6 @@ def wait_until!(seconds: nil, time: nil)
time.iso8601
end
end

def waiting?
context.state["WaitUntil"] && Time.now.utc <= Time.parse(context.state["WaitUntil"])
end
end
end
end

0 comments on commit 9369eef

Please sign in to comment.