Skip to content

Commit

Permalink
Merge pull request #603 from nyaruka/better_cron
Browse files Browse the repository at this point in the history
Wait for crons on shutdown
  • Loading branch information
rowanseymour authored Mar 21, 2022
2 parents 4a1593a + b04560c commit a7fc6df
Show file tree
Hide file tree
Showing 13 changed files with 60 additions and 187 deletions.
2 changes: 1 addition & 1 deletion cmd/mailroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func main() {

// handleSignals takes care of trapping quit, interrupt or terminate signals and doing the right thing
func handleSignals(mr *mailroom.Mailroom) {
sigs := make(chan os.Signal)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

for {
Expand Down
16 changes: 1 addition & 15 deletions core/tasks/analytics/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,17 @@ package analytics

import (
"context"
"sync"
"time"

"github.com/nyaruka/librato"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/sirupsen/logrus"
)

func init() {
mailroom.AddInitFunction(StartAnalyticsCron)
}

// StartAnalyticsCron starts our cron job of posting stats every minute
func StartAnalyticsCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, "stats", time.Second*60, true,
func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
return reportAnalytics(ctx, rt)
},
)
return nil
mailroom.RegisterCron("analytics", time.Second*60, true, reportAnalytics)
}

var (
Expand Down
17 changes: 1 addition & 16 deletions core/tasks/campaigns/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package campaigns
import (
"context"
"fmt"
"sync"
"time"

"github.com/gomodule/redigo/redis"
Expand All @@ -13,7 +12,6 @@ import (
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/nyaruka/redisx"

"github.com/pkg/errors"
Expand All @@ -27,20 +25,7 @@ const (
var campaignsMarker = redisx.NewIntervalSet("campaign_event", time.Hour*24, 2)

func init() {
mailroom.AddInitFunction(StartCampaignCron)
}

// StartCampaignCron starts our cron job of firing expired campaign events
func StartCampaignCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, "campaign_event", time.Second*60, false,
func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
return QueueEventFires(ctx, rt)
},
)

return nil
mailroom.RegisterCron("campaign_event", time.Second*60, false, QueueEventFires)
}

// QueueEventFires looks for all due campaign event fires and queues them to be started
Expand Down
26 changes: 2 additions & 24 deletions core/tasks/expirations/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package expirations
import (
"context"
"fmt"
"sync"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/ivr"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks/handler"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/nyaruka/redisx"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand All @@ -24,28 +22,8 @@ const (
var expirationsMarker = redisx.NewIntervalSet("run_expirations", time.Hour*24, 2)

func init() {
mailroom.AddInitFunction(StartExpirationCron)
}

// StartExpirationCron starts our cron job of expiring runs every minute
func StartExpirationCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, "run_expirations", time.Minute, false,
func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
return HandleWaitExpirations(ctx, rt)
},
)

cron.Start(quit, rt, "expire_ivr_calls", time.Minute, false,
func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
return ExpireVoiceSessions(ctx, rt)
},
)

return nil
mailroom.RegisterCron("run_expirations", time.Minute, false, HandleWaitExpirations)
mailroom.RegisterCron("expire_ivr_calls", time.Minute, false, ExpireVoiceSessions)
}

// HandleWaitExpirations handles waiting messaging sessions whose waits have expired, resuming those that can be resumed,
Expand Down
21 changes: 1 addition & 20 deletions core/tasks/handler/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,21 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/nyaruka/redisx"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

const (
retryLock = "retry_msgs"
)

var retriedMsgs = redisx.NewIntervalSet("retried_msgs", time.Hour*24, 2)

func init() {
mailroom.AddInitFunction(StartRetryCron)
}

// StartRetryCron starts our cron job of retrying pending incoming messages
func StartRetryCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, retryLock, time.Minute*5, false,
func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
return RetryPendingMsgs(ctx, rt)
},
)
return nil
mailroom.RegisterCron("retry_msgs", time.Minute*5, false, RetryPendingMsgs)
}

