Skip to content

Commit

Permalink
Cron jobs should add themselves to the main mailroom waitgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Mar 16, 2022
1 parent 83f803b commit 7907575
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 18 deletions.
2 changes: 1 addition & 1 deletion core/tasks/analytics/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, "stats", time.Second*60, true, reportAnalytics, time.Minute*5)
cron.Start(rt, wg, "stats", time.Second*60, true, reportAnalytics, time.Minute*5, quit)
return nil
})
}
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/campaigns/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var campaignsMarker = redisx.NewIntervalSet("campaign_event", time.Hour*24, 2)

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, "campaign_event", time.Second*60, false, QueueEventFires, time.Minute*5)
cron.Start(rt, wg, "campaign_event", time.Second*60, false, QueueEventFires, time.Minute*5, quit)
return nil
})
}
Expand Down
4 changes: 2 additions & 2 deletions core/tasks/expirations/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ var expirationsMarker = redisx.NewIntervalSet("run_expirations", time.Hour*24, 2

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, "run_expirations", time.Minute, false, HandleWaitExpirations, time.Minute*5)
cron.Start(quit, rt, "expire_ivr_calls", time.Minute, false, ExpireVoiceSessions, time.Minute*5)
cron.Start(rt, wg, "run_expirations", time.Minute, false, HandleWaitExpirations, time.Minute*5, quit)
cron.Start(rt, wg, "expire_ivr_calls", time.Minute, false, ExpireVoiceSessions, time.Minute*5, quit)
return nil
})
}
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/handler/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var retriedMsgs = redisx.NewIntervalSet("retried_msgs", time.Hour*24, 2)

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, "retry_msgs", time.Minute*5, false, RetryPendingMsgs, time.Minute*5)
cron.Start(rt, wg, "retry_msgs", time.Minute*5, false, RetryPendingMsgs, time.Minute*5, quit)
return nil
})
}
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/incidents/end_incidents.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, "end_incidents", time.Minute*3, false, EndIncidents, time.Minute*5)
cron.Start(rt, wg, "end_incidents", time.Minute*3, false, EndIncidents, time.Minute*5, quit)
return nil
})
}
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/ivr/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, "retry_ivr_calls", time.Minute, false, RetryCalls, time.Minute*5)
cron.Start(rt, wg, "retry_ivr_calls", time.Minute, false, RetryCalls, time.Minute*5, quit)
return nil
})
}
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/msgs/retries.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, "retry_errored_messages", time.Second*60, false, RetryErroredMessages, time.Minute*5)
cron.Start(rt, wg, "retry_errored_messages", time.Second*60, false, RetryErroredMessages, time.Minute*5, quit)
return nil
})
}
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/schedules/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, "fire_schedules", time.Minute*1, false, checkSchedules, time.Minute*5)
cron.Start(rt, wg, "fire_schedules", time.Minute*1, false, checkSchedules, time.Minute*5, quit)
return nil
})
}
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/timeouts/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var marker = redisx.NewIntervalSet("session_timeouts", time.Hour*24, 2)

