Skip to content

Commit

Permalink
Merge pull request #360 from nyaruka/queue_abstraction
Browse files Browse the repository at this point in the history
Create abstraction for fair queues
  • Loading branch information
rowanseymour authored Nov 14, 2024
2 parents 1fafb6f + 632710f commit 571f401
Show file tree
Hide file tree
Showing 26 changed files with 96 additions and 100 deletions.
3 changes: 1 addition & 2 deletions core/hooks/create_broadcasts.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/msgs"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
)

// CreateBroadcastsHook is our hook for creating broadcasts
Expand All @@ -34,7 +33,7 @@ func (h *createBroadcastsHook) Apply(ctx context.Context, rt *runtime.Runtime, t
return fmt.Errorf("error creating broadcast: %w", err)
}

err = tasks.Queue(rc, tasks.BatchQueue, oa.OrgID(), &msgs.SendBroadcastTask{Broadcast: bcast}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.BatchQueue, oa.OrgID(), &msgs.SendBroadcastTask{Broadcast: bcast}, false)
if err != nil {
return fmt.Errorf("error queuing broadcast task: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions core/hooks/create_starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/starts"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
)

// CreateStartsHook is our hook to fire our scene starts
Expand Down Expand Up @@ -74,7 +73,7 @@ func (h *createStartsHook) Apply(ctx context.Context, rt *runtime.Runtime, tx *s
}
}

