Skip to content

Commit

Permalink
Add new task to handle bulk session timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Nov 15, 2024
1 parent 4737982 commit 4eaa22e
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 4 deletions.
53 changes: 53 additions & 0 deletions core/tasks/timeouts/bulk_timeout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package timeouts

import (
"context"
"fmt"
"time"

"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/handler"
"github.com/nyaruka/mailroom/core/tasks/handler/ctasks"
"github.com/nyaruka/mailroom/runtime"
)

// TypeBulkTimeout is the type of the task
const TypeBulkTimeout = "bulk_timeout"

func init() {
tasks.RegisterType(TypeBulkTimeout, func() tasks.Task { return &BulkTimeoutTask{} })
}

// BulkTimeoutTask is the payload of the task
type BulkTimeoutTask struct {
Timeouts []Timeout `json:"timeouts"`
}

func (t *BulkTimeoutTask) Type() string {
return TypeBulkTimeout
}

// Timeout is the maximum amount of time the task can run for
func (t *BulkTimeoutTask) Timeout() time.Duration {
return time.Hour
}

func (t *BulkTimeoutTask) WithAssets() models.Refresh {
return models.RefreshNone
}

// Perform creates the actual task
func (t *BulkTimeoutTask) Perform(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets) error {
rc := rt.RP.Get()
defer rc.Close()

for _, timeout := range t.Timeouts {
err := handler.QueueTask(rc, oa.OrgID(), timeout.ContactID, ctasks.NewWaitTimeout(timeout.SessionID, timeout.TimeoutOn))
if err != nil {
return fmt.Errorf("error queuing handle task for timeout on session #%d: %w", timeout.SessionID, err)
}
}

return nil
}
36 changes: 36 additions & 0 deletions core/tasks/timeouts/bulk_timeout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package timeouts_test

import (
"testing"
"time"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/mailroom/core/tasks/timeouts"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/stretchr/testify/assert"
)

func TestBulkTimeout(t *testing.T) {
_, rt := testsuite.Runtime()
defer testsuite.Reset(testsuite.ResetRedis)

defer dates.SetNowFunc(time.Now)
dates.SetNowFunc(dates.NewFixedNow(time.Date(2024, 11, 15, 13, 59, 0, 0, time.UTC)))

testsuite.QueueBatchTask(t, rt, testdata.Org1, &timeouts.BulkTimeoutTask{
Timeouts: []timeouts.Timeout{
{SessionID: 123456, ContactID: testdata.Cathy.ID, TimeoutOn: time.Date(2024, 11, 15, 13, 57, 0, 0, time.UTC)},
{SessionID: 234567, ContactID: testdata.Bob.ID, TimeoutOn: time.Date(2024, 11, 15, 13, 58, 0, 0, time.UTC)},
},
})

assert.Equal(t, map[string]int{"bulk_timeout": 1}, testsuite.FlushTasks(t, rt, []string{"batch", "throttled"}))

testsuite.AssertContactTasks(t, testdata.Org1, testdata.Cathy, []string{
`{"type":"timeout_event","task":{"session_id":123456,"time":"2024-11-15T13:57:00Z"},"queued_on":"2024-11-15T13:59:00Z"}`,
})
testsuite.AssertContactTasks(t, testdata.Org1, testdata.Bob, []string{
`{"type":"timeout_event","task":{"session_id":234567,"time":"2024-11-15T13:58:00Z"},"queued_on":"2024-11-15T13:59:00Z"}`,
})
}
8 changes: 4 additions & 4 deletions core/tasks/timeouts/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ ORDER BY timeout_on ASC
LIMIT 25000`

type Timeout struct {
SessionID models.SessionID `db:"session_id"`
OrgID models.OrgID `db:"org_id"`
ContactID models.ContactID `db:"contact_id"`
TimeoutOn time.Time `db:"timeout_on"`
SessionID models.SessionID `db:"session_id" json:"session_id"`
OrgID models.OrgID `db:"org_id" json:"-"`
ContactID models.ContactID `db:"contact_id" json:"contact_id"`
TimeoutOn time.Time `db:"timeout_on" json:"timeout_on"`
}

0 comments on commit 4eaa22e

Please sign in to comment.