func init() {
mailroom.AddInitFunction(func(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, "sessions_timeouts", time.Second*60, false, timeoutSessions, time.Minute*5)
cron.Start(rt, wg, "sessions_timeouts", time.Second*60, false, timeoutSessions, time.Minute*5, quit)
return nil
})
}
Expand Down
17 changes: 15 additions & 2 deletions utils/cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cron
import (
"context"
"fmt"
"sync"
"time"

"github.com/apex/log"
Expand All @@ -18,7 +19,9 @@ type Function func(context.Context, *runtime.Runtime) error
// lock so that only one process is running at once. Note that across processes
// crons may be called more often than duration as there is no inter-process
// coordination of cron fires. (this might be a worthy addition)
func Start(quit chan bool, rt *runtime.Runtime, name string, interval time.Duration, allInstances bool, cronFunc Function, timeout time.Duration) {
func Start(rt *runtime.Runtime, wg *sync.WaitGroup, name string, interval time.Duration, allInstances bool, cronFunc Function, timeout time.Duration, quit chan bool) {
wg.Add(1) // add ourselves to the wait group

lockName := fmt.Sprintf("lock:%s_lock", name) // for historical reasons...

// for jobs that run on all instances, the lock key is specific to this instance
Expand All @@ -34,7 +37,10 @@ func Start(quit chan bool, rt *runtime.Runtime, name string, interval time.Durat
log := logrus.WithField("cron", name).WithField("lockName", lockName)

go func() {
defer log.Info("cron exiting")
defer func() {
log.Info("cron exiting")
wg.Done()
}()

for {
select {
Expand All @@ -58,16 +64,23 @@ func Start(quit chan bool, rt *runtime.Runtime, name string, interval time.Durat
}

// ok, got the lock, run our cron function
start := time.Now()
err = fireCron(rt, cronFunc, lockName, lock)
if err != nil {
log.WithError(err).Error("error while running cron")
}
elapsed := time.Since(start)

// release our lock
err = locker.Release(rt.RP, lock)
if err != nil {
log.WithError(err).Error("error releasing lock")
}

// if cron too longer than a minute, log
if elapsed > time.Minute {
logrus.WithField("cron", name).WithField("elapsed", elapsed).Error("cron took too long")
}
}

// calculate our next fire time
Expand Down
14 changes: 8 additions & 6 deletions utils/cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cron_test

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -41,13 +42,14 @@ func TestCron(t *testing.T) {
}

fired := 0
wg := &sync.WaitGroup{}
quit := make(chan bool)
running := false

align()

// start a job that takes ~100 ms and runs every 250ms
cron.Start(quit, rt, "test1", time.Millisecond*250, false, createCronFunc(&running, &fired, map[int]time.Duration{}, time.Millisecond*100), time.Minute)
cron.Start(rt, wg, "test1", time.Millisecond*250, false, createCronFunc(&running, &fired, map[int]time.Duration{}, time.Millisecond*100), time.Minute, quit)

// wait a bit, should only have fired three times (initial time + three repeats)
time.Sleep(time.Millisecond * 875) // time for 3 delays between tasks plus half of another delay
Expand All @@ -66,7 +68,7 @@ func TestCron(t *testing.T) {
align()

// simulate the job taking 400ms to run on the second fire, thus skipping the third fire
cron.Start(quit, rt, "test2", time.Millisecond*250, false, createCronFunc(&running, &fired, map[int]time.Duration{1: time.Millisecond * 400}, time.Millisecond*100), time.Minute)
cron.Start(rt, wg, "test2", time.Millisecond*250, false, createCronFunc(&running, &fired, map[int]time.Duration{1: time.Millisecond * 400}, time.Millisecond*100), time.Minute, quit)

time.Sleep(time.Millisecond * 875)
assert.Equal(t, 3, fired)
Expand All @@ -90,8 +92,8 @@ func TestCron(t *testing.T) {

align()

cron.Start(quit, &rt1, "test3", time.Millisecond*250, false, createCronFunc(&running, &fired1, map[int]time.Duration{}, time.Millisecond*100), time.Minute)
cron.Start(quit, &rt2, "test3", time.Millisecond*250, false, createCronFunc(&running, &fired2, map[int]time.Duration{}, time.Millisecond*100), time.Minute)
cron.Start(&rt1, wg, "test3", time.Millisecond*250, false, createCronFunc(&running, &fired1, map[int]time.Duration{}, time.Millisecond*100), time.Minute, quit)
cron.Start(&rt2, wg, "test3", time.Millisecond*250, false, createCronFunc(&running, &fired2, map[int]time.Duration{}, time.Millisecond*100), time.Minute, quit)

// same number of fires as if only a single instance was running it...
time.Sleep(time.Millisecond * 875)
Expand All @@ -108,8 +110,8 @@ func TestCron(t *testing.T) {
align()

// unless we start the cron with allInstances = true
cron.Start(quit, &rt1, "test4", time.Millisecond*250, true, createCronFunc(&running1, &fired1, map[int]time.Duration{}, time.Millisecond*100), time.Minute)
cron.Start(quit, &rt2, "test4", time.Millisecond*250, true, createCronFunc(&running2, &fired2, map[int]time.Duration{}, time.Millisecond*100), time.Minute)
cron.Start(&rt1, wg, "test4", time.Millisecond*250, true, createCronFunc(&running1, &fired1, map[int]time.Duration{}, time.Millisecond*100), time.Minute, quit)
cron.Start(&rt2, wg, "test4", time.Millisecond*250, true, createCronFunc(&running2, &fired2, map[int]time.Duration{}, time.Millisecond*100), time.Minute, quit)

// now both instances fire 4 times
time.Sleep(time.Millisecond * 875)
Expand Down

0 comments on commit 7907575

Please sign in to comment.