Skip to content

Commit

Permalink
PAUSED: Use git resume to continue working.
Browse files Browse the repository at this point in the history
  • Loading branch information
carbonin committed Jan 6, 2020
1 parent 8d5e3fe commit 928af24
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 47 deletions.
2 changes: 1 addition & 1 deletion app/models/miq_queue_worker_base/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def do_work

# Only for file based heartbeating
def heartbeat_message_timeout(message)
if ENV["WORKER_HEARTBEAT_METHOD"] == "file" && message.msg_timeout
if message.msg_timeout
timeout = worker_settings[:poll] + message.msg_timeout
heartbeat_to_file(timeout)
end
Expand Down
1 change: 0 additions & 1 deletion app/models/miq_server/worker_management/dequeue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def get_queue_message_for_worker(w)
end

def get_queue_message(pid)
update_worker_last_heartbeat(pid)
@workers_lock.synchronize(:SH) do
w = @workers[pid]

Expand Down
17 changes: 0 additions & 17 deletions app/models/miq_server/worker_management/heartbeat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ def worker_add_message(pid, item)
end unless @workers_lock.nil?
end

# Stamp the in-memory entry for +worker_pid+ with the current UTC time as
# its :last_heartbeat.
#
# Nothing happens (and nil is returned) when @workers_lock has not been set
# up, or when +worker_pid+ is not a key of @workers. The write itself runs
# while holding @workers_lock in :EX mode.
def update_worker_last_heartbeat(worker_pid)
  return if @workers_lock.nil?

  @workers_lock.synchronize(:EX) do
    # Unknown pids are ignored rather than auto-registered.
    next unless @workers.key?(worker_pid)

    @workers[worker_pid][:last_heartbeat] = Time.now.utc
  end
end

def register_worker(worker_pid, worker_class, queue_name)
worker_class = worker_class.constantize if worker_class.kind_of?(String)

Expand Down Expand Up @@ -52,7 +46,6 @@ def message_for_worker(wid, message, *args)
worker_set_message(w, message, *args) unless w.nil?
end

# Get the latest heartbeat between the SQL and memory (updated via DRb)
def persist_last_heartbeat(w)
last_heartbeat = workers_last_heartbeat(w)

Expand All @@ -71,16 +64,6 @@ def clean_heartbeat_files
private

# Return the last heartbeat for worker +w+, choosing the source from the
# WORKER_HEARTBEAT_METHOD environment variable: the value "file" reads it via
# workers_last_heartbeat_to_file, any other value (including unset) reads it
# via workers_last_heartbeat_to_drb.
def workers_last_heartbeat(w)
  if ENV["WORKER_HEARTBEAT_METHOD"] == "file"
    workers_last_heartbeat_to_file(w)
  else
    workers_last_heartbeat_to_drb(w)
  end
end

# Look up the :last_heartbeat value kept in @workers for +w.pid+, holding
# @workers_lock in :SH mode for the duration of the read.
#
# NOTE(review): fetch_path is not defined in this view; presumably it yields
# nil when the pid or key is absent - confirm against its implementation.
def workers_last_heartbeat_to_drb(w)
  @workers_lock.synchronize(:SH) { @workers.fetch_path(w.pid, :last_heartbeat) }
end

# Return the modification time (UTC) of worker +w+'s heartbeat file, or nil
# when there is no such file.
#
# The heartbeat file can be removed by another process between the existence
# check and the mtime read, so a vanished file (Errno::ENOENT) is reported as
# "no heartbeat" (nil) instead of raising into the caller.
def workers_last_heartbeat_to_file(w)
  path = w.heartbeat_file
  File.mtime(path).utc if File.exist?(path)
rescue Errno::ENOENT
  # File disappeared after the exist? check.
  nil
end
end
6 changes: 1 addition & 5 deletions app/models/miq_server/worker_management/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def monitor_workers
# Clear the my_server cache so we can detect role and possibly other changes faster
self.class.my_server_clear_cache

resync_needed = sync_needed?
sync_needed?

# Sync the workers after sync'ing the child worker settings
sync_workers
Expand All @@ -32,8 +32,6 @@ def monitor_workers
persist_last_heartbeat(worker)
# Check the worker record for heartbeat timeouts
next unless validate_worker(worker)
# Tell the valid workers to sync config if needed
worker_set_message(worker, "sync_config") if resync_needed
end

do_system_limit_exceeded if self.kill_workers_due_to_resources_exhausted?
Expand Down Expand Up @@ -150,8 +148,6 @@ def sync_needed?

update_sync_timestamp(@last_sync)
end

resync_needed
end