err = tasks.Queue(rc, tasks.BatchQueue, oa.OrgID(), &starts.StartFlowTask{FlowStart: start}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.BatchQueue, oa.OrgID(), &starts.StartFlowTask{FlowStart: start}, false)
if err != nil {
return fmt.Errorf("error queuing flow start: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions core/tasks/campaigns/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
"github.com/nyaruka/redisx"
)

Expand Down Expand Up @@ -118,7 +117,7 @@ func (c *QueueEventsCron) queueFiresTask(rp *redis.Pool, orgID models.OrgID, tas
rc := rp.Get()
defer rc.Close()

err := tasks.Queue(rc, tasks.BatchQueue, orgID, task, queues.DefaultPriority)
err := tasks.Queue(rc, tasks.BatchQueue, orgID, task, false)
if err != nil {
return fmt.Errorf("error queuing task: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions core/tasks/campaigns/fire_campaign_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks/campaigns"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/mailroom/utils/queues"
"github.com/nyaruka/redisx"
"github.com/nyaruka/redisx/assertredis"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -54,7 +53,7 @@ func TestFireCampaignEvents(t *testing.T) {
CampaignName: campaign.Name,
}

err := tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, task, queues.DefaultPriority)
err := tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, task, false)
assert.NoError(t, err)

testsuite.FlushTasks(t, rt)
Expand Down
5 changes: 2 additions & 3 deletions core/tasks/handler/handle_contact_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/ivr"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
)

// TypeHandleContactEvent is the task type for flagging that a contact has handler tasks to be handled
Expand Down Expand Up @@ -57,7 +56,7 @@ func (t *HandleContactEventTask) Perform(ctx context.Context, rt *runtime.Runtim
if len(locks) == 0 {
rc := rt.RP.Get()
defer rc.Close()
err = tasks.Queue(rc, tasks.HandlerQueue, oa.OrgID(), &HandleContactEventTask{ContactID: t.ContactID}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.HandlerQueue, oa.OrgID(), &HandleContactEventTask{ContactID: t.ContactID}, false)
if err != nil {
return fmt.Errorf("error re-adding contact task after failing to get lock: %w", err)
}
Expand Down Expand Up @@ -186,7 +185,7 @@ func TriggerIVRFlow(ctx context.Context, rt *runtime.Runtime, orgID models.OrgID
// queue this to our ivr starter, it will take care of creating the calls then calling back in
rc := rt.RP.Get()
defer rc.Close()
err = tasks.Queue(rc, tasks.BatchQueue, orgID, task, queues.HighPriority)
err = tasks.Queue(rc, tasks.BatchQueue, orgID, task, true)
if err != nil {
return fmt.Errorf("error queuing ivr flow start: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions core/tasks/handler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
)

// Task is the interface for all contact tasks - tasks which operate on a single contact in real time
Expand Down Expand Up @@ -73,7 +72,7 @@ func queueTask(rc redis.Conn, orgID models.OrgID, contactID models.ContactID, ta
}

// then add a handle task for that contact on our global handler queue to
err = tasks.Queue(rc, tasks.HandlerQueue, orgID, &HandleContactEventTask{ContactID: contactID}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.HandlerQueue, orgID, &HandleContactEventTask{ContactID: contactID}, false)
if err != nil {
return fmt.Errorf("error queuing handle task: %w", err)
}
Expand Down
5 changes: 2 additions & 3 deletions core/tasks/interrupts/interrupt_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks/msgs"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/mailroom/utils/queues"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -55,7 +54,7 @@ func TestInterruptChannel(t *testing.T) {
assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and channel_id = $1`, testdata.TwilioChannel.ID).Returns(0)

// queue and perform a task to interrupt the Twilio channel
tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &interrupts.InterruptChannelTask{ChannelID: testdata.TwilioChannel.ID}, queues.DefaultPriority)
tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &interrupts.InterruptChannelTask{ChannelID: testdata.TwilioChannel.ID}, false)
testsuite.FlushTasks(t, rt)

assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and channel_id = $1`, testdata.VonageChannel.ID).Returns(1)
Expand Down Expand Up @@ -83,7 +82,7 @@ func TestInterruptChannel(t *testing.T) {
})

// queue and perform a task to interrupt the Vonage channel
tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &interrupts.InterruptChannelTask{ChannelID: testdata.VonageChannel.ID}, queues.DefaultPriority)
tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &interrupts.InterruptChannelTask{ChannelID: testdata.VonageChannel.ID}, false)
testsuite.FlushTasks(t, rt)

assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and failed_reason = 'R' and channel_id = $1`, testdata.VonageChannel.ID).Returns(6)
Expand Down
3 changes: 1 addition & 2 deletions core/tasks/ivr/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks/starts"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/mailroom/utils/queues"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -35,7 +34,7 @@ func TestRetryCallsCron(t *testing.T) {
err := models.InsertFlowStarts(ctx, rt.DB, []*models.FlowStart{start})
require.NoError(t, err)

err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
require.NoError(t, err)

service.callError = nil
Expand Down
7 changes: 3 additions & 4 deletions core/tasks/ivr/start_ivr_flow_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/mailroom/utils/queues"
"github.com/stretchr/testify/require"
)

Expand All @@ -41,7 +40,7 @@ func TestIVR(t *testing.T) {

service.callError = fmt.Errorf("unable to create call")

err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
require.NoError(t, err)

testsuite.FlushTasks(t, rt)
Expand All @@ -53,7 +52,7 @@ func TestIVR(t *testing.T) {
service.callError = nil
service.callID = ivr.CallID("call1")

err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
require.NoError(t, err)

testsuite.FlushTasks(t, rt)
Expand All @@ -64,7 +63,7 @@ func TestIVR(t *testing.T) {
service.callError = nil
service.callID = ivr.CallID("call1")

err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
require.NoError(t, err)

testsuite.FlushTasks(t, rt)
Expand Down
3 changes: 1 addition & 2 deletions core/tasks/msgs/send_broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/nyaruka/mailroom/core/search"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
)

const (
Expand Down Expand Up @@ -116,7 +115,7 @@ func createBroadcastBatches(ctx context.Context, rt *runtime.Runtime, oa *models
isLast := (i == len(idBatches)-1)

batch := bcast.CreateBatch(idBatch, isFirst, isLast)
err = tasks.Queue(rc, q, bcast.OrgID, &SendBroadcastBatchTask{BroadcastBatch: batch}, queues.DefaultPriority)
err = tasks.Queue(rc, q, bcast.OrgID, &SendBroadcastBatchTask{BroadcastBatch: batch}, false)
if err != nil {
if i == 0 {
return fmt.Errorf("error queuing broadcast batch: %w", err)
Expand Down
8 changes: 4 additions & 4 deletions core/tasks/msgs/send_broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestBroadcastsFromEvents(t *testing.T) {
groups []*assets.GroupReference
contacts []*flows.ContactReference
urns []urns.URN
queue *queues.FairSorted
queue queues.Fair
expectedBatchCount int
expectedMsgCount int
expectedMsgText string
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestBroadcastsFromEvents(t *testing.T) {
bcast, err := models.NewBroadcastFromEvent(ctx, rt.DB, oa, event)
assert.NoError(t, err)

err = tasks.Queue(rc, tc.queue, testdata.Org1.ID, &msgs.SendBroadcastTask{Broadcast: bcast}, queues.DefaultPriority)
err = tasks.Queue(rc, tc.queue, testdata.Org1.ID, &msgs.SendBroadcastTask{Broadcast: bcast}, false)
assert.NoError(t, err)

taskCounts := testsuite.FlushTasks(t, rt)
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestSendBroadcastTask(t *testing.T) {
query string
exclusions models.Exclusions
createdByID models.UserID
queue *queues.FairSorted
queue queues.Fair
expectedBatches int
expectedMsgs map[string]int
}{
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestSendBroadcastTask(t *testing.T) {

task := &msgs.SendBroadcastTask{Broadcast: bcast}

err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, task, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, task, false)
assert.NoError(t, err)

taskCounts := testsuite.FlushTasks(t, rt)
Expand Down
3 changes: 1 addition & 2 deletions core/tasks/schedules/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks/msgs"
"github.com/nyaruka/mailroom/core/tasks/starts"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
)

func init() {
Expand Down Expand Up @@ -135,7 +134,7 @@ func (c *schedulesCron) Run(ctx context.Context, rt *runtime.Runtime) (map[strin

// add our task if we have one
if task != nil {
err = tasks.Queue(rc, tasks.BatchQueue, s.OrgID, task, queues.HighPriority)
err = tasks.Queue(rc, tasks.BatchQueue, s.OrgID, task, true)
if err != nil {
log.Error(fmt.Sprintf("error queueing %s task from schedule", task.Type()), "error", err)
}
Expand Down
3 changes: 1 addition & 2 deletions core/tasks/starts/start_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/ivr"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/queues"
)

const (
Expand Down Expand Up @@ -136,7 +135,7 @@ func createFlowStartBatches(ctx context.Context, rt *runtime.Runtime, oa *models
batchTask = &StartFlowBatchTask{FlowStartBatch: batch}
}

err = tasks.Queue(rc, q, start.OrgID, batchTask, queues.DefaultPriority)
err = tasks.Queue(rc, q, start.OrgID, batchTask, false)
if err != nil {
if i == 0 {
return fmt.Errorf("error queuing flow start batch: %w", err)
Expand Down
11 changes: 5 additions & 6 deletions core/tasks/starts/start_flow_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/mailroom/core/tasks/starts"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/mailroom/utils/queues"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -35,7 +34,7 @@ func TestStartFlowBatchTask(t *testing.T) {
batch2 := start1.CreateBatch([]models.ContactID{testdata.George.ID, testdata.Alexandria.ID}, false, true, 4)

// start the first batch...
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch1}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch1}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)

Expand All @@ -54,7 +53,7 @@ func TestStartFlowBatchTask(t *testing.T) {
assertdb.Query(t, rt.DB, `SELECT status FROM flows_flowstart WHERE id = $1`, start1.ID).Returns("S")

// start the second and final batch...
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch2}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch2}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)

Expand All @@ -71,7 +70,7 @@ func TestStartFlowBatchTask(t *testing.T) {
start2Batch2 := start2.CreateBatch([]models.ContactID{testdata.George.ID, testdata.Alexandria.ID}, false, true, 4)

// start the first batch...
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: start2Batch1}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: start2Batch1}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)

Expand All @@ -81,7 +80,7 @@ func TestStartFlowBatchTask(t *testing.T) {
rt.DB.MustExec(`UPDATE flows_flowstart SET status = 'I' WHERE id = $1`, start2.ID)

// start the second batch...
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: start2Batch2}, queues.DefaultPriority)
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: start2Batch2}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)

Expand All @@ -104,7 +103,7 @@ func TestStartFlowBatchTaskNonPersistedStart(t *testing.T) {
batch := start.CreateBatch([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID}, true, true, 2)

// start the first batch...
err := tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch}, queues.DefaultPriority)
err := tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)

Expand Down
6 changes: 3 additions & 3 deletions core/tasks/starts/start_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestStartFlowTask(t *testing.T) {
query string
excludeInAFlow bool
excludeStartedPreviously bool
queue *queues.FairSorted
queue queues.Fair
expectedContactCount int
expectedBatchCount int
expectedTotalCount int
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestStartFlowTask(t *testing.T) {
err := models.InsertFlowStarts(ctx, rt.DB, []*models.FlowStart{start})
assert.NoError(t, err)

err = tasks.Queue(rc, tc.queue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queues.DefaultPriority)
err = tasks.Queue(rc, tc.queue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
assert.NoError(t, err)

taskCounts := testsuite.FlushTasks(t, rt)
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestStartFlowTaskNonPersistedStart(t *testing.T) {
start := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, testdata.SingleMessage.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID})

err := tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queues.DefaultPriority)
err := tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)

Expand Down
2 changes: 1 addition & 1 deletion core/tasks/starts/throttle_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func init() {
}

type ThrottleQueueCron struct {
Queue *queues.FairSorted
Queue queues.Fair
}

func (c *ThrottleQueueCron) Next(last time.Time) time.Time {
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/starts/throttle_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestThrottleQueue(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, map[string]any{"paused": 0, "resumed": 0}, res)

queue.Push(rc, "type1", 1, "task1", queues.DefaultPriority)
queue.Push(rc, "type1", 1, "task1", false)

res, err = cron.Run(ctx, rt)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func Perform(ctx context.Context, rt *runtime.Runtime, task *queues.Task) error
}

// Queue adds the given task to the given queue
func Queue(rc redis.Conn, q *queues.FairSorted, orgID models.OrgID, task Task, priority queues.Priority) error {
func Queue(rc redis.Conn, q queues.Fair, orgID models.OrgID, task Task, priority bool) error {
return q.Push(rc, task.Type(), int(orgID), task, priority)
}

Expand Down
Loading

0 comments on commit 571f401

Please sign in to comment.