Skip to content

Commit

Permalink
Merge pull request #363 from nyaruka/bulk_timeout
Browse files Browse the repository at this point in the history
Add new task to handle bulk session timeouts
  • Loading branch information
rowanseymour authored Nov 19, 2024
2 parents 0c6fd11 + ec06eeb commit a21a7d6
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 49 deletions.
53 changes: 53 additions & 0 deletions core/tasks/timeouts/bulk_timeout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package timeouts

import (
"context"
"fmt"
"time"

"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/handler"
"github.com/nyaruka/mailroom/core/tasks/handler/ctasks"
"github.com/nyaruka/mailroom/runtime"
)

// TypeBulkTimeout is the type name under which BulkTimeoutTask is registered and
// which BulkTimeoutTask.Type returns.
const TypeBulkTimeout = "bulk_timeout"

// init registers a factory for empty BulkTimeoutTask values under TypeBulkTimeout.
func init() {
	newTask := func() tasks.Task {
		return new(BulkTimeoutTask)
	}
	tasks.RegisterType(TypeBulkTimeout, newTask)
}

// BulkTimeoutTask is the payload of a bulk timeout task: a batch of session timeouts
// which Perform turns into one handler task per entry.
type BulkTimeoutTask struct {
// Timeouts are the session timeouts to queue, serialized under the "timeouts" key.
Timeouts []*Timeout `json:"timeouts"`
}

// Type returns the registered type name of this task, TypeBulkTimeout.
func (t *BulkTimeoutTask) Type() string {
return TypeBulkTimeout
}

// Timeout is the maximum amount of time the task can run for, which is one hour.
func (t *BulkTimeoutTask) Timeout() time.Duration {
return time.Hour
}

// WithAssets returns the asset refresh mode for this task, which is always models.RefreshNone.
// NOTE(review): presumably this means no org assets are refreshed before Perform runs - confirm
// against the models package.
func (t *BulkTimeoutTask) WithAssets() models.Refresh {
return models.RefreshNone
}

// Perform queues a wait timeout task on the handler queue for every timeout in the payload,
// using the org of the given assets and the contact of each timeout. It stops at the first
// queuing failure and returns that error wrapped with the session id; timeouts after the
// failing one are not queued.
func (t *BulkTimeoutTask) Perform(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets) error {
	conn := rt.RP.Get()
	defer conn.Close()

	for i := range t.Timeouts {
		to := t.Timeouts[i]

		if err := handler.QueueTask(conn, oa.OrgID(), to.ContactID, ctasks.NewWaitTimeout(to.SessionID, to.TimeoutOn)); err != nil {
			return fmt.Errorf("error queuing handle task for timeout on session #%d: %w", to.SessionID, err)
		}
	}

	return nil
}
36 changes: 36 additions & 0 deletions core/tasks/timeouts/bulk_timeout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package timeouts_test

import (
"testing"
"time"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/mailroom/core/tasks/timeouts"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/stretchr/testify/assert"
)

