Skip to content

Commit

Permalink
Heartbeat with server time, not worker time
Browse files Browse the repository at this point in the history
  • Loading branch information
fw42 committed Jun 30, 2016
1 parent 160ac19 commit bfd6b5a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
10 changes: 8 additions & 2 deletions lib/resque/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -484,21 +484,27 @@ def self.all_heartbeats
def self.all_workers_with_expired_heartbeats
workers = Worker.all
heartbeats = Worker.all_heartbeats
now = server_time

workers.select do |worker|
id = worker.to_s
heartbeat = heartbeats[id]

if heartbeat
seconds_since_heartbeat = (Time.now - Time.parse(heartbeat)).to_i
seconds_since_heartbeat = (now - Time.parse(heartbeat)).to_i
seconds_since_heartbeat > Resque.prune_interval
else
false
end
end
end

def heartbeat!(time = Time.now)
def self.server_time
time, _ = data_store.time
Time.at(time)
end

def heartbeat!(time = self.class.server_time)
redis.hset(WORKER_HEARTBEAT_KEY, to_s, time.iso8601)
end

Expand Down
8 changes: 8 additions & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,14 @@ def with_pidfile
end
end

def with_fake_time(time)
old_time = Time.fake_time
Time.fake_time = time
yield
ensure
Time.fake_time = old_time
end

def with_background
old_background = ENV["BACKGROUND"]
begin
Expand Down
28 changes: 22 additions & 6 deletions test/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -660,14 +660,34 @@ def self.perform
assert_equal 1, Resque.workers.size
end

it "does return returns a valid time when asking for heartbeat" do
it "does return a valid time when asking for heartbeat" do
workerA = Resque::Worker.new(:jobs)
workerA.register_worker
workerA.heartbeat!

assert_instance_of Time, workerA.heartbeat
end

it "does not generate heartbeats that depend on the worker clock, but only on the server clock" do
server_time_before = Resque::Worker.server_time
fake_time = Time.parse("2000-01-01")

with_fake_time(fake_time) do
worker_time = Time.now

workerA = Resque::Worker.new(:jobs)
workerA.register_worker
workerA.heartbeat!

heartbeat_time = workerA.heartbeat
refute_equal heartbeat_time, worker_time

server_time_after = Resque::Worker.server_time
assert server_time_before <= heartbeat_time
assert heartbeat_time <= server_time_after
end
end

it "cleans up dead worker info on start (crash recovery)" do
# first we fake out several dead workers
# 1: matches queue and hostname; gets pruned.
Expand Down Expand Up @@ -856,17 +876,13 @@ def self.perform
messages = StringIO.new
Resque.logger = Logger.new(messages)

begin
require 'time'
with_fake_time(Time.parse("15:44:33 2011-03-02")) do
last_puts = ""
Time.fake_time = Time.parse("15:44:33 2011-03-02")

@worker.very_verbose = true
@worker.log("some log text")

assert_match /\*\* \[15:44:33 2011-03-02\] \d+: some log text/, messages.string
ensure
Time.fake_time = nil
end
end

Expand Down

0 comments on commit bfd6b5a

Please sign in to comment.