diff --git a/core/tasks/campaigns/fire_campaign_event_test.go b/core/tasks/campaigns/fire_campaign_event_test.go index 3d63e4dc7..da7017643 100644 --- a/core/tasks/campaigns/fire_campaign_event_test.go +++ b/core/tasks/campaigns/fire_campaign_event_test.go @@ -56,7 +56,7 @@ func TestFireCampaignEvents(t *testing.T) { err := tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, task, false) assert.NoError(t, err) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) // and left in redis marker for _, fid := range fireIDs { diff --git a/core/tasks/campaigns/schedule_campaign_event_test.go b/core/tasks/campaigns/schedule_campaign_event_test.go index 5e69be7b1..33449fe26 100644 --- a/core/tasks/campaigns/schedule_campaign_event_test.go +++ b/core/tasks/campaigns/schedule_campaign_event_test.go @@ -36,7 +36,7 @@ func TestScheduleCampaignEvent(t *testing.T) { // schedule first event... testsuite.QueueBatchTask(t, rt, testdata.Org1, &campaigns.ScheduleCampaignEventTask{CampaignEventID: testdata.RemindersEvent1.ID}) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) // cathy has no value for joined and alexandia has a value too far in past, but bob and george will have values... assertContactFires(t, rt.DB, testdata.RemindersEvent1.ID, map[models.ContactID]time.Time{ @@ -46,7 +46,7 @@ func TestScheduleCampaignEvent(t *testing.T) { // schedule second event... testsuite.QueueBatchTask(t, rt, testdata.Org1, &campaigns.ScheduleCampaignEventTask{CampaignEventID: testdata.RemindersEvent2.ID}) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) assertContactFires(t, rt.DB, testdata.RemindersEvent2.ID, map[models.ContactID]time.Time{ testdata.Bob.ID: time.Date(2030, 1, 1, 0, 10, 0, 0, time.UTC), @@ -69,7 +69,7 @@ func TestScheduleCampaignEvent(t *testing.T) { event3 := testdata.InsertCampaignFlowEvent(rt, testdata.RemindersCampaign, testdata.Favorites, testdata.CreatedOnField, 5, "M") testsuite.QueueBatchTask(t, rt, testdata.Org1, &campaigns.ScheduleCampaignEventTask{CampaignEventID: event3.ID}) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) // only cathy is in the group and new enough to have a fire assertContactFires(t, rt.DB, event3.ID, map[models.ContactID]time.Time{ @@ -83,7 +83,7 @@ func TestScheduleCampaignEvent(t *testing.T) { rt.DB.MustExec(`UPDATE contacts_contact SET last_seen_on = '2040-01-01T00:00:00Z' WHERE id = $1`, testdata.Bob.ID) testsuite.QueueBatchTask(t, rt, testdata.Org1, &campaigns.ScheduleCampaignEventTask{CampaignEventID: event4.ID}) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) assertContactFires(t, rt.DB, event4.ID, map[models.ContactID]time.Time{ testdata.Bob.ID: time.Date(2040, 1, 2, 0, 0, 0, 0, time.UTC), diff --git a/core/tasks/contacts/import_contact_batch_test.go b/core/tasks/contacts/import_contact_batch_test.go index ded378d9d..8957bccfa 100644 --- a/core/tasks/contacts/import_contact_batch_test.go +++ b/core/tasks/contacts/import_contact_batch_test.go @@ -31,14 +31,14 @@ func TestImportContactBatch(t *testing.T) { // perform first batch task... testsuite.QueueBatchTask(t, rt, testdata.Org1, &contacts.ImportContactBatchTask{ContactImportBatchID: batch1ID}) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) // import is still in progress assertdb.Query(t, rt.DB, `SELECT status FROM contacts_contactimport WHERE id = $1`, importID).Columns(map[string]any{"status": "O"}) // perform second batch task... testsuite.QueueBatchTask(t, rt, testdata.Org1, &contacts.ImportContactBatchTask{ContactImportBatchID: batch2ID}) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) assertdb.Query(t, rt.DB, `SELECT count(*) FROM contacts_contact WHERE id >= 30000`).Returns(3) assertdb.Query(t, rt.DB, `SELECT count(*) FROM contacts_contact WHERE name = 'Norbert' AND language = 'eng'`).Returns(1) diff --git a/core/tasks/handler/handle_contact_event_test.go b/core/tasks/handler/handle_contact_event_test.go index 2ad2afd30..e422b1b79 100644 --- a/core/tasks/handler/handle_contact_event_test.go +++ b/core/tasks/handler/handle_contact_event_test.go @@ -36,7 +36,7 @@ func TestHandleContactEvent(t *testing.T) { NewContact: false, }) - tasksRan := testsuite.FlushTasks(t, rt) + tasksRan := testsuite.FlushTasks(t, rt, nil) assert.Equal(t, map[string]int{"handle_contact_event": 2}, tasksRan) assertdb.Query(t, rt.DB, `SELECT count(*) FROM contacts_contact WHERE id = $1 AND status = 'S'`, testdata.Cathy.ID).Returns(1) diff --git a/core/tasks/interrupts/interrupt_channel_test.go b/core/tasks/interrupts/interrupt_channel_test.go index 343dde7a5..209806c6b 100644 --- a/core/tasks/interrupts/interrupt_channel_test.go +++ b/core/tasks/interrupts/interrupt_channel_test.go @@ -55,7 +55,7 @@ func TestInterruptChannel(t *testing.T) { // queue and perform a task to interrupt the Twilio channel tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &interrupts.InterruptChannelTask{ChannelID: testdata.TwilioChannel.ID}, false) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and channel_id = $1`, testdata.VonageChannel.ID).Returns(1) assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and failed_reason = 'R' and channel_id = $1`, testdata.VonageChannel.ID).Returns(0) @@ -83,7 +83,7 @@ func TestInterruptChannel(t *testing.T) { // queue and perform a task to interrupt the Vonage channel tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &interrupts.InterruptChannelTask{ChannelID: testdata.VonageChannel.ID}, false) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and failed_reason = 'R' and channel_id = $1`, testdata.VonageChannel.ID).Returns(6) assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and channel_id = $1`, testdata.VonageChannel.ID).Returns(7) diff --git a/core/tasks/ivr/cron_test.go b/core/tasks/ivr/cron_test.go index aa117d75a..9b193838a 100644 --- a/core/tasks/ivr/cron_test.go +++ b/core/tasks/ivr/cron_test.go @@ -40,7 +40,7 @@ func TestRetryCallsCron(t *testing.T) { service.callError = nil service.callID = ivr.CallID("call1") - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) assertdb.Query(t, rt.DB, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.CallStatusWired, "call1").Returns(1) diff --git a/core/tasks/ivr/start_ivr_flow_batch_test.go b/core/tasks/ivr/start_ivr_flow_batch_test.go index 441b23391..8a0bd2b10 100644 --- a/core/tasks/ivr/start_ivr_flow_batch_test.go +++ b/core/tasks/ivr/start_ivr_flow_batch_test.go @@ -43,7 +43,7 @@ func TestIVR(t *testing.T) { err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false) require.NoError(t, err) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) // should have one call in a failed state assertdb.Query(t, rt.DB, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2`, testdata.Cathy.ID, models.CallStatusFailed).Returns(1) @@ -55,7 +55,7 @@ func TestIVR(t *testing.T) { err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false) require.NoError(t, err) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) assertdb.Query(t, rt.DB, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.CallStatusWired, "call1").Returns(1) @@ -66,7 +66,7 @@ func TestIVR(t *testing.T) { err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false) require.NoError(t, err) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) assertdb.Query(t, rt.DB, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2 AND next_attempt IS NOT NULL;`, testdata.Cathy.ID, models.CallStatusQueued).Returns(1) } diff --git a/core/tasks/msgs/send_broadcast_test.go b/core/tasks/msgs/send_broadcast_test.go index 56a7b2855..6d7b1462a 100644 --- a/core/tasks/msgs/send_broadcast_test.go +++ b/core/tasks/msgs/send_broadcast_test.go @@ -167,7 +167,7 @@ func TestBroadcastsFromEvents(t *testing.T) { err = tasks.Queue(rc, tc.queue, testdata.Org1.ID, &msgs.SendBroadcastTask{Broadcast: bcast}, false) assert.NoError(t, err) - taskCounts := testsuite.FlushTasks(t, rt) + taskCounts := testsuite.FlushTasks(t, rt, nil) // assert our count of batches assert.Equal(t, tc.expectedBatchCount, taskCounts["send_broadcast_batch"], "%d: unexpected batch count", i) @@ -291,7 +291,7 @@ func TestSendBroadcastTask(t *testing.T) { err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, task, false) assert.NoError(t, err) - taskCounts := testsuite.FlushTasks(t, rt) + taskCounts := testsuite.FlushTasks(t, rt, nil) // assert our count of batches assert.Equal(t, tc.expectedBatches, taskCounts["send_broadcast_batch"], "%d: unexpected batch count", i) diff --git a/core/tasks/starts/start_flow_batch_test.go b/core/tasks/starts/start_flow_batch_test.go index eb6fb5236..9089d5274 100644 --- a/core/tasks/starts/start_flow_batch_test.go +++ b/core/tasks/starts/start_flow_batch_test.go @@ -36,7 +36,7 @@ func TestStartFlowBatchTask(t *testing.T) { // start the first batch... err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch1}, false) assert.NoError(t, err) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowsession WHERE contact_id = ANY($1) AND status = 'C' AND responded = FALSE AND org_id = 1 AND call_id IS NULL AND output IS NOT NULL`, pq.Array([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID})). @@ -55,7 +55,7 @@ func TestStartFlowBatchTask(t *testing.T) { // start the second and final batch... err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch2}, false) assert.NoError(t, err) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowrun WHERE start_id = $1`, start1.ID).Returns(4) assertdb.Query(t, rt.DB, `SELECT status FROM flows_flowstart WHERE id = $1`, start1.ID).Returns("C") @@ -72,7 +72,7 @@ func TestStartFlowBatchTask(t *testing.T) { // start the first batch... err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: start2Batch1}, false) assert.NoError(t, err) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowrun WHERE start_id = $1`, start2.ID).Returns(2) @@ -82,7 +82,7 @@ func TestStartFlowBatchTask(t *testing.T) { // start the second batch... err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: start2Batch2}, false) assert.NoError(t, err) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) // check that second batch didn't create any runs and start status is still interrupted assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowrun WHERE start_id = $1`, start2.ID).Returns(2) @@ -105,7 +105,7 @@ func TestStartFlowBatchTaskNonPersistedStart(t *testing.T) { // start the first batch... err := tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch}, false) assert.NoError(t, err) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowrun`).Returns(2) } diff --git a/core/tasks/starts/start_flow_test.go b/core/tasks/starts/start_flow_test.go index b57576fa2..8acf21570 100644 --- a/core/tasks/starts/start_flow_test.go +++ b/core/tasks/starts/start_flow_test.go @@ -232,7 +232,7 @@ func TestStartFlowTask(t *testing.T) { err = tasks.Queue(rc, tc.queue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false) assert.NoError(t, err) - taskCounts := testsuite.FlushTasks(t, rt) + taskCounts := testsuite.FlushTasks(t, rt, nil) // assert our count of batches assert.Equal(t, tc.expectedBatchCount, taskCounts["start_flow_batch"], "%d: unexpected batch count", i) @@ -268,7 +268,7 @@ func TestStartFlowTaskNonPersistedStart(t *testing.T) { err := tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false) assert.NoError(t, err) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowrun`).Returns(2) } diff --git a/testsuite/assert.go b/testsuite/assert.go index e7e7197a4..337b3db30 100644 --- a/testsuite/assert.go +++ b/testsuite/assert.go @@ -9,6 +9,7 @@ import ( "github.com/nyaruka/gocommon/jsonx" "github.com/nyaruka/goflow/test" "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/testsuite/testdata" "github.com/nyaruka/mailroom/utils/queues" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -49,11 +50,11 @@ func AssertCourierQueues(t *testing.T, expected map[string][]int, errMsg ...any) } // AssertContactTasks asserts that the given contact has the given tasks queued for them -func AssertContactTasks(t *testing.T, orgID models.OrgID, contactID models.ContactID, expected []string, msgAndArgs ...any) { +func AssertContactTasks(t *testing.T, org *testdata.Org, contact *testdata.Contact, expected []string, msgAndArgs ...any) { rc := getRC() defer rc.Close() - tasks, err := redis.Strings(rc.Do("LRANGE", fmt.Sprintf("c:%d:%d", orgID, contactID), 0, -1)) + tasks, err := redis.Strings(rc.Do("LRANGE", fmt.Sprintf("c:%d:%d", org.ID, contact.ID), 0, -1)) require.NoError(t, err) expectedJSON := jsonx.MustMarshal(expected) diff --git a/testsuite/tasks.go b/testsuite/tasks.go index c0570d45c..23d542cff 100644 --- a/testsuite/tasks.go +++ b/testsuite/tasks.go @@ -3,6 +3,7 @@ package testsuite import ( "context" "fmt" + "slices" "testing" "github.com/gomodule/redigo/redis" @@ -60,7 +61,7 @@ func CurrentTasks(t *testing.T, rt *runtime.Runtime, qname string) map[models.Or return tasks } -func FlushTasks(t *testing.T, rt *runtime.Runtime) map[string]int { +func FlushTasks(t *testing.T, rt *runtime.Runtime, qnames []string) map[string]int { rc := rt.RP.Get() defer rc.Close() @@ -68,7 +69,12 @@ func FlushTasks(t *testing.T, rt *runtime.Runtime) map[string]int { var err error counts := make(map[string]int) - qs := []queues.Fair{tasks.HandlerQueue, tasks.BatchQueue, tasks.ThrottledQueue} + var qs []queues.Fair + for _, q := range []queues.Fair{tasks.HandlerQueue, tasks.BatchQueue, tasks.ThrottledQueue} { + if len(qnames) == 0 || slices.Contains(qnames, fmt.Sprint(q)[6:]) { + qs = append(qs, q) + } + } for { // look for a task in the queues diff --git a/web/ivr/ivr_test.go b/web/ivr/ivr_test.go index 23d63a056..a4fea5f29 100644 --- a/web/ivr/ivr_test.go +++ b/web/ivr/ivr_test.go @@ -105,7 +105,7 @@ func TestTwilioIVR(t *testing.T) { err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false) require.NoError(t, err) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) // check our 3 contacts have 3 wired calls assertdb.Query(t, rt.DB, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2 AND external_id = $3`, @@ -413,7 +413,7 @@ func TestVonageIVR(t *testing.T) { err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false) require.NoError(t, err) - testsuite.FlushTasks(t, rt) + testsuite.FlushTasks(t, rt, nil) assertdb.Query(t, rt.DB, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.CallStatusWired, "Call1").Returns(1)