Skip to content

Commit

Permalink
Merge pull request #20840 from agrare/use_sd_notify_systemd_heartbeating
Browse files Browse the repository at this point in the history
Use systemd-notify for worker heartbeating
  • Loading branch information
jrafanie authored Jan 4, 2021
2 parents d6c77bd + 01da008 commit df00edf
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 9 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ end

# Optional Bundler group holding the gems used for systemd integration.
# Every gem here is declared with :require => false, so it is not loaded
# automatically and must be required explicitly by the code that uses it.
group :systemd, :optional => true do
gem "dbus-systemd", "~>1.1.0", :require => false
gem "sd_notify", "~>0.1.0", :require => false
gem "systemd-journal", "~>1.4.2", :require => false
end

Expand Down
2 changes: 1 addition & 1 deletion app/models/miq_queue_worker_base/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def drb_dequeue_available?
# Extends the worker's heartbeat allowance for a message that declares its
# own msg_timeout.
#
# Does nothing (returns nil) when the message has no msg_timeout. Otherwise
# the allowance is the worker's :poll setting plus the message's msg_timeout,
# and it is reported through exactly one mechanism: the systemd watchdog for
# systemd-managed workers, or the heartbeat file for everything else.
#
# @param message [#msg_timeout] the message about to be processed
def heartbeat_message_timeout(message)
  return unless message.msg_timeout

  timeout = worker_settings[:poll] + message.msg_timeout

  # Only one heartbeat channel is used per worker; writing the heartbeat file
  # in addition to the branch below would heartbeat twice.
  if systemd_worker?
    worker.sd_notify_watchdog_usec(timeout)
  else
    heartbeat_to_file(timeout)
  end
end
end
24 changes: 17 additions & 7 deletions app/models/miq_server/worker_management/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,7 @@ def monitor_workers

cleanup_failed_workers

# Monitor all remaining current worker records
miq_workers.where(:status => MiqWorker::STATUSES_CURRENT_OR_STARTING).each do |worker|
# Push the heartbeat into the database
persist_last_heartbeat(worker)
# Check the worker record for heartbeat timeouts
validate_worker(worker)
end
monitor_active_workers

do_system_limit_exceeded if self.kill_workers_due_to_resources_exhausted?
end
Expand Down Expand Up @@ -88,6 +82,22 @@ def cleanup_orphaned_worker_rows
end
end

# Persists heartbeats for, and validates, every worker whose status is
# current or starting.
#
# Skipped entirely (returns nil) when the server is podified or running
# under systemd: in those deployments k8s or systemd performs the liveness
# and memory-threshold monitoring, so MiqServer only has to do it for
# workers run as standalone processes.
def monitor_active_workers
  externally_monitored = podified? || systemd?
  return if externally_monitored

  active_workers = miq_workers.where(:status => MiqWorker::STATUSES_CURRENT_OR_STARTING)
  active_workers.each do |active_worker|
    # Push the latest heartbeat into the database first ...
    persist_last_heartbeat(active_worker)
    # ... then check the worker record for heartbeat timeouts.
    validate_worker(active_worker)
  end
end

def cleanup_failed_workers
check_not_responding
check_pending_stop
Expand Down
6 changes: 5 additions & 1 deletion app/models/miq_worker/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ class TemporaryFailure < RuntimeError
attr_accessor :last_hb, :worker, :worker_settings
attr_reader :active_roles, :server

# Lets the runner call systemd_worker? directly; the call is forwarded to
# whatever object the `worker` accessor returns.
delegate :systemd_worker?, :to => :worker

INTERRUPT_SIGNALS = %w[SIGINT SIGTERM].freeze

SAFE_SLEEP_SECONDS = 60
Expand Down Expand Up @@ -141,6 +143,7 @@ def starting_worker_record

def started_worker_record
reload_worker_record
@worker.sd_notify_started if systemd_worker?
@worker.status = "started"
@worker.last_heartbeat = Time.now.utc
@worker.update_spid
Expand Down Expand Up @@ -190,6 +193,7 @@ def update_worker_record_at_exit(exit_code)
@worker.stopped_on = Time.now.utc
@worker.save

