Use systemd-notify for worker heartbeating #20840

Merged · 6 commits · Jan 4, 2021
1 change: 1 addition & 0 deletions Gemfile
@@ -161,6 +161,7 @@ end

group :systemd, :optional => true do
gem "dbus-systemd", "~>1.1.0", :require => false
gem "sd_notify", "~>0.1.0", :require => false
Member commented:
FYI, "A pure-Ruby implementation of sd_notify(3) that can be used to communicate state changes of Ruby programs to systemd." (no other dependencies and no c extensions 👏 )

gem "systemd-journal", "~>1.4.2", :require => false
end

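For reference, a minimal sketch of how a Type=notify worker drives this gem at runtime; the SdNotify calls below are the ones this PR wraps in systemd_common.rb, while the loop itself is purely illustrative:

```ruby
require "sd_notify"

# Signal READY=1: systemd marks a Type=notify unit active only after this.
SdNotify.ready

# Heartbeat loop: send WATCHDOG=1 more often than the unit's WatchdogSec,
# otherwise systemd treats the service as hung and restarts it.
3.times do
  # ... a real worker loop would dequeue and process work here ...
  SdNotify.watchdog
  sleep 1
end

# Signal STOPPING=1 so systemd knows this is an orderly shutdown.
SdNotify.stopping
```

When NOTIFY_SOCKET is not set (for example, when the process is not started by systemd), these calls should be no-ops, so the same code path is safe for workers running outside systemd.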
2 changes: 1 addition & 1 deletion app/models/miq_queue_worker_base/runner.rb
@@ -144,7 +144,7 @@ def drb_dequeue_available?
def heartbeat_message_timeout(message)
if message.msg_timeout
timeout = worker_settings[:poll] + message.msg_timeout
heartbeat_to_file(timeout)
systemd_worker? ? worker.sd_notify_watchdog_usec(timeout) : heartbeat_to_file(timeout)
end
end
end
24 changes: 17 additions & 7 deletions app/models/miq_server/worker_management/monitor.rb
@@ -27,13 +27,7 @@ def monitor_workers

cleanup_failed_workers

# Monitor all remaining current worker records
miq_workers.where(:status => MiqWorker::STATUSES_CURRENT_OR_STARTING).each do |worker|
# Push the heartbeat into the database
persist_last_heartbeat(worker)
# Check the worker record for heartbeat timeouts
validate_worker(worker)
end
monitor_active_workers

do_system_limit_exceeded if self.kill_workers_due_to_resources_exhausted?
end
@@ -84,6 +78,22 @@ def cleanup_orphaned_worker_rows
end
end

def monitor_active_workers
# When k8s or systemd is operating as the worker monitor then all of the
# worker monitoring (liveness, memory threshold) is handled by those
# systems. Only when workers are run as standalone processes does MiqServer
# have to monitor the workers itself.
return if podified? || systemd?
Member Author commented:
So looking at this method, it:

  1. Updates MiqWorker#last_heartbeat from the tmp/guid.hb heartbeat files
  2. Validates the worker heartbeat and memory limits [ref]

After this PR, neither of these will be necessary on k8s (no access to heartbeat files, and mem/cpu limits are handled by k8s) or on systemd (systemd-notify for heartbeating and cgroups for mem/cpu limits).

Member Author commented:
Side note: in a lot of places we ask if podified? and if systemd?, but we have no way of asking whether we're using the third monitor type, process. Ideally here I'd like to say return unless worker_monitor_type_process?, since we have three worker monitor types: k8s, systemd, and legacy/process.

Member commented:
Side note 2: I am seeing a few categories of "worker_monitor_types" that we make decisions on:

  1. external_liveness_monitor?
  2. external_readiness_monitor?
  3. external_resource_limit_monitor?
  4. isolated processes: decisions about where a piece of work can run (you can't get process information for workers from the server, or vice versa, in pods)

I'm sure there are others, but maybe we can come up with good names for these concerns.

Member Author commented:
This is a good point. I've been assuming that any "external worker monitor" (i.e. k8s or systemd) will take care of all or most of the "every 15 seconds, make sure the workers are okay" work for us, but some may provide most of it rather than all. I'm thinking that when we move to a more pluggable worker monitor class, these won't be questions that have to be asked but rather methods that are implemented or not, which should clean things up nicely (:crossed_fingers:)

I'm unsure about readiness vs liveness: is readiness the "are there enough resources to start this new worker" check?

Can you expand on "decisions about where a piece of work can run"? Is that "make MiqServer more like k8s and schedule workers across the appliances"? If so, yeah, that is a big one that I haven't even started to tackle yet; I think it's going to be a fundamental rethink of how we schedule workers (per server vs per region). We "got away with it" on podified because we went to a single MiqServer, but with actual appliance VMs maybe the worker counts should be at the region level and divvied up by the master MiqServer?

Member commented:
> I'm thinking that when we move to a more pluggable worker monitor class, these won't be questions that have to be asked but rather methods that are implemented or not, which should clean things up nicely

👍

> I'm unsure about readiness vs liveness: is readiness the "are there enough resources to start this new worker" check?

liveness == heartbeat check
readiness == the Rails server is available (port 3000) for Puma-based workers

> Can you expand on "decisions about where a piece of work can run"?

There are places that have expectations that the server and workers reside on the same process space / filesystem. For example: #20835

The workaround for these has been to make the right process fetch the environmental information for itself instead of doing it for someone else. Generally, it's been places where the assumption was that we needed to run on the MiqServer. Previously, any worker on that server was fine, but that doesn't work in pods, so it makes sense to make everything queue things for the correct process instead of assuming one process can see other processes. Like this: jrafanie@1c3d8a7

Another example: #20290

Member Author commented:
> There are places that have expectations that the server and workers reside on the same process space / filesystem. For example

Ahh yes, okay, I see what you mean: basically anything doing "worker management" outside of MiqServer on the assumption that it can do so based on the PID (processInfo(pid) in the first example and kill(pid) in the second).

Member commented:
Exactly, and I believe we had a similar change for embedded Ansible, since that relied on the filesystem having the Ansible repo checked out. It might have changed since, but the idea was that you can't assume the "server with embedded ansible" can access the locally checked-out Ansible repo, only the process that's guaranteed to have checked it out. I don't know what to call this, but the isolation of process space (kill/processInfo(pid)) and filesystems (the Ansible repo as an example) is what I'm describing.
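A rough sketch of the pattern being described here, contrasting the old co-location assumption with the queue-based workaround; the :server_guid targeting and the report_process_info method are illustrative, not the exact shape of the linked commits:

```ruby
# Before: the server assumes it shares a process space with the worker, so it
# can inspect the worker's PID directly. That breaks in pods, where the PID
# belongs to a different container.
def worker_process_info(worker)
  MiqProcess.processInfo(worker.pid)
end

# After: queue the request so the process that owns the information reports it
# for itself, instead of assuming one process can see another.
def request_worker_process_info(worker)
  MiqQueue.put(
    :class_name  => worker.class.name,
    :instance_id => worker.id,
    :method_name => "report_process_info", # hypothetical method
    :server_guid => worker.miq_server&.guid
  )
end
```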


# Monitor all remaining current worker records
miq_workers.where(:status => MiqWorker::STATUSES_CURRENT_OR_STARTING).each do |worker|
# Push the heartbeat into the database
persist_last_heartbeat(worker)
# Check the worker record for heartbeat timeouts
validate_worker(worker)
end
end

def cleanup_failed_workers
check_not_responding
check_pending_stop
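A rough sketch of the "pluggable worker monitor" idea floated in the thread above; every class and method name here is hypothetical and none of this exists in the PR:

```ruby
# Hypothetical sketch: instead of scattering `if podified? || systemd?` checks,
# each monitor type either implements a concern or inherits a no-op for it.
class WorkerMonitor
  # Legacy/process monitor: MiqServer checks heartbeats and memory itself.
  def external_liveness_monitor?
    false
  end

  def external_resource_limit_monitor?
    false
  end

  def monitor_active_workers(workers)
    workers.each { |worker| worker.validate_heartbeat_and_memory } # hypothetical helper
  end
end

class SystemdWorkerMonitor < WorkerMonitor
  def external_liveness_monitor?
    true # WatchdogSec plus sd_notify watchdog pings
  end

  def external_resource_limit_monitor?
    true # cgroups, e.g. MemoryHigh
  end

  def monitor_active_workers(_workers)
    # Nothing to do: systemd restarts hung workers and enforces memory limits.
  end
end

class KubernetesWorkerMonitor < WorkerMonitor
  def external_liveness_monitor?
    true # liveness probes
  end

  def external_resource_limit_monitor?
    true # pod resource requests/limits
  end

  def monitor_active_workers(_workers)
    # Nothing to do: the kubelet handles liveness and resource enforcement.
  end
end
```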
6 changes: 5 additions & 1 deletion app/models/miq_worker/runner.rb
@@ -9,6 +9,8 @@ class TemporaryFailure < RuntimeError
attr_accessor :last_hb, :worker, :worker_settings
attr_reader :active_roles, :server

delegate :systemd_worker?, :to => :worker

INTERRUPT_SIGNALS = %w[SIGINT SIGTERM].freeze

SAFE_SLEEP_SECONDS = 60
@@ -142,6 +144,7 @@ def starting_worker_record

def started_worker_record
reload_worker_record
@worker.sd_notify_started if systemd_worker?
@worker.status = "started"
@worker.last_heartbeat = Time.now.utc
@worker.update_spid
@@ -191,6 +194,7 @@ def update_worker_record_at_exit(exit_code)
@worker.stopped_on = Time.now.utc
@worker.save

@worker.sd_notify_stopping if systemd_worker?
@worker.status_update
@worker.log_status
end
@@ -288,7 +292,7 @@ def heartbeat
# Heartbeats can be expensive, so do them only when needed
return if @last_hb.kind_of?(Time) && (@last_hb + worker_settings[:heartbeat_freq]) >= now

heartbeat_to_file
systemd_worker? ? @worker.sd_notify_watchdog : heartbeat_to_file

if config_out_of_date?
_log.info("#{log_prefix} Synchronizing configuration...")
26 changes: 26 additions & 0 deletions app/models/miq_worker/systemd_common.rb
@@ -70,6 +70,7 @@ def unit_file
Environment=BUNDLER_GROUPS=#{bundler_groups.join(",")}
ExecStart=/bin/bash -lc '#{exec_start}'
Restart=no
Type=notify
Slice=#{slice_name}
UNIT_FILE
end
@@ -115,6 +116,23 @@ def stop_systemd_unit(mode: "replace")
systemd.StopUnit(unit_name, mode)
end

def sd_notify_started
sd_notify.ready
end

def sd_notify_stopping
sd_notify.stopping
end

def sd_notify_watchdog
sd_notify.watchdog
end

def sd_notify_watchdog_usec(timeout_in_seconds)
usec = timeout_in_seconds * 1_000_000
sd_notify.notify("WATCHDOG_USEC=#{usec}", false)
end

private

def systemd
@@ -124,6 +142,13 @@ def systemd
end
end

def sd_notify
@sd_notify ||= begin
require "sd_notify"
SdNotify
end
end

def service_base_name
self.class.service_base_name
end
@@ -166,6 +191,7 @@ def unit_config_file
MemoryHigh=#{worker_settings[:memory_threshold].bytes}
TimeoutStartSec=#{worker_settings[:starting_timeout]}
TimeoutStopSec=#{worker_settings[:stopping_timeout]}
WatchdogSec=#{worker_settings[:heartbeat_timeout]}
#{unit_environment_variables.map { |env_var| "Environment=#{env_var}" }.join("\n")}
UNIT_CONFIG_FILE
end