Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reap stale locks #52

Merged
merged 10 commits into from
Jun 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 46 additions & 18 deletions dead_pool_reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,30 @@ import (
)

const (
deadTime = 5 * time.Minute
reapPeriod = 10 * time.Minute
reapJitterSecs = 30
deadTime = 5 * time.Minute
reapPeriod = 10 * time.Minute
reapJitterSecs = 30
requeueKeysPerJob = 4
)

type deadPoolReaper struct {
namespace string
pool *redis.Pool
deadTime time.Duration
reapPeriod time.Duration
namespace string
pool *redis.Pool
deadTime time.Duration
reapPeriod time.Duration
curJobTypes []string

stopChan chan struct{}
doneStoppingChan chan struct{}
}

func newDeadPoolReaper(namespace string, pool *redis.Pool) *deadPoolReaper {
func newDeadPoolReaper(namespace string, pool *redis.Pool, curJobTypes []string) *deadPoolReaper {
return &deadPoolReaper{
namespace: namespace,
pool: pool,
deadTime: deadTime,
reapPeriod: reapPeriod,
curJobTypes: curJobTypes,
stopChan: make(chan struct{}),
doneStoppingChan: make(chan struct{}),
}
Expand Down Expand Up @@ -86,33 +89,59 @@ func (r *deadPoolReaper) reap() error {

// Cleanup all dead pools
for deadPoolID, jobTypes := range deadPoolIDs {
lockJobTypes := jobTypes
// if we found jobs from the heartbeat, requeue them and remove the heartbeat
if len(jobTypes) > 0 {
r.requeueInProgressJobs(deadPoolID, jobTypes)
if _, err = conn.Do("DEL", redisKeyHeartbeat(r.namespace, deadPoolID)); err != nil {
return err
}
} else {
// try to clean up locks for the current set of jobs if heartbeat was not found
lockJobTypes = r.curJobTypes
}

// Remove dead pool from worker pools set
_, err = conn.Do("SREM", workerPoolsKey, deadPoolID)
if err != nil {
if _, err = conn.Do("SREM", workerPoolsKey, deadPoolID); err != nil {
return err
}
// Cleanup any stale lock info
if err = r.cleanStaleLockInfo(deadPoolID, lockJobTypes); err != nil {
return err
}
}

return nil
}

// cleanStaleLockInfo runs the redisLuaReapStaleLocks script for poolID against
// the lock and lock-info keys of every entry in jobTypes.
//
// Each job type contributes two KEYS, its lock key followed by its lock-info
// key, and poolID is passed as ARGV[1]. Any error from the script call is
// returned to the caller unchanged.
func (r *deadPoolReaper) cleanStaleLockInfo(poolID string, jobTypes []string) error {
	keyCount := 2 * len(jobTypes)

	// KEYS come first, followed by the single ARGV entry.
	args := make([]interface{}, 0, keyCount+1)
	for i := range jobTypes {
		args = append(args,
			redisKeyJobsLock(r.namespace, jobTypes[i]),
			redisKeyJobsLockInfo(r.namespace, jobTypes[i]),
		)
	}
	args = append(args, poolID) // ARGV[1]

	script := redis.NewScript(keyCount, redisLuaReapStaleLocks)

	conn := r.pool.Get()
	defer conn.Close()

	_, err := script.Do(conn, args...)
	return err
}

func (r *deadPoolReaper) requeueInProgressJobs(poolID string, jobTypes []string) error {
numArgs := len(jobTypes) * 2
redisRequeueScript := redis.NewScript(numArgs, redisLuaReenqueueJob)
var scriptArgs = make([]interface{}, 0, numArgs)
numKeys := len(jobTypes) * requeueKeysPerJob
redisRequeueScript := redis.NewScript(numKeys, redisLuaReenqueueJob)
var scriptArgs = make([]interface{}, 0, numKeys+1)

for _, jobType := range jobTypes {
// pops from in progress, push into job queue and decrement the queue lock
scriptArgs = append(scriptArgs, redisKeyJobsInProgress(r.namespace, poolID, jobType), redisKeyJobs(r.namespace, jobType))
scriptArgs = append(scriptArgs, redisKeyJobsInProgress(r.namespace, poolID, jobType), redisKeyJobs(r.namespace, jobType), redisKeyJobsLock(r.namespace, jobType), redisKeyJobsLockInfo(r.namespace, jobType)) // KEYS[1-4 * N]
}
scriptArgs = append(scriptArgs, poolID) // ARGV[1]

conn := r.pool.Get()
defer conn.Close()
Expand Down Expand Up @@ -146,18 +175,17 @@ func (r *deadPoolReaper) findDeadPools() (map[string][]string, error) {
deadPools := map[string][]string{}
for _, workerPoolID := range workerPoolIDs {
heartbeatKey := redisKeyHeartbeat(r.namespace, workerPoolID)

// Check that last heartbeat was long enough ago to consider the pool dead
heartbeatAt, err := redis.Int64(conn.Do("HGET", heartbeatKey, "heartbeat_at"))
if err == redis.ErrNil {
// dead pool with no heartbeat
// heartbeat expired, save dead pool and use cur set of jobs from reaper
deadPools[workerPoolID] = []string{}
continue
}
if err != nil {
return nil, err
}

// Check that last heartbeat was long enough ago to consider the pool dead
if time.Unix(heartbeatAt, 0).Add(r.deadTime).After(time.Now()) {
continue
}
Expand Down
88 changes: 79 additions & 9 deletions dead_pool_reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestDeadPoolReaper(t *testing.T) {
assert.NoError(t, err)

// Test getting dead pool
reaper := newDeadPoolReaper(ns, pool)
reaper := newDeadPoolReaper(ns, pool, []string{})
deadPools, err := reaper.findDeadPools()
assert.NoError(t, err)
assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, deadPools)
Expand All @@ -57,6 +57,8 @@ func TestDeadPoolReaper(t *testing.T) {
assert.NoError(t, err)
_, err = conn.Do("incr", redisKeyJobsLock(ns, "type1"))
assert.NoError(t, err)
_, err = conn.Do("hincrby", redisKeyJobsLockInfo(ns, "type1"), "2", 1) // worker pool 2 has lock
assert.NoError(t, err)

// Ensure 0 jobs in jobs queue
jobsCount, err := redis.Int(conn.Do("llen", redisKeyJobs(ns, "type1")))
Expand All @@ -82,10 +84,10 @@ func TestDeadPoolReaper(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 0, jobsCount)

// Lock count should get decremented
lockCount, err := redis.Int(conn.Do("get", redisKeyJobsLock(ns, "type1")))
assert.NoError(t, err)
assert.Equal(t, 0, lockCount)
// Locks should get cleaned up
assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, "type1")))
v, _ := conn.Do("HGET", redisKeyJobsLockInfo(ns, "type1"), "2")
assert.Nil(t, v)
}

func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
Expand All @@ -106,14 +108,23 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
assert.NoError(t, err)
err = conn.Send("SADD", workerPoolsKey, "3")
assert.NoError(t, err)
// stale lock info
err = conn.Send("SET", redisKeyJobsLock(ns, "type1"), 3)
assert.NoError(t, err)
err = conn.Send("HSET", redisKeyJobsLockInfo(ns, "type1"), "1", 1)
assert.NoError(t, err)
err = conn.Send("HSET", redisKeyJobsLockInfo(ns, "type1"), "2", 1)
assert.NoError(t, err)
err = conn.Send("HSET", redisKeyJobsLockInfo(ns, "type1"), "3", 1)
assert.NoError(t, err)
err = conn.Flush()
assert.NoError(t, err)

// Test getting dead pool ids
reaper := newDeadPoolReaper(ns, pool)
reaper := newDeadPoolReaper(ns, pool, []string{"type1"})
deadPools, err := reaper.findDeadPools()
assert.NoError(t, err)
assert.Equal(t, deadPools, map[string][]string{"1": {}, "2": {}, "3": {}})
assert.Equal(t, map[string][]string{"1": {}, "2": {}, "3": {}}, deadPools)

// Test requeueing jobs
_, err = conn.Do("lpush", redisKeyJobsInProgress(ns, "2", "type1"), "foo")
Expand Down Expand Up @@ -152,6 +163,13 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
jobsCount, err = redis.Int(conn.Do("scard", redisKeyWorkerPools(ns)))
assert.NoError(t, err)
assert.Equal(t, 0, jobsCount)

// Stale lock info was cleaned up using reap.curJobTypes
assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, "type1")))
for _, poolID := range []string{"1", "2", "3"} {
v, _ := conn.Do("HGET", redisKeyJobsLockInfo(ns, "type1"), poolID)
assert.Nil(t, v)
}
}