@worker.sd_notify_stopping if systemd_worker?
@worker.status_update
@worker.log_status
end
Expand Down Expand Up @@ -287,7 +291,7 @@ def heartbeat
# Heartbeats can be expensive, so do them only when needed
return if @last_hb.kind_of?(Time) && (@last_hb + worker_settings[:heartbeat_freq]) >= now

heartbeat_to_file
systemd_worker? ? @worker.sd_notify_watchdog : heartbeat_to_file

if config_out_of_date?
_log.info("#{log_prefix} Synchronizing configuration...")
Expand Down
26 changes: 26 additions & 0 deletions app/models/miq_worker/systemd_common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def unit_file
Environment=BUNDLER_GROUPS=#{bundler_groups.join(",")}
ExecStart=/bin/bash -lc '#{exec_start}'
Restart=no
Type=notify
Slice=#{slice_name}
UNIT_FILE
end
Expand Down Expand Up @@ -115,6 +116,23 @@ def stop_systemd_unit(mode: "replace")
systemd.StopUnit(unit_name, mode)
end

# Reports to systemd that the worker has finished starting, by calling
# +ready+ on the lazily loaded SdNotify module.
#
# @return whatever SdNotify.ready returns
def sd_notify_started
  notifier = sd_notify
  notifier.ready
end

# Reports to systemd that the worker is shutting down, by calling
# +stopping+ on the lazily loaded SdNotify module.
#
# @return whatever SdNotify.stopping returns
def sd_notify_stopping
  notifier = sd_notify
  notifier.stopping
end

# Sends a watchdog keep-alive to systemd, by calling +watchdog+ on the
# lazily loaded SdNotify module. Used as the worker heartbeat when the
# worker runs as a systemd unit.
#
# @return whatever SdNotify.watchdog returns
def sd_notify_watchdog
  notifier = sd_notify
  notifier.watchdog
end

# Asks systemd to change this service's watchdog timeout at runtime by
# sending a WATCHDOG_USEC=<microseconds> notification.
#
# The value is coerced to a whole number of microseconds: a fractional
# timeout (e.g. 1.5 seconds) would otherwise be interpolated as
# "1500000.0", which is not an integer microsecond count.
#
# @param timeout_in_seconds [Numeric] new watchdog timeout, in seconds
# @return whatever SdNotify.notify returns
def sd_notify_watchdog_usec(timeout_in_seconds)
  usec = (timeout_in_seconds * 1_000_000).to_i
  # Second argument is false so the notification environment is left in
  # place for subsequent notifications.
  sd_notify.notify("WATCHDOG_USEC=#{usec}", false)
end

private

def systemd
Expand All @@ -124,6 +142,13 @@ def systemd
end
end

# Returns the SdNotify module, requiring the "sd_notify" library on first
# use and caching the result in @sd_notify for later calls. Loading is
# deferred because the gem is declared with :require => false.
#
# @return [Module] SdNotify
def sd_notify
  return @sd_notify if @sd_notify

  require "sd_notify"
  @sd_notify = SdNotify
end

# Instance-level convenience wrapper: returns the service base name defined
# on this object's class.
def service_base_name
  owner_class = self.class
  owner_class.service_base_name
end
Expand Down Expand Up @@ -166,6 +191,7 @@ def unit_config_file
MemoryHigh=#{worker_settings[:memory_threshold].bytes}
TimeoutStartSec=#{worker_settings[:starting_timeout]}
TimeoutStopSec=#{worker_settings[:stopping_timeout]}
WatchdogSec=#{worker_settings[:heartbeat_timeout]}
#{unit_environment_variables.map { |env_var| "Environment=#{env_var}" }.join("\n")}
UNIT_CONFIG_FILE
end
Expand Down

0 comments on commit df00edf

Please sign in to comment.