// TestBulkTimeout queues a bulk timeout task holding two timeouts, flushes the batch and
// throttled queues, and checks that each contact ends up with its own timeout_event task.
func TestBulkTimeout(t *testing.T) {
	_, rt := testsuite.Runtime()
	defer testsuite.Reset(testsuite.ResetRedis)

	// fix the clock so that the queued_on value in the expected tasks is predictable
	fixedNow := time.Date(2024, 11, 15, 13, 59, 0, 0, time.UTC)
	defer dates.SetNowFunc(time.Now)
	dates.SetNowFunc(dates.NewFixedNow(fixedNow))

	bulk := &timeouts.BulkTimeoutTask{
		Timeouts: []*timeouts.Timeout{
			{SessionID: 123456, ContactID: testdata.Cathy.ID, TimeoutOn: time.Date(2024, 11, 15, 13, 57, 0, 0, time.UTC)},
			{SessionID: 234567, ContactID: testdata.Bob.ID, TimeoutOn: time.Date(2024, 11, 15, 13, 58, 0, 0, time.UTC)},
		},
	}
	testsuite.QueueBatchTask(t, rt, testdata.Org1, bulk)

	// exactly one bulk task should have been performed
	flushed := testsuite.FlushTasks(t, rt, "batch", "throttled")
	assert.Equal(t, map[string]int{"bulk_timeout": 1}, flushed)

	// and each contact should now have a timeout event task for their session
	cathyTasks := []string{
		`{"type":"timeout_event","task":{"session_id":123456,"time":"2024-11-15T13:57:00Z"},"queued_on":"2024-11-15T13:59:00Z"}`,
	}
	testsuite.AssertContactTasks(t, testdata.Org1, testdata.Cathy, cathyTasks)

	bobTasks := []string{
		`{"type":"timeout_event","task":{"session_id":234567,"time":"2024-11-15T13:58:00Z"},"queued_on":"2024-11-15T13:59:00Z"}`,
	}
	testsuite.AssertContactTasks(t, testdata.Org1, testdata.Bob, bobTasks)
}
83 changes: 52 additions & 31 deletions core/tasks/timeouts/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package timeouts
import (
"context"
"fmt"
"slices"
"time"

"github.com/nyaruka/mailroom/core/models"
Expand All @@ -14,16 +15,20 @@ import (
)

func init() {
tasks.RegisterCron("sessions_timeouts", newTimeoutsCron())
tasks.RegisterCron("sessions_timeouts", NewTimeoutsCron(10, 100))
}

type timeoutsCron struct {
marker *redisx.IntervalSet
marker *redisx.IntervalSet
bulkThreshold int // use bulk task for any org with this or more timeouts
bulkBatchSize int // number of timeouts to queue in a single bulk task
}

func newTimeoutsCron() tasks.Cron {
func NewTimeoutsCron(bulkThreshold, bulkBatchSize int) tasks.Cron {
return &timeoutsCron{
marker: redisx.NewIntervalSet("session_timeouts", time.Hour*24, 2),
marker: redisx.NewIntervalSet("session_timeouts", time.Hour*24, 2),
bulkThreshold: bulkThreshold,
bulkBatchSize: bulkBatchSize,
}
}

Expand All @@ -35,32 +40,32 @@ func (c *timeoutsCron) AllInstances() bool {
return false
}

// timeoutRuns looks for any runs that have timed out and schedules for them to continue
// TODO: extend lock
func (c *timeoutsCron) Run(ctx context.Context, rt *runtime.Runtime) (map[string]any, error) {
// find all sessions that need to be expired (we exclude IVR runs)
rows, err := rt.DB.QueryxContext(ctx, timedoutSessionsSQL)
rows, err := rt.DB.QueryxContext(ctx, sqlSelectTimedoutSessions)
if err != nil {
return nil, fmt.Errorf("error selecting timed out sessions: %w", err)
}
defer rows.Close()

taskID := func(t *Timeout) string { return fmt.Sprintf("%d:%s", t.SessionID, t.TimeoutOn.Format(time.RFC3339)) }

// scan and organize by org
byOrg := make(map[models.OrgID][]*Timeout, 50)

rc := rt.RP.Get()
defer rc.Close()

numQueued, numDupes := 0, 0
numDupes, numQueuedHandler, numQueuedBulk := 0, 0, 0

// add a timeout task for each run
timeout := &Timeout{}
for rows.Next() {
err := rows.StructScan(timeout)
if err != nil {
timeout := &Timeout{}
if err := rows.StructScan(timeout); err != nil {
return nil, fmt.Errorf("error scanning timeout: %w", err)
}

// check whether we've already queued this
taskID := fmt.Sprintf("%d:%s", timeout.SessionID, timeout.TimeoutOn.Format(time.RFC3339))
queued, err := c.marker.IsMember(rc, taskID)
queued, err := c.marker.IsMember(rc, taskID(timeout))
if err != nil {
return nil, fmt.Errorf("error checking whether task is queued: %w", err)
}
Expand All @@ -71,34 +76,50 @@ func (c *timeoutsCron) Run(ctx context.Context, rt *runtime.Runtime) (map[string
continue
}

// ok, queue this task
err = handler.QueueTask(rc, timeout.OrgID, timeout.ContactID, ctasks.NewWaitTimeout(timeout.SessionID, timeout.TimeoutOn))
if err != nil {
return nil, fmt.Errorf("error adding new handle task: %w", err)
}
byOrg[timeout.OrgID] = append(byOrg[timeout.OrgID], timeout)
}

// and mark it as queued
err = c.marker.Add(rc, taskID)
if err != nil {
return nil, fmt.Errorf("error marking timeout task as queued: %w", err)
for orgID, timeouts := range byOrg {
throttle := len(timeouts) >= c.bulkThreshold

for batch := range slices.Chunk(timeouts, c.bulkBatchSize) {
if throttle {
if err := tasks.Queue(rc, tasks.ThrottledQueue, orgID, &BulkTimeoutTask{Timeouts: batch}, true); err != nil {
return nil, fmt.Errorf("error queuing bulk timeout task to throttle queue: %w", err)
}
numQueuedBulk += len(batch)
}

for _, timeout := range batch {
if !throttle {
err := handler.QueueTask(rc, timeout.OrgID, timeout.ContactID, ctasks.NewWaitTimeout(timeout.SessionID, timeout.TimeoutOn))
if err != nil {
return nil, fmt.Errorf("error queuing timeout task to handler queue: %w", err)
}
numQueuedHandler++
}

// mark as queued
if err = c.marker.Add(rc, taskID(timeout)); err != nil {
return nil, fmt.Errorf("error marking timeout task as queued: %w", err)
}
}
}

numQueued++
}

return map[string]any{"dupes": numDupes, "queued": numQueued}, nil
return map[string]any{"dupes": numDupes, "queued_handler": numQueuedHandler, "queued_bulk": numQueuedBulk}, nil
}

const timedoutSessionsSQL = `
const sqlSelectTimedoutSessions = `
SELECT id as session_id, org_id, contact_id, timeout_on
FROM flows_flowsession
WHERE status = 'W' AND timeout_on < NOW() AND call_id IS NULL
ORDER BY timeout_on ASC
LIMIT 25000`

type Timeout struct {
SessionID models.SessionID `db:"session_id"`
OrgID models.OrgID `db:"org_id"`
ContactID models.ContactID `db:"contact_id"`
TimeoutOn time.Time `db:"timeout_on"`
SessionID models.SessionID `db:"session_id" json:"session_id"`
OrgID models.OrgID `db:"org_id" json:"-"`
ContactID models.ContactID `db:"contact_id" json:"contact_id"`
TimeoutOn time.Time `db:"timeout_on" json:"timeout_on"`
}
68 changes: 50 additions & 18 deletions core/tasks/timeouts/cron_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package timeouts
package timeouts_test

import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/nyaruka/gocommon/i18n"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/goflow/flows"
_ "github.com/nyaruka/mailroom/core/handlers"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/handler"
"github.com/nyaruka/mailroom/core/tasks/timeouts"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"

Expand All @@ -22,35 +27,62 @@ func TestTimeouts(t *testing.T) {

defer testsuite.Reset(testsuite.ResetData | testsuite.ResetRedis)

// need to create a session that has an expired timeout
s1TimeoutOn := time.Now()
// create sessions for Bob and Cathy that have timed out and session for George that has not
s1TimeoutOn := time.Now().Add(-time.Second)
testdata.InsertWaitingSession(rt, testdata.Org1, testdata.Cathy, models.FlowTypeMessaging, testdata.Favorites, models.NilCallID, time.Now(), time.Now(), false, &s1TimeoutOn)
s2TimeoutOn := time.Now().Add(time.Hour * 24)
testdata.InsertWaitingSession(rt, testdata.Org1, testdata.George, models.FlowTypeMessaging, testdata.Favorites, models.NilCallID, time.Now(), time.Now(), false, &s2TimeoutOn)
s2TimeoutOn := time.Now().Add(-time.Second * 30)
testdata.InsertWaitingSession(rt, testdata.Org1, testdata.Bob, models.FlowTypeMessaging, testdata.Favorites, models.NilCallID, time.Now(), time.Now(), false, &s2TimeoutOn)
s3TimeoutOn := time.Now().Add(time.Hour * 24)
testdata.InsertWaitingSession(rt, testdata.Org1, testdata.George, models.FlowTypeMessaging, testdata.Favorites, models.NilCallID, time.Now(), time.Now(), false, &s3TimeoutOn)

time.Sleep(10 * time.Millisecond)
// for other org create 6 waiting sessions
for i := range 6 {
c := testdata.InsertContact(rt, testdata.Org2, flows.ContactUUID(uuids.NewV4()), fmt.Sprint(i), i18n.NilLanguage, models.ContactStatusActive)
timeoutOn := time.Now().Add(-time.Second * 10)
testdata.InsertWaitingSession(rt, testdata.Org2, c, models.FlowTypeMessaging, testdata.Favorites, models.NilCallID, time.Now(), time.Now(), false, &timeoutOn)
}

// schedule our timeouts
cron := newTimeoutsCron()
cron := timeouts.NewTimeoutsCron(3, 5)
res, err := cron.Run(ctx, rt)
assert.NoError(t, err)
assert.Equal(t, map[string]any{"dupes": 0, "queued": 1}, res)
assert.Equal(t, map[string]any{"dupes": 0, "queued_handler": 2, "queued_bulk": 6}, res)

// should have created one task
task, err := tasks.HandlerQueue.Pop(rc)
// should have created two handler tasks for org 1
task1, err := tasks.HandlerQueue.Pop(rc)
assert.NoError(t, err)
assert.NotNil(t, task)

// decode the task
eventTask := &handler.HandleContactEventTask{}
err = json.Unmarshal(task.Task, eventTask)
assert.Equal(t, int(testdata.Org1.ID), task1.OwnerID)
assert.Equal(t, "handle_contact_event", task1.Type)
task2, err := tasks.HandlerQueue.Pop(rc)
assert.NoError(t, err)
assert.Equal(t, int(testdata.Org1.ID), task1.OwnerID)
assert.Equal(t, "handle_contact_event", task1.Type)

// assert its the right contact
// decode the tasks to check contacts
eventTask := &handler.HandleContactEventTask{}
jsonx.MustUnmarshal(task1.Task, eventTask)
assert.Equal(t, testdata.Bob.ID, eventTask.ContactID)
eventTask = &handler.HandleContactEventTask{}
jsonx.MustUnmarshal(task2.Task, eventTask)
assert.Equal(t, testdata.Cathy.ID, eventTask.ContactID)

// no other
task, err = tasks.HandlerQueue.Pop(rc)
task, err := tasks.HandlerQueue.Pop(rc)
assert.NoError(t, err)
assert.Nil(t, task)

// should have created two throttled bulk tasks for org 2
task3, err := tasks.ThrottledQueue.Pop(rc)
assert.NoError(t, err)
assert.Equal(t, int(testdata.Org2.ID), task3.OwnerID)
assert.Equal(t, "bulk_timeout", task3.Type)
task4, err := tasks.ThrottledQueue.Pop(rc)
assert.NoError(t, err)
assert.Equal(t, int(testdata.Org2.ID), task4.OwnerID)
assert.Equal(t, "bulk_timeout", task4.Type)

// if task runs again, these timeouts won't be re-queued
res, err = cron.Run(ctx, rt)
assert.NoError(t, err)
assert.Equal(t, map[string]any{"dupes": 8, "queued_handler": 0, "queued_bulk": 0}, res)
}

0 comments on commit a21a7d6

Please sign in to comment.