From 5dd3fa709c59907d9de6aafd5e837ffcfbe72d16 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Fri, 15 Nov 2024 18:20:58 -0500 Subject: [PATCH] Rework timeouts cron --- core/tasks/timeouts/bulk_timeout.go | 2 +- core/tasks/timeouts/bulk_timeout_test.go | 2 +- core/tasks/timeouts/cron.go | 75 +++++++++++++++--------- core/tasks/timeouts/cron_test.go | 68 +++++++++++++++------ 4 files changed, 100 insertions(+), 47 deletions(-) diff --git a/core/tasks/timeouts/bulk_timeout.go b/core/tasks/timeouts/bulk_timeout.go index d0cd48603..47f24778d 100644 --- a/core/tasks/timeouts/bulk_timeout.go +++ b/core/tasks/timeouts/bulk_timeout.go @@ -21,7 +21,7 @@ func init() { // BulkTimeoutTask is the payload of the task type BulkTimeoutTask struct { - Timeouts []Timeout `json:"timeouts"` + Timeouts []*Timeout `json:"timeouts"` } func (t *BulkTimeoutTask) Type() string { diff --git a/core/tasks/timeouts/bulk_timeout_test.go b/core/tasks/timeouts/bulk_timeout_test.go index 0ed5d4f66..230b605ea 100644 --- a/core/tasks/timeouts/bulk_timeout_test.go +++ b/core/tasks/timeouts/bulk_timeout_test.go @@ -19,7 +19,7 @@ func TestBulkTimeout(t *testing.T) { dates.SetNowFunc(dates.NewFixedNow(time.Date(2024, 11, 15, 13, 59, 0, 0, time.UTC))) testsuite.QueueBatchTask(t, rt, testdata.Org1, &timeouts.BulkTimeoutTask{ - Timeouts: []timeouts.Timeout{ + Timeouts: []*timeouts.Timeout{ {SessionID: 123456, ContactID: testdata.Cathy.ID, TimeoutOn: time.Date(2024, 11, 15, 13, 57, 0, 0, time.UTC)}, {SessionID: 234567, ContactID: testdata.Bob.ID, TimeoutOn: time.Date(2024, 11, 15, 13, 58, 0, 0, time.UTC)}, }, diff --git a/core/tasks/timeouts/cron.go b/core/tasks/timeouts/cron.go index 93a5bd1f4..e94e9f670 100644 --- a/core/tasks/timeouts/cron.go +++ b/core/tasks/timeouts/cron.go @@ -3,6 +3,7 @@ package timeouts import ( "context" "fmt" + "slices" "time" "github.com/nyaruka/mailroom/core/models" @@ -14,16 +15,20 @@ import ( ) func init() { - tasks.RegisterCron("sessions_timeouts", newTimeoutsCron()) + tasks.RegisterCron("sessions_timeouts", NewTimeoutsCron(100, 100)) } type timeoutsCron struct { - marker *redisx.IntervalSet + marker *redisx.IntervalSet + bulkThreshold int // use bulk task for any org with this or more timeouts + bulkBatchSize int // number of timeouts to queue in a single bulk task } -func newTimeoutsCron() tasks.Cron { +func NewTimeoutsCron(bulkThreshold, bulkBatchSize int) tasks.Cron { return &timeoutsCron{ - marker: redisx.NewIntervalSet("session_timeouts", time.Hour*24, 2), + marker: redisx.NewIntervalSet("session_timeouts", time.Hour*24, 2), + bulkThreshold: bulkThreshold, + bulkBatchSize: bulkBatchSize, } } @@ -35,32 +40,32 @@ func (c *timeoutsCron) AllInstances() bool { return false } -// timeoutRuns looks for any runs that have timed out and schedules for them to continue -// TODO: extend lock func (c *timeoutsCron) Run(ctx context.Context, rt *runtime.Runtime) (map[string]any, error) { // find all sessions that need to be expired (we exclude IVR runs) - rows, err := rt.DB.QueryxContext(ctx, timedoutSessionsSQL) + rows, err := rt.DB.QueryxContext(ctx, sqlSelectTimedoutSessions) if err != nil { return nil, fmt.Errorf("error selecting timed out sessions: %w", err) } defer rows.Close() + taskID := func(t *Timeout) string { return fmt.Sprintf("%d:%s", t.SessionID, t.TimeoutOn.Format(time.RFC3339)) } + + // scan and organize by org + byOrg := make(map[models.OrgID][]*Timeout, 50) + rc := rt.RP.Get() defer rc.Close() - numQueued, numDupes := 0, 0 + numDupes, numQueuedHandler, numQueuedBulk := 0, 0, 0 - // add a timeout task for each run - timeout := &Timeout{} for rows.Next() { - err := rows.StructScan(timeout) - if err != nil { + timeout := &Timeout{} + if err := rows.StructScan(timeout); err != nil { return nil, fmt.Errorf("error scanning timeout: %w", err) } // check whether we've already queued this - taskID := fmt.Sprintf("%d:%s", timeout.SessionID, timeout.TimeoutOn.Format(time.RFC3339)) - queued, err := c.marker.IsMember(rc, taskID) + queued, err := c.marker.IsMember(rc, taskID(timeout)) if err != nil { return nil, fmt.Errorf("error checking whether task is queued: %w", err) } @@ -71,25 +76,41 @@ func (c *timeoutsCron) Run(ctx context.Context, rt *runtime.Runtime) (map[string continue } - // ok, queue this task - err = handler.QueueTask(rc, timeout.OrgID, timeout.ContactID, ctasks.NewWaitTimeout(timeout.SessionID, timeout.TimeoutOn)) - if err != nil { - return nil, fmt.Errorf("error adding new handle task: %w", err) - } + byOrg[timeout.OrgID] = append(byOrg[timeout.OrgID], timeout) + } - // and mark it as queued - err = c.marker.Add(rc, taskID) - if err != nil { - return nil, fmt.Errorf("error marking timeout task as queued: %w", err) + for orgID, timeouts := range byOrg { + throttle := len(timeouts) >= c.bulkThreshold + + for batch := range slices.Chunk(timeouts, c.bulkBatchSize) { + if throttle { + if err := tasks.Queue(rc, tasks.ThrottledQueue, orgID, &BulkTimeoutTask{Timeouts: batch}, true); err != nil { + return nil, fmt.Errorf("error queuing bulk timeout task to throttle queue: %w", err) + } + numQueuedBulk += len(batch) + } + + for _, timeout := range batch { + if !throttle { + err := handler.QueueTask(rc, timeout.OrgID, timeout.ContactID, ctasks.NewWaitTimeout(timeout.SessionID, timeout.TimeoutOn)) + if err != nil { + return nil, fmt.Errorf("error queuing timeout task to handler queue: %w", err) + } + numQueuedHandler++ + } + + // mark as queued + if err = c.marker.Add(rc, taskID(timeout)); err != nil { + return nil, fmt.Errorf("error marking timeout task as queued: %w", err) + } + } } - - numQueued++ } - return map[string]any{"dupes": numDupes, "queued": numQueued}, nil + return map[string]any{"dupes": numDupes, "queued_handler": numQueuedHandler, "queued_bulk": numQueuedBulk}, nil } -const timedoutSessionsSQL = ` +const sqlSelectTimedoutSessions = ` SELECT id as session_id, org_id, contact_id, timeout_on FROM flows_flowsession WHERE status = 'W' AND timeout_on < NOW() AND call_id IS NULL diff --git a/core/tasks/timeouts/cron_test.go b/core/tasks/timeouts/cron_test.go index 74baaf985..dd7df4148 100644 --- a/core/tasks/timeouts/cron_test.go +++ b/core/tasks/timeouts/cron_test.go @@ -1,14 +1,19 @@ -package timeouts +package timeouts_test import ( - "encoding/json" + "fmt" "testing" "time" + "github.com/nyaruka/gocommon/i18n" + "github.com/nyaruka/gocommon/jsonx" + "github.com/nyaruka/gocommon/uuids" + "github.com/nyaruka/goflow/flows" _ "github.com/nyaruka/mailroom/core/handlers" "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/core/tasks" "github.com/nyaruka/mailroom/core/tasks/handler" + "github.com/nyaruka/mailroom/core/tasks/timeouts" "github.com/nyaruka/mailroom/testsuite" "github.com/nyaruka/mailroom/testsuite/testdata" @@ -22,35 +27,62 @@ func TestTimeouts(t *testing.T) { defer testsuite.Reset(testsuite.ResetData | testsuite.ResetRedis) - // need to create a session that has an expired timeout - s1TimeoutOn := time.Now() + // create sessions for Bob and Cathy that have timed out and session for George that has not + s1TimeoutOn := time.Now().Add(-time.Second) testdata.InsertWaitingSession(rt, testdata.Org1, testdata.Cathy, models.FlowTypeMessaging, testdata.Favorites, models.NilCallID, time.Now(), time.Now(), false, &s1TimeoutOn) - s2TimeoutOn := time.Now().Add(time.Hour * 24) - testdata.InsertWaitingSession(rt, testdata.Org1, testdata.George, models.FlowTypeMessaging, testdata.Favorites, models.NilCallID, time.Now(), time.Now(), false, &s2TimeoutOn) + s2TimeoutOn := time.Now().Add(-time.Second * 30) + testdata.InsertWaitingSession(rt, testdata.Org1, testdata.Bob, models.FlowTypeMessaging, testdata.Favorites, models.NilCallID, time.Now(), time.Now(), false, &s2TimeoutOn) + s3TimeoutOn := time.Now().Add(time.Hour * 24) + testdata.InsertWaitingSession(rt, testdata.Org1, testdata.George, models.FlowTypeMessaging, testdata.Favorites, models.NilCallID, time.Now(), time.Now(), false, &s3TimeoutOn) - time.Sleep(10 * time.Millisecond) + // for other org create 6 waiting sessions + for i := range 6 { + c := testdata.InsertContact(rt, testdata.Org2, flows.ContactUUID(uuids.NewV4()), fmt.Sprint(i), i18n.NilLanguage, models.ContactStatusActive) + timeoutOn := time.Now().Add(-time.Second * 10) + testdata.InsertWaitingSession(rt, testdata.Org2, c, models.FlowTypeMessaging, testdata.Favorites, models.NilCallID, time.Now(), time.Now(), false, &timeoutOn) + } // schedule our timeouts - cron := newTimeoutsCron() + cron := timeouts.NewTimeoutsCron(3, 5) res, err := cron.Run(ctx, rt) assert.NoError(t, err) - assert.Equal(t, map[string]any{"dupes": 0, "queued": 1}, res) + assert.Equal(t, map[string]any{"dupes": 0, "queued_handler": 2, "queued_bulk": 6}, res) - // should have created one task - task, err := tasks.HandlerQueue.Pop(rc) + // should have created two handler tasks for org 1 + task1, err := tasks.HandlerQueue.Pop(rc) assert.NoError(t, err) - assert.NotNil(t, task) - - // decode the task - eventTask := &handler.HandleContactEventTask{} - err = json.Unmarshal(task.Task, eventTask) + assert.Equal(t, int(testdata.Org1.ID), task1.OwnerID) + assert.Equal(t, "handle_contact_event", task1.Type) + task2, err := tasks.HandlerQueue.Pop(rc) assert.NoError(t, err) + assert.Equal(t, int(testdata.Org1.ID), task1.OwnerID) + assert.Equal(t, "handle_contact_event", task1.Type) - // assert its the right contact + // decode the tasks to check contacts + eventTask := &handler.HandleContactEventTask{} + jsonx.MustUnmarshal(task1.Task, eventTask) + assert.Equal(t, testdata.Bob.ID, eventTask.ContactID) + eventTask = &handler.HandleContactEventTask{} + jsonx.MustUnmarshal(task2.Task, eventTask) assert.Equal(t, testdata.Cathy.ID, eventTask.ContactID) // no other - task, err = tasks.HandlerQueue.Pop(rc) + task, err := tasks.HandlerQueue.Pop(rc) assert.NoError(t, err) assert.Nil(t, task) + + // should have created two throttled bulk tasks for org 2 + task3, err := tasks.ThrottledQueue.Pop(rc) + assert.NoError(t, err) + assert.Equal(t, int(testdata.Org2.ID), task3.OwnerID) + assert.Equal(t, "bulk_timeout", task3.Type) + task4, err := tasks.ThrottledQueue.Pop(rc) + assert.NoError(t, err) + assert.Equal(t, int(testdata.Org2.ID), task4.OwnerID) + assert.Equal(t, "bulk_timeout", task4.Type) + + // if task runs again, these timeouts won't be re-queued + res, err = cron.Run(ctx, rt) + assert.NoError(t, err) + assert.Equal(t, map[string]any{"dupes": 8, "queued_handler": 0, "queued_bulk": 0}, res) }