def set_last_change(key, value)
Expand Down
40 changes: 17 additions & 23 deletions app/models/miq_worker/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,8 @@ def do_exit(message = nil, exit_code = 0)
exit exit_code
end

# Run sync_config, logging an info line (prefixed with log_prefix) immediately
# before and after it. Any arguments passed in are accepted and ignored.
#
# NOTE(review): presumably invoked when a "sync_config" message is delivered
# to the worker; the dispatch that calls this is not visible here - confirm.
def message_sync_config(*_args)
_log.info("#{log_prefix} Synchronizing configuration...")
sync_config
_log.info("#{log_prefix} Synchronizing configuration complete...")
end

def sync_config
_log.info("#{log_prefix} Synchronizing configuration...")
# Sync roles
@active_roles = MiqServer.my_active_roles(true)
after_sync_active_roles
Expand All @@ -263,6 +258,7 @@ def sync_config
$log.log_hashes(@cfg)

@worker.release_db_connection if @worker.respond_to?(:release_db_connection)
_log.info("#{log_prefix} Synchronizing configuration complete...")
end

def sync_log_level
Expand Down Expand Up @@ -326,7 +322,11 @@ def heartbeat
# Heartbeats can be expensive, so do them only when needed
return if @last_hb.kind_of?(Time) && (@last_hb + worker_settings[:heartbeat_freq]) >= now

ENV["WORKER_HEARTBEAT_METHOD"] == "file" ? heartbeat_to_file : heartbeat_to_drb
heartbeat_to_file

sync_config if config_out_of_date?
process_messages_from_server unless MiqEnvironment::Command.is_podified?

@last_hb = now
do_heartbeat_work
rescue SystemExit, SignalException
Expand All @@ -335,41 +335,35 @@ def heartbeat
do_exit("Error heartbeating because #{err.class.name}: #{err.message}\n#{err.backtrace.join('\n')}", 1)
end

def heartbeat_to_drb
def heartbeat_to_file(timeout = nil)
# Disable heartbeat check. Useful if a worker is running in isolation
# without the oversight of MiqServer::WorkerManagement
return if skip_heartbeat?

worker_monitor_drb.register_worker(@worker.pid, @worker.class.name, @worker.queue_name)
worker_monitor_drb.update_worker_last_heartbeat(@worker.pid)
timeout ||= worker_settings[:heartbeat_timeout] || Workers::MiqDefaults.heartbeat_timeout
File.write(@worker.heartbeat_file, (Time.now.utc + timeout).to_s)
end

def process_messages_from_server
worker_monitor_drb.register_worker(@worker.pid, @worker.class.name, @worker.queue_name)
worker_monitor_drb.worker_get_messages(@worker.pid).each do |msg, *args|
process_message(msg, *args)
end
rescue DRb::DRbError => err
do_exit("Error heartbeating to MiqServer because #{err.class.name}: #{err.message}", 1)
do_exit("Error processing messages from MiqServer because #{err.class.name}: #{err.message}", 1)
end

# Write this worker's heartbeat file, then handle every pending message.
#
# The file body is the UTC time at which the heartbeat expires (now plus
# +timeout+ seconds), rendered with Time#to_s. When +timeout+ is not given it
# comes from worker_settings[:heartbeat_timeout], falling back to
# Workers::MiqDefaults.heartbeat_timeout.
#
# After the write, each entry returned by get_messages is passed to
# process_message as (msg, *args). Returns whatever get_messages.each returns.
def heartbeat_to_file(timeout = nil)
  timeout ||= worker_settings[:heartbeat_timeout] || Workers::MiqDefaults.heartbeat_timeout

  target = @worker.heartbeat_file
  expires_at = Time.now.utc + timeout
  File.write(target, expires_at.to_s)

  get_messages.each do |msg, *args|
    process_message(msg, *args)
  end
end

def get_messages
messages = []
def config_out_of_date?
@my_last_config_change ||= Time.now.utc

last_config_change = server_last_change(:last_config_change)
if last_config_change && last_config_change > @my_last_config_change
_log.info("#{log_prefix} Configuration has changed, New TS: #{last_config_change}, Old TS: #{@my_last_config_change}")
messages << ["sync_config"]

@my_last_config_change = last_config_change
return true
end

messages
false
end

def key_store
Expand Down
1 change: 1 addition & 0 deletions lib/workers/bin/run_single_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def all_role_names
$log.info("Starting #{worker.class.name} with runner options #{runner_options}")
worker.class::Runner.new(runner_options).tap(&:setup_sigterm_trap).start
ensure
FileUtils.rm_f(worker.heartbeat_file)
$log.info("Deleting worker record for #{worker.class.name}, id #{worker.id}")
worker.delete
end
Expand Down

0 comments on commit 928af24

Please sign in to comment.