Skip to content

Commit

Permalink
Merge pull request #15884 from carbonin/container_workers
Browse files Browse the repository at this point in the history
[REARCH] Container workers
  • Loading branch information
Fryguy authored Feb 16, 2018
2 parents a78bd64 + 7f53a13 commit 38ede59
Show file tree
Hide file tree
Showing 23 changed files with 297 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
class ManageIQ::Providers::BaseManager::MetricsCollectorWorker < MiqQueueWorkerBase
include MiqWorker::ReplicaPerWorker

require_nested :Runner

include PerEmsTypeWorkerMixin

self.required_roles = ["ems_metrics_collector"]

def self.supports_container?
true
end

def self.normalized_type
@normalized_type ||= "ems_metrics_collector_worker"
end
Expand Down
6 changes: 6 additions & 0 deletions app/models/miq_ems_metrics_processor_worker.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
class MiqEmsMetricsProcessorWorker < MiqQueueWorkerBase
include MiqWorker::ReplicaPerWorker

require_nested :Runner

self.required_roles = ["ems_metrics_processor"]
Expand All @@ -7,4 +9,8 @@ class MiqEmsMetricsProcessorWorker < MiqQueueWorkerBase
def friendly_name
@friendly_name ||= "C&U Metrics Processor"
end

def self.supports_container?
true
end
end
6 changes: 6 additions & 0 deletions app/models/miq_event_handler.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
class MiqEventHandler < MiqQueueWorkerBase
include MiqWorker::ReplicaPerWorker

require_nested :Runner

self.required_roles = ["event"]
self.default_queue_name = "ems"

def self.supports_container?
true
end
end
6 changes: 6 additions & 0 deletions app/models/miq_generic_worker.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
class MiqGenericWorker < MiqQueueWorkerBase
include MiqWorker::ReplicaPerWorker

require_nested :Runner

self.default_queue_name = "generic"
self.check_for_minimal_role = false
self.workers = -> { MiqServer.minimal_env? ? 1 : worker_settings[:count] }

def self.supports_container?
true
end
end
6 changes: 6 additions & 0 deletions app/models/miq_priority_worker.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
class MiqPriorityWorker < MiqQueueWorkerBase
include MiqWorker::ReplicaPerWorker

require_nested :Runner

self.default_queue_name = "generic"

def self.queue_priority
MiqQueue::HIGH_PRIORITY
end

def self.supports_container?
true
end
end
6 changes: 6 additions & 0 deletions app/models/miq_reporting_worker.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
class MiqReportingWorker < MiqQueueWorkerBase
include MiqWorker::ReplicaPerWorker

require_nested :Runner

self.required_roles = ["reporting"]
self.default_queue_name = "reporting"

def self.supports_container?
true
end
end
6 changes: 6 additions & 0 deletions app/models/miq_schedule_worker.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
class MiqScheduleWorker < MiqWorker
include MiqWorker::ReplicaPerWorker

require_nested :Jobs
require_nested :Runner

Expand All @@ -7,4 +9,8 @@ class MiqScheduleWorker < MiqWorker
return MiqServer.minimal_env_options.include?('schedule') ? 1 : 0 if MiqServer.minimal_env?
return 1
}

def self.supports_container?
true
end
end
1 change: 1 addition & 0 deletions app/models/miq_server/worker_management/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def check_pending_stop(class_name = nil)
end

