Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow tests to specify which task queues are flushed #361

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/tasks/campaigns/fire_campaign_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestFireCampaignEvents(t *testing.T) {
err := tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, task, false)
assert.NoError(t, err)

testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

// and left in redis marker
for _, fid := range fireIDs {
Expand Down
8 changes: 4 additions & 4 deletions core/tasks/campaigns/schedule_campaign_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestScheduleCampaignEvent(t *testing.T) {

// schedule first event...
testsuite.QueueBatchTask(t, rt, testdata.Org1, &campaigns.ScheduleCampaignEventTask{CampaignEventID: testdata.RemindersEvent1.ID})
testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

// cathy has no value for joined and alexandia has a value too far in past, but bob and george will have values...
assertContactFires(t, rt.DB, testdata.RemindersEvent1.ID, map[models.ContactID]time.Time{
Expand All @@ -46,7 +46,7 @@ func TestScheduleCampaignEvent(t *testing.T) {

// schedule second event...
testsuite.QueueBatchTask(t, rt, testdata.Org1, &campaigns.ScheduleCampaignEventTask{CampaignEventID: testdata.RemindersEvent2.ID})
testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

assertContactFires(t, rt.DB, testdata.RemindersEvent2.ID, map[models.ContactID]time.Time{
testdata.Bob.ID: time.Date(2030, 1, 1, 0, 10, 0, 0, time.UTC),
Expand All @@ -69,7 +69,7 @@ func TestScheduleCampaignEvent(t *testing.T) {
event3 := testdata.InsertCampaignFlowEvent(rt, testdata.RemindersCampaign, testdata.Favorites, testdata.CreatedOnField, 5, "M")

testsuite.QueueBatchTask(t, rt, testdata.Org1, &campaigns.ScheduleCampaignEventTask{CampaignEventID: event3.ID})
testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

// only cathy is in the group and new enough to have a fire
assertContactFires(t, rt.DB, event3.ID, map[models.ContactID]time.Time{
Expand All @@ -83,7 +83,7 @@ func TestScheduleCampaignEvent(t *testing.T) {
rt.DB.MustExec(`UPDATE contacts_contact SET last_seen_on = '2040-01-01T00:00:00Z' WHERE id = $1`, testdata.Bob.ID)

testsuite.QueueBatchTask(t, rt, testdata.Org1, &campaigns.ScheduleCampaignEventTask{CampaignEventID: event4.ID})
testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

assertContactFires(t, rt.DB, event4.ID, map[models.ContactID]time.Time{
testdata.Bob.ID: time.Date(2040, 1, 2, 0, 0, 0, 0, time.UTC),
Expand Down
4 changes: 2 additions & 2 deletions core/tasks/contacts/import_contact_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ func TestImportContactBatch(t *testing.T) {

// perform first batch task...
testsuite.QueueBatchTask(t, rt, testdata.Org1, &contacts.ImportContactBatchTask{ContactImportBatchID: batch1ID})
testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

// import is still in progress
assertdb.Query(t, rt.DB, `SELECT status FROM contacts_contactimport WHERE id = $1`, importID).Columns(map[string]any{"status": "O"})

// perform second batch task...
testsuite.QueueBatchTask(t, rt, testdata.Org1, &contacts.ImportContactBatchTask{ContactImportBatchID: batch2ID})
testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

assertdb.Query(t, rt.DB, `SELECT count(*) FROM contacts_contact WHERE id >= 30000`).Returns(3)
assertdb.Query(t, rt.DB, `SELECT count(*) FROM contacts_contact WHERE name = 'Norbert' AND language = 'eng'`).Returns(1)
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/handler/handle_contact_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestHandleContactEvent(t *testing.T) {
NewContact: false,
})

tasksRan := testsuite.FlushTasks(t, rt)
tasksRan := testsuite.FlushTasks(t, rt, nil)
assert.Equal(t, map[string]int{"handle_contact_event": 2}, tasksRan)

assertdb.Query(t, rt.DB, `SELECT count(*) FROM contacts_contact WHERE id = $1 AND status = 'S'`, testdata.Cathy.ID).Returns(1)
Expand Down
4 changes: 2 additions & 2 deletions core/tasks/interrupts/interrupt_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestInterruptChannel(t *testing.T) {

// queue and perform a task to interrupt the Twilio channel
tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &interrupts.InterruptChannelTask{ChannelID: testdata.TwilioChannel.ID}, false)
testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and channel_id = $1`, testdata.VonageChannel.ID).Returns(1)
assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and failed_reason = 'R' and channel_id = $1`, testdata.VonageChannel.ID).Returns(0)
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestInterruptChannel(t *testing.T) {

// queue and perform a task to interrupt the Vonage channel
tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &interrupts.InterruptChannelTask{ChannelID: testdata.VonageChannel.ID}, false)
testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and failed_reason = 'R' and channel_id = $1`, testdata.VonageChannel.ID).Returns(6)
assertdb.Query(t, rt.DB, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and channel_id = $1`, testdata.VonageChannel.ID).Returns(7)
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/ivr/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestRetryCallsCron(t *testing.T) {
service.callError = nil
service.callID = ivr.CallID("call1")

testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

assertdb.Query(t, rt.DB, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.CallStatusWired, "call1").Returns(1)
Expand Down
6 changes: 3 additions & 3 deletions core/tasks/ivr/start_ivr_flow_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestIVR(t *testing.T) {
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
require.NoError(t, err)

testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

// should have one call in a failed state
assertdb.Query(t, rt.DB, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2`, testdata.Cathy.ID, models.CallStatusFailed).Returns(1)
Expand All @@ -55,7 +55,7 @@ func TestIVR(t *testing.T) {
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
require.NoError(t, err)

testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

assertdb.Query(t, rt.DB, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.CallStatusWired, "call1").Returns(1)

Expand All @@ -66,7 +66,7 @@ func TestIVR(t *testing.T) {
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
require.NoError(t, err)

testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

assertdb.Query(t, rt.DB, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2 AND next_attempt IS NOT NULL;`, testdata.Cathy.ID, models.CallStatusQueued).Returns(1)
}
Expand Down
4 changes: 2 additions & 2 deletions core/tasks/msgs/send_broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestBroadcastsFromEvents(t *testing.T) {
err = tasks.Queue(rc, tc.queue, testdata.Org1.ID, &msgs.SendBroadcastTask{Broadcast: bcast}, false)
assert.NoError(t, err)

taskCounts := testsuite.FlushTasks(t, rt)
taskCounts := testsuite.FlushTasks(t, rt, nil)

// assert our count of batches
assert.Equal(t, tc.expectedBatchCount, taskCounts["send_broadcast_batch"], "%d: unexpected batch count", i)
Expand Down Expand Up @@ -291,7 +291,7 @@ func TestSendBroadcastTask(t *testing.T) {
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, task, false)
assert.NoError(t, err)

taskCounts := testsuite.FlushTasks(t, rt)
taskCounts := testsuite.FlushTasks(t, rt, nil)

// assert our count of batches
assert.Equal(t, tc.expectedBatches, taskCounts["send_broadcast_batch"], "%d: unexpected batch count", i)
Expand Down
10 changes: 5 additions & 5 deletions core/tasks/starts/start_flow_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestStartFlowBatchTask(t *testing.T) {
// start the first batch...
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch1}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowsession WHERE contact_id = ANY($1)
AND status = 'C' AND responded = FALSE AND org_id = 1 AND call_id IS NULL AND output IS NOT NULL`, pq.Array([]models.ContactID{testdata.Cathy.ID, testdata.Bob.ID})).
Expand All @@ -55,7 +55,7 @@ func TestStartFlowBatchTask(t *testing.T) {
// start the second and final batch...
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch2}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowrun WHERE start_id = $1`, start1.ID).Returns(4)
assertdb.Query(t, rt.DB, `SELECT status FROM flows_flowstart WHERE id = $1`, start1.ID).Returns("C")
Expand All @@ -72,7 +72,7 @@ func TestStartFlowBatchTask(t *testing.T) {
// start the first batch...
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: start2Batch1}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowrun WHERE start_id = $1`, start2.ID).Returns(2)

Expand All @@ -82,7 +82,7 @@ func TestStartFlowBatchTask(t *testing.T) {
// start the second batch...
err = tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: start2Batch2}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

// check that second batch didn't create any runs and start status is still interrupted
assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowrun WHERE start_id = $1`, start2.ID).Returns(2)
Expand All @@ -105,7 +105,7 @@ func TestStartFlowBatchTaskNonPersistedStart(t *testing.T) {
// start the first batch...
err := tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowBatchTask{FlowStartBatch: batch}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowrun`).Returns(2)
}
4 changes: 2 additions & 2 deletions core/tasks/starts/start_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func TestStartFlowTask(t *testing.T) {
err = tasks.Queue(rc, tc.queue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
assert.NoError(t, err)

taskCounts := testsuite.FlushTasks(t, rt)
taskCounts := testsuite.FlushTasks(t, rt, nil)

// assert our count of batches
assert.Equal(t, tc.expectedBatchCount, taskCounts["start_flow_batch"], "%d: unexpected batch count", i)
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestStartFlowTaskNonPersistedStart(t *testing.T) {

err := tasks.Queue(rc, tasks.ThrottledQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
assert.NoError(t, err)
testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

assertdb.Query(t, rt.DB, `SELECT count(*) FROM flows_flowrun`).Returns(2)
}
5 changes: 3 additions & 2 deletions testsuite/assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/goflow/test"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/mailroom/utils/queues"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -49,11 +50,11 @@ func AssertCourierQueues(t *testing.T, expected map[string][]int, errMsg ...any)
}

// AssertContactTasks asserts that the given contact has the given tasks queued for them
func AssertContactTasks(t *testing.T, orgID models.OrgID, contactID models.ContactID, expected []string, msgAndArgs ...any) {
func AssertContactTasks(t *testing.T, org *testdata.Org, contact *testdata.Contact, expected []string, msgAndArgs ...any) {
rc := getRC()
defer rc.Close()

tasks, err := redis.Strings(rc.Do("LRANGE", fmt.Sprintf("c:%d:%d", orgID, contactID), 0, -1))
tasks, err := redis.Strings(rc.Do("LRANGE", fmt.Sprintf("c:%d:%d", org.ID, contact.ID), 0, -1))
require.NoError(t, err)

expectedJSON := jsonx.MustMarshal(expected)
Expand Down
10 changes: 8 additions & 2 deletions testsuite/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package testsuite
import (
"context"
"fmt"
"slices"
"testing"

"github.com/gomodule/redigo/redis"
Expand Down Expand Up @@ -60,15 +61,20 @@ func CurrentTasks(t *testing.T, rt *runtime.Runtime, qname string) map[models.Or
return tasks
}

func FlushTasks(t *testing.T, rt *runtime.Runtime) map[string]int {
func FlushTasks(t *testing.T, rt *runtime.Runtime, qnames []string) map[string]int {
rc := rt.RP.Get()
defer rc.Close()

var task *queues.Task
var err error
counts := make(map[string]int)

qs := []queues.Fair{tasks.HandlerQueue, tasks.BatchQueue, tasks.ThrottledQueue}
var qs []queues.Fair
for _, q := range []queues.Fair{tasks.HandlerQueue, tasks.BatchQueue, tasks.ThrottledQueue} {
if len(qnames) == 0 || slices.Contains(qnames, fmt.Sprint(q)[6:]) {
qs = append(qs, q)
}
}

for {
// look for a task in the queues
Expand Down
4 changes: 2 additions & 2 deletions web/ivr/ivr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestTwilioIVR(t *testing.T) {
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
require.NoError(t, err)

testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

// check our 3 contacts have 3 wired calls
assertdb.Query(t, rt.DB, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
Expand Down Expand Up @@ -413,7 +413,7 @@ func TestVonageIVR(t *testing.T) {
err = tasks.Queue(rc, tasks.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, false)
require.NoError(t, err)

testsuite.FlushTasks(t, rt)
testsuite.FlushTasks(t, rt, nil)

assertdb.Query(t, rt.DB, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.CallStatusWired, "Call1").Returns(1)
Expand Down
Loading