func TestDeadPoolReaperNoJobTypes(t *testing.T) {
Expand Down Expand Up @@ -186,7 +204,7 @@ func TestDeadPoolReaperNoJobTypes(t *testing.T) {
assert.NoError(t, err)

// Test getting dead pool
reaper := newDeadPoolReaper(ns, pool)
reaper := newDeadPoolReaper(ns, pool, []string{})
deadPools, err := reaper.findDeadPools()
assert.NoError(t, err)
assert.Equal(t, map[string][]string{"2": {"type1", "type2"}}, deadPools)
Expand Down Expand Up @@ -261,7 +279,7 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {

// setup a worker pool and start the reaper, which should restart the stale job above
wp := setupTestWorkerPool(pool, ns, job1, 1, JobOptions{Priority: 1})
wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool)
wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"})
wp.deadPoolReaper.deadTime = expectedDeadTime
wp.deadPoolReaper.start()
// provide some initialization time
Expand All @@ -273,3 +291,55 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {
staleHeart.stop()
wp.deadPoolReaper.stop()
}

// TestDeadPoolReaperCleanStaleLocks seeds lock counters and per-pool lock-info
// hashes for two job types, then checks that cleanStaleLockInfo releases only
// the locks attributed to the given worker pool and removes that pool's
// entries from the lock-info hashes.
func TestDeadPoolReaperCleanStaleLocks(t *testing.T) {
	pool := newTestPool(":6379")
	ns := "work"
	cleanKeyspace(ns, pool)

	conn := pool.Get()
	defer conn.Close()
	job1, job2 := "type1", "type2"
	jobNames := []string{job1, job2}
	workerPoolID1, workerPoolID2 := "1", "2"
	lock1 := redisKeyJobsLock(ns, job1)
	lock2 := redisKeyJobsLock(ns, job2)
	lockInfo1 := redisKeyJobsLockInfo(ns, job1)
	lockInfo2 := redisKeyJobsLockInfo(ns, job2)

	// Create redis data
	err := conn.Send("SET", lock1, 3)
	assert.NoError(t, err)
	err = conn.Send("SET", lock2, 1)
	assert.NoError(t, err)
	err = conn.Send("HSET", lockInfo1, workerPoolID1, 1) // workerPoolID1 holds 1 lock on job1
	assert.NoError(t, err)
	err = conn.Send("HSET", lockInfo1, workerPoolID2, 2) // workerPoolID2 holds 2 locks on job1
	assert.NoError(t, err)
	err = conn.Send("HSET", lockInfo2, workerPoolID2, 2) // test that we don't go below 0 on job2 lock
	assert.NoError(t, err)
	err = conn.Flush()
	assert.NoError(t, err)

	reaper := newDeadPoolReaper(ns, pool, jobNames)

	// clean lock info for workerPoolID1; capture the returned error so the
	// assertion checks this call rather than the earlier Flush result
	err = reaper.cleanStaleLockInfo(workerPoolID1, jobNames)
	assert.NoError(t, err)
	assert.EqualValues(t, 2, getInt64(pool, lock1)) // job1 lock should be decr by 1
	assert.EqualValues(t, 1, getInt64(pool, lock2)) // job2 lock is unchanged
	v, err := conn.Do("HGET", lockInfo1, workerPoolID1) // workerPoolID1 removed from job1's lock info
	assert.NoError(t, err)
	assert.Nil(t, v)

	// now clean lock info for workerPoolID2
	err = reaper.cleanStaleLockInfo(workerPoolID2, jobNames)
	assert.NoError(t, err)
	// both locks should be at 0
	assert.EqualValues(t, 0, getInt64(pool, lock1))
	assert.EqualValues(t, 0, getInt64(pool, lock2))
	// worker pool ID 2 removed from both lock info hashes
	v, err = conn.Do("HGET", lockInfo1, workerPoolID2)
	assert.NoError(t, err)
	assert.Nil(t, v)
	v, err = conn.Do("HGET", lockInfo2, workerPoolID2)
	assert.NoError(t, err)
	assert.Nil(t, v)
}
20 changes: 14 additions & 6 deletions priority_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,23 @@ type sampleItem struct {
priority uint

// payload:
redisJobs string
redisJobsInProg string
redisJobs string
redisJobsInProg string
redisJobsPaused string
redisJobsLock string
redisJobsLockInfo string
redisJobsMaxConcurrency string
}

func (s *prioritySampler) add(priority uint, redisJobs, redisJobsInProg string) {
func (s *prioritySampler) add(priority uint, redisJobs, redisJobsInProg, redisJobsPaused, redisJobsLock, redisJobsLockInfo, redisJobsMaxConcurrency string) {
sample := sampleItem{
priority: priority,
redisJobs: redisJobs,
redisJobsInProg: redisJobsInProg,
priority: priority,
redisJobs: redisJobs,
redisJobsInProg: redisJobsInProg,
redisJobsPaused: redisJobsPaused,
redisJobsLock: redisJobsLock,
redisJobsLockInfo: redisJobsLockInfo,
redisJobsMaxConcurrency: redisJobsMaxConcurrency,
}
s.samples = append(s.samples, sample)
s.sum += priority
Expand Down
15 changes: 11 additions & 4 deletions priority_sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (

func TestPrioritySampler(t *testing.T) {
ps := prioritySampler{}
ps.add(5, "jobs.5", "jobsinprog.5")
ps.add(2, "jobs.2a", "jobsinprog.2a")
ps.add(1, "jobs.1b", "jobsinprog.1b")

ps.add(5, "jobs.5", "jobsinprog.5", "jobspaused.5", "jobslock.5", "jobslockinfo.5", "jobsconcurrency.5")
ps.add(2, "jobs.2a", "jobsinprog.2a", "jobspaused.2a", "jobslock.2a", "jobslockinfo.2a", "jobsconcurrency.2a")
ps.add(1, "jobs.1b", "jobsinprog.1b", "jobspaused.1b", "jobslock.1b", "jobslockinfo.1b", "jobsconcurrency.1b")

var c5 = 0
var c2 = 0
Expand Down Expand Up @@ -41,7 +42,13 @@ func TestPrioritySampler(t *testing.T) {
func BenchmarkPrioritySampler(b *testing.B) {
ps := prioritySampler{}
for i := 0; i < 200; i++ {
ps.add(uint(i)+1, "jobs."+fmt.Sprint(i), "jobsinprog."+fmt.Sprint(i))
ps.add(uint(i)+1,
"jobs."+fmt.Sprint(i),
"jobsinprog."+fmt.Sprint(i),
"jobspaused."+fmt.Sprint(i),
"jobslock."+fmt.Sprint(i),
"jobslockinfo."+fmt.Sprint(i),
"jobsmaxconcurrency."+fmt.Sprint(i))
}

b.ResetTimer()
Expand Down
Loading