def check_not_responding(class_name = nil)
return [] if MiqEnvironment::Command.is_container?
processed_workers = []
miq_workers.each do |w|
next unless class_name.nil? || (w.type == class_name)
Expand Down
4 changes: 3 additions & 1 deletion app/models/miq_server/worker_management/monitor/stop.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ def stop_worker(worker, monitor_status = :waiting_for_stop, monitor_reason = nil
worker_set_monitor_status(w.pid, monitor_status)
worker_set_monitor_reason(w.pid, monitor_reason)

if w.respond_to?(:terminate)
if w.containerized_worker?
w.stop_container
elsif w.respond_to?(:terminate)
w.terminate
else
w.update_attributes(:status => MiqWorker::STATUS_STOPPING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ module MiqServer::WorkerManagement::Monitor::SystemLimits
}

def kill_workers_due_to_resources_exhausted?
return false if MiqEnvironment::Command.is_container?
options = worker_monitor_settings[:kill_algorithm].merge(:type => :kill)
invoke_algorithm(options)
end

def enough_resource_to_start_worker?(worker_class)
return true if MiqEnvironment::Command.is_container?
# HACK, sync_config is done in the server, while this method is called from miq_worker
# This method should move to the worker and the server should pass the settings.
sync_config if worker_monitor_settings.nil? || child_worker_settings.nil?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module MiqServer::WorkerManagement::Monitor::Validation
extend ActiveSupport::Concern

def validate_worker(w)
return true if MiqEnvironment::Command.is_container?
time_threshold = get_time_threshold(w)
restart_interval = get_restart_interval(w)
memory_threshold = get_memory_threshold(w)
Expand Down
13 changes: 13 additions & 0 deletions app/models/miq_ui_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,17 @@ def friendly_name
end

include MiqWebServerWorkerMixin
include MiqWorker::ServiceWorker

def self.supports_container?
true
end

def container_port
3001
end

def container_image_name
"manageiq/manageiq-ui-worker"
end
end
5 changes: 5 additions & 0 deletions app/models/miq_ui_worker/runner.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
class MiqUiWorker::Runner < MiqWorker::Runner
include MiqWebServerRunnerMixin

def prepare
super
MiqApache::Control.start if MiqEnvironment::Command.is_container?
end
end
6 changes: 6 additions & 0 deletions app/models/miq_vim_broker_worker.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
class MiqVimBrokerWorker < MiqWorker
include MiqWorker::ReplicaPerWorker

require_nested :Runner

self.required_roles = lambda {
Expand All @@ -19,6 +21,10 @@ class MiqVimBrokerWorker < MiqWorker
return 1
}

def self.supports_container?
true
end

def self.has_required_role?
return false if emses_to_monitor.empty?
super
Expand Down
5 changes: 5 additions & 0 deletions app/models/miq_web_service_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ def friendly_name
end

include MiqWebServerWorkerMixin
include MiqWorker::ServiceWorker

def self.supports_container?
true
end
end
5 changes: 5 additions & 0 deletions app/models/miq_websocket_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,9 @@ def friendly_name
end

include MiqWebServerWorkerMixin
include MiqWorker::ServiceWorker

def self.supports_container?
true
end
end
35 changes: 30 additions & 5 deletions app/models/miq_worker.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'io/wait'

class MiqWorker < ApplicationRecord
include_concern 'ContainerCommon'
include UuidMixin

before_destroy :log_destroy_of_worker_messages
Expand Down Expand Up @@ -251,18 +252,22 @@ def self.log_status(level = :info)
find_current.each { |w| w.log_status(level) }
end

def self.create_worker_record(*params)
def self.init_worker_object(*params)
params = params.first
params = {} unless params.kind_of?(Hash)
params[:queue_name] = default_queue_name unless params.key?(:queue_name) || default_queue_name.nil?
params[:status] = STATUS_CREATING
params[:last_heartbeat] = Time.now.utc

server_scope.create(params)
server_scope.new(params)
end

def self.create_worker_record(*params)
init_worker_object(*params).tap(&:save)
end

def self.start_worker(*params)
w = create_worker_record(*params)
w = containerized_worker? ? init_worker_object(*params) : create_worker_record(*params)
w.start
w
end
Expand Down Expand Up @@ -360,14 +365,32 @@ def queue_name
end
end

def self.supports_container?
false
end

def self.containerized_worker?
MiqEnvironment::Command.is_container? && supports_container?
end

def containerized_worker?
self.class.containerized_worker?
end

def start_runner
if ENV['MIQ_SPAWN_WORKERS'] || !Process.respond_to?(:fork)
start_runner_via_spawn
elsif containerized_worker?
start_runner_via_container
else
start_runner_via_fork
end
end

def start_runner_via_container
create_container_objects
end

def start_runner_via_fork
self.class.before_fork
pid = fork(:cow_friendly => true) do
Expand Down Expand Up @@ -412,10 +435,10 @@ def start_runner_via_spawn

def start
self.pid = start_runner
save
save unless containerized_worker?

msg = "Worker started: ID [#{id}], PID [#{pid}], GUID [#{guid}]"
MiqEvent.raise_evm_event_queue(miq_server, "evm_worker_start", :event_details => msg, :type => self.class.name)
MiqEvent.raise_evm_event_queue(miq_server || MiqServer.my_server, "evm_worker_start", :event_details => msg, :type => self.class.name)

_log.info(msg)
self
Expand Down Expand Up @@ -502,6 +525,8 @@ def log_destroy_of_worker_messages
end

def status_update
return if MiqEnvironment::Command.is_container?

begin
pinfo = MiqProcess.processInfo(pid)
rescue Errno::ESRCH
Expand Down
37 changes: 37 additions & 0 deletions app/models/miq_worker/container_common.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
require 'kubeclient'

class MiqWorker
module ContainerCommon
extend ActiveSupport::Concern

def configure_worker_deployment(definition, replicas = 0)
definition[:spec][:replicas] = replicas
definition[:spec][:template][:spec][:terminationGracePeriodSeconds] = self.class.worker_settings[:stopping_timeout].seconds

container = definition[:spec][:template][:spec][:containers].first
container[:image] = "#{container_image_name}:#{container_image_tag}"
container[:env] << {:name => "WORKER_CLASS_NAME", :value => self.class.name}
end

def scale_deployment
ContainerOrchestrator.new.scale(worker_deployment_name, self.class.workers_configured_count)
delete_container_objects if self.class.workers_configured_count.zero?
end

def container_image_name
"manageiq/manageiq-base-worker"
end

def container_image_tag
"latest"
end

def worker_deployment_name
@worker_deployment_name ||= begin
deployment_name = abbreviated_class_name.dup.chomp("Worker").sub("Manager", "").sub(/^Miq/, "")
deployment_name << "-#{Array(ems_id).map { |id| ApplicationRecord.split_id(id).last }.join("-")}" if respond_to?(:ems_id)
deployment_name.underscore.dasherize.tr("/", "-")
end
end
end
end
20 changes: 20 additions & 0 deletions app/models/miq_worker/deployment_per_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
class MiqWorker
module DeploymentPerWorker
extend ActiveSupport::Concern

def create_container_objects
ContainerOrchestrator.new.create_deployment_config(worker_deployment_name) do |definition|
configure_worker_deployment(definition, 1)
definition[:spec][:template][:spec][:containers].first[:env] << {:name => "EMS_IDS", :value => Array.wrap(self.class.ems_id_from_queue_name(queue_name)).join(",")}
end
end

def delete_container_objects
ContainerOrchestrator.new.delete_deployment_config(worker_deployment_name)
end

def stop_container
delete_container_objects
end
end
end
20 changes: 20 additions & 0 deletions app/models/miq_worker/replica_per_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
class MiqWorker
module ReplicaPerWorker
extend ActiveSupport::Concern

def create_container_objects
ContainerOrchestrator.new.create_deployment_config(worker_deployment_name) do |definition|
configure_worker_deployment(definition)
end
scale_deployment
end

def delete_container_objects
ContainerOrchestrator.new.delete_deployment_config(worker_deployment_name)
end

def stop_container
scale_deployment
end
end
end
Loading

0 comments on commit 38ede59

Please sign in to comment.