// RetryPendingMsgs looks for any pending msgs older than five minutes and queues them to be handled again
Expand Down
15 changes: 1 addition & 14 deletions core/tasks/incidents/end_incidents.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,19 @@ package incidents
import (
"context"
"fmt"
"sync"
"time"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func init() {
mailroom.AddInitFunction(startEndCron)
}

func startEndCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, "end_incidents", time.Minute*3, false,
func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
return EndIncidents(ctx, rt)
},
)
return nil
mailroom.RegisterCron("end_incidents", time.Minute*3, false, EndIncidents)
}

// EndIncidents checks open incidents and end any that no longer apply
Expand Down
21 changes: 1 addition & 20 deletions core/tasks/ivr/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,18 @@ package ivr

import (
"context"
"sync"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/ivr"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

const (
retryIVRLock = "retry_ivr_calls"
)

func init() {
mailroom.AddInitFunction(StartIVRCron)
}

// StartIVRCron starts our cron job of retrying errored calls
func StartIVRCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, retryIVRLock, time.Minute, false,
func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
return RetryCalls(ctx, rt)
},
)

return nil
mailroom.RegisterCron("retry_ivr_calls", time.Minute, false, RetryCalls)
}

// RetryCalls looks for calls that need to be retried and retries them
Expand Down
20 changes: 1 addition & 19 deletions core/tasks/msgs/retries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,18 @@ package msgs

import (
"context"
"sync"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/msgio"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

const (
retryMessagesLock = "retry_errored_messages"
)

func init() {
mailroom.AddInitFunction(startCrons)
}

func startCrons(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, retryMessagesLock, time.Second*60, false,
func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
return RetryErroredMessages(ctx, rt)
},
)

return nil
mailroom.RegisterCron("retry_errored_messages", time.Second*60, false, RetryErroredMessages)
}

func RetryErroredMessages(ctx context.Context, rt *runtime.Runtime) error {
Expand Down
27 changes: 5 additions & 22 deletions core/tasks/schedules/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,26 @@ package schedules

import (
"context"
"sync"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

const (
scheduleLock = "fire_schedules"
)

func init() {
mailroom.AddInitFunction(StartCheckSchedules)
}

// StartCheckSchedules starts our cron job of firing schedules every minute
func StartCheckSchedules(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, scheduleLock, time.Minute*1, false,
func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
// we sleep 1 second since we fire right on the minute and want to make sure to fire
// things that are schedules right at the minute as well (and DB time may be slightly drifted)
time.Sleep(time.Second * 1)
return checkSchedules(ctx, rt)
},
)
return nil
mailroom.RegisterCron("fire_schedules", time.Minute*1, false, checkSchedules)
}

// checkSchedules looks up any expired schedules and fires them, setting the next fire as needed
func checkSchedules(ctx context.Context, rt *runtime.Runtime) error {
// we sleep 1 second since we fire right on the minute and want to make sure to fire
// things that are schedules right at the minute as well (and DB time may be slightly drifted)
time.Sleep(time.Second * 1)

log := logrus.WithField("comp", "schedules_cron")
start := time.Now()

Expand Down
21 changes: 1 addition & 20 deletions core/tasks/timeouts/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,21 @@ package timeouts
import (
"context"
"fmt"
"sync"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks/handler"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/cron"
"github.com/nyaruka/redisx"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

const (
timeoutLock = "sessions_timeouts"
)

var marker = redisx.NewIntervalSet("session_timeouts", time.Hour*24, 2)

func init() {
mailroom.AddInitFunction(StartTimeoutCron)
}

// StartTimeoutCron starts our cron job of continuing timed out sessions every minute
func StartTimeoutCron(rt *runtime.Runtime, wg *sync.WaitGroup, quit chan bool) error {
cron.Start(quit, rt, timeoutLock, time.Second*60, false,
func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
return timeoutSessions(ctx, rt)
},
)
return nil
mailroom.RegisterCron("sessions_timeouts", time.Second*60, false, timeoutSessions)
}

// timeoutRuns looks for any runs that have timed out and schedules for them to continue
Expand Down
Loading

0 comments on commit a7fc6df

Please sign in to comment.