diff --git a/floe.gemspec b/floe.gemspec index 0ea3b9021..eb1f57b95 100644 --- a/floe.gemspec +++ b/floe.gemspec @@ -30,6 +30,7 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.add_dependency "awesome_spawn", "~>1.0" + spec.add_dependency "concurrent-ruby", "~>1.2" spec.add_dependency "jsonpath", "~>1.1" spec.add_dependency "kubeclient", "~>4.7" spec.add_dependency "more_core_extensions" diff --git a/lib/floe/workflow.rb b/lib/floe/workflow.rb index 874312b5a..dff62fbaa 100644 --- a/lib/floe/workflow.rb +++ b/lib/floe/workflow.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true +require "concurrent" require "securerandom" require "json" @@ -13,19 +14,27 @@ def load(path_or_io, context = nil, credentials = {}) new(payload, context, credentials) end - def wait(workflows, timeout: 5) + def wait(workflows, timeout: 60) + workflows = [workflows] if workflows.kind_of?(self) logger.info("checking #{workflows.count} workflows...") - start = Time.now.utc - ready = [] + wakeup = Concurrent::Event.new + start = Time.now.utc - loop do - ready = workflows.select(&:step_nonblock_ready?) - break if timeout.zero? || Time.now.utc - start > timeout || !ready.empty? + ready = workflows.select(&:step_nonblock_ready?) + return ready if timeout.zero? || !ready.empty? - sleep(1) + wait_until = workflows.map(&:wait_until).compact.unshift(start + timeout).min + runner_contexts = workflows.map { |wf| wf.context.state["RunnerContext"] }.compact + + thread = Thread.new do + Runner.docker_runner.wait(runner_contexts) + wakeup.set end + thread.kill unless wakeup.wait(wait_until - Time.now.utc) + thread.join + logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready") ready end @@ -104,6 +113,10 @@ def current_state @states_by_name[context.state_name] end + def wait_until + current_state&.wait_until + end + private def step_next diff --git a/lib/floe/workflow/runner.rb b/lib/floe/workflow/runner.rb index b022b6971..0a09bc19e 100644 --- a/lib/floe/workflow/runner.rb +++ b/lib/floe/workflow/runner.rb @@ -55,6 +55,10 @@ def output(_runner_context) def cleanup(_runner_context) raise NotImplementedError, "Must be implemented in a subclass" end + + def wait(_refs, timeout: nil) + raise NotImplementedError, "Must be implemented in a subclass" + end end end end diff --git a/lib/floe/workflow/runner/docker.rb b/lib/floe/workflow/runner/docker.rb index 6dbbd1715..ca253a011 100644 --- a/lib/floe/workflow/runner/docker.rb +++ b/lib/floe/workflow/runner/docker.rb @@ -60,6 +60,16 @@ def output(runner_context) runner_context["output"] = output end + def wait(contexts, timeout: 5, since: nil, events: %w[die]) + contexts = [contexts] unless contexts.kind_of?(Array) + refs = contexts.map { |ctx| ctx["container_ref"] } + + container_filters = refs.map { |container_id| [:filter, "container=#{container_id}"] } + event_filters = events.map { |event| [:filter, "event=#{event}"] } + + docker!("events", *container_filters, *event_filters, [:until, (Time.now.utc + 5).to_i], [:format, "{{json .}}"]).output + end + private attr_reader :network diff --git a/lib/floe/workflow/runner/kubernetes.rb b/lib/floe/workflow/runner/kubernetes.rb index ccc373214..ad3b9cbb5 100644 --- a/lib/floe/workflow/runner/kubernetes.rb +++ b/lib/floe/workflow/runner/kubernetes.rb @@ -78,6 +78,15 @@ def output(runner_context) runner_context["output"] = output end + def wait(contexts, timeout: 5) + contexts = [contexts] unless contexts.kind_of?(Array) + refs = contexts.map { |ctx| ctx["container_ref"] } + + kubeclient.watch_pods do |notice| + break + end + end + def cleanup(runner_context) pod, secret = runner_context.values_at("container_ref", "secrets_ref") diff --git a/lib/floe/workflow/runner/podman.rb b/lib/floe/workflow/runner/podman.rb index 141491e11..a5e5828cd 100644 --- a/lib/floe/workflow/runner/podman.rb +++ b/lib/floe/workflow/runner/podman.rb @@ -71,6 +71,25 @@ def output(runner_context) runner_context["output"] = output end + def wait(contexts, timeout: 5) + contexts = [contexts] unless contexts.kind_of?(Array) + refs = contexts.map { |ctx| ctx["container_ref"] } + + until_timestamp = (Time.now.utc + timeout).to_i + container_filters = refs.map { |container_id| [:filter, "container=#{container_id}"] } + event_filters = events.map { |event| [:filter, "event=#{event}"] } + + params = [ + container_filters, + event_filters, + [:until, until_timestamp], + [:format, "{{json .}}"] + ] + params << [:since, since.to_i] if since + + podman!("events", *params).output + end + private def run_container(image, env, secret) diff --git a/lib/floe/workflow/state.rb b/lib/floe/workflow/state.rb index f32c7d24a..b47112387 100644 --- a/lib/floe/workflow/state.rb +++ b/lib/floe/workflow/state.rb @@ -96,6 +96,12 @@ def finished? context.state.key?("FinishedTime") end + def wait_until + return nil unless context.state.key?("WaitUntil") + + Time.parse(context.state["WaitUntil"]) + end + private def wait(seconds: nil, time: nil) diff --git a/spec/workflow_spec.rb b/spec/workflow_spec.rb index 5b769967b..2808cb518 100644 --- a/spec/workflow_spec.rb +++ b/spec/workflow_spec.rb @@ -283,4 +283,30 @@ end end end + + describe ".wait" do + it "with two ready workflows" do + workflow_1 = make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Succeed"}}) + workflow_2 = make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Succeed"}}) + + expect(described_class.wait([workflow_1, workflow_2])).to include(workflow_1, workflow_2) + end + + it "with one ready workflow and one that would block" do + workflow_1 = make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Succeed"}}) + workflow_2 = make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Wait", "Seconds" => 10, "End" => true}}) + + workflow_1.step_nonblock + workflow_2.step_nonblock + + expect(described_class.wait([workflow_1, workflow_2])).to eq([workflow_1]) + end + + it "with a workflow that would block for 10 seconds" do + workflow = make_workflow(Floe::Workflow::Context.new(:input => input), {"FirstState" => {"Type" => "Wait", "Seconds" => 1, "End" => true}}) + workflow.step_nonblock + + described_class.wait(workflow) + end + end end