Skip to content

Commit

Permalink
Add Floe::Workflow::Runner#watch
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Oct 30, 2023
1 parent e9a9a36 commit 02f23f5
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 7 deletions.
1 change: 1 addition & 0 deletions floe.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]

spec.add_dependency "awesome_spawn", "~>1.0"
spec.add_dependency "concurrent-ruby", "~>1.2"
spec.add_dependency "jsonpath", "~>1.1"
spec.add_dependency "kubeclient", "~>4.7"
spec.add_dependency "more_core_extensions"
Expand Down
27 changes: 20 additions & 7 deletions lib/floe/workflow.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require "concurrent"
require "securerandom"
require "json"

Expand All @@ -13,19 +14,27 @@ def load(path_or_io, context = nil, credentials = {})
new(payload, context, credentials)
end

def wait(workflows, timeout: 5)
def wait(workflows, timeout: 60)
workflows = [workflows] if workflows.kind_of?(self)
logger.info("checking #{workflows.count} workflows...")

start = Time.now.utc
ready = []
wakeup = Concurrent::Event.new
start = Time.now.utc

loop do
ready = workflows.select(&:step_nonblock_ready?)
break if timeout.zero? || Time.now.utc - start > timeout || !ready.empty?
ready = workflows.select(&:step_nonblock_ready?)
return ready if timeout.zero? || !ready.empty?

sleep(1)
wait_until = workflows.map(&:wait_until).compact.unshift(start + timeout).min
runner_contexts = workflows.map { |wf| wf.context.state["RunnerContext"] }.compact

thread = Thread.new do
Runner.docker_runner.wait(runner_contexts)
wakeup.set
end

thread.kill unless wakeup.wait(wait_until - Time.now.utc)
thread.join

logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready")
ready
end
Expand Down Expand Up @@ -104,6 +113,10 @@ def current_state
@states_by_name[context.state_name]
end

def wait_until
current_state&.wait_until
end

private

def step_next
Expand Down
4 changes: 4 additions & 0 deletions lib/floe/workflow/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ def output(_runner_context)
def cleanup(_runner_context)
raise NotImplementedError, "Must be implemented in a subclass"
end

def wait(_refs, timeout: nil)
raise NotImplementedError, "Must be implemented in a subclass"
end
end
end
end
10 changes: 10 additions & 0 deletions lib/floe/workflow/runner/docker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ def output(runner_context)
runner_context["output"] = output
end

def wait(contexts, timeout: 5, since: nil, events: %w[die])
contexts = [contexts] unless contexts.kind_of?(Array)
refs = contexts.map { |ctx| ctx["container_ref"] }

container_filters = refs.map { |container_id| [:filter, "container=#{container_id}"] }
event_filters = events.map { |event| [:filter, "event=#{event}"] }

docker!("events", *container_filters, *event_filters, [:until, (Time.now.utc + 5).to_i], [:format, "{{json .}}"]).output
end

private

attr_reader :network
Expand Down
9 changes: 9 additions & 0 deletions lib/floe/workflow/runner/kubernetes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ def output(runner_context)
runner_context["output"] = output
end

def wait(contexts, timeout: 5)
contexts = [contexts] unless contexts.kind_of?(Array)
refs = contexts.map { |ctx| ctx["container_ref"] }

kubeclient.watch_pods do |notice|
break
end
end

def cleanup(runner_context)
pod, secret = runner_context.values_at("container_ref", "secrets_ref")

Expand Down
19 changes: 19 additions & 0 deletions lib/floe/workflow/runner/podman.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,25 @@ def output(runner_context)
runner_context["output"] = output
end

def wait(contexts, timeout: 5)
contexts = [contexts] unless contexts.kind_of?(Array)
refs = contexts.map { |ctx| ctx["container_ref"] }

until_timestamp = (Time.now.utc + timeout).to_i
container_filters = refs.map { |container_id| [:filter, "container=#{container_id}"] }
event_filters = events.map { |event| [:filter, "event=#{event}"] }

params = [
container_filters,
event_filters,
[:until, until_timestamp],
[:format, "{{json .}}"]
]
params << [:since, since.to_i] if since

podman!("events", *params).output
end

private

def run_container(image, env, secret)
Expand Down
6 changes: 6 additions & 0 deletions lib/floe/workflow/state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ def finished?
context.state.key?("FinishedTime")
end

def wait_until
return nil unless context.state.key?("WaitUntil")

Time.parse(context.state["WaitUntil"])
end

private

def wait(seconds: nil, time: nil)
Expand Down
26 changes: 26 additions & 0 deletions spec/workflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -283,4 +283,30 @@
end
end
end

describe ".wait" do
it "with two ready workflows" do
workflow_1 = make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Succeed"}})
workflow_2 = make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Succeed"}})

expect(described_class.wait([workflow_1, workflow_2])).to include(workflow_1, workflow_2)
end

it "with one ready workflow and one that would block" do
workflow_1 = make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Succeed"}})
workflow_2 = make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Wait", "Seconds" => 10, "End" => true}})

workflow_1.step_nonblock
workflow_2.step_nonblock

expect(described_class.wait([workflow_1, workflow_2])).to eq([workflow_1])
end

it "with a workflow that would block for 10 seconds" do
workflow = make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Wait", "Seconds" => 1, "End" => true}})
workflow.step_nonblock

described_class.wait(workflow)
end
end
end

0 comments on commit 02f23f5

Please sign in to comment.