Skip to content

Commit

Permalink
Merge pull request #33 from nyaruka/bcast_queries
Browse files Browse the repository at this point in the history
Support contact query based broadcasts
  • Loading branch information
rowanseymour authored Feb 10, 2023
2 parents da5260d + 11516c2 commit a1dfd25
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 134 deletions.
16 changes: 7 additions & 9 deletions core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,18 +774,16 @@ func FailChannelMessages(ctx context.Context, db Queryer, orgID OrgID, channelID
return nil
}

// MarkBroadcastSent marks the passed in broadcast as sent
// MarkBroadcastSent marks the given broadcast as sent
func MarkBroadcastSent(ctx context.Context, db Queryer, id BroadcastID) error {
// noop if it is a nil id
if id == NilBroadcastID {
return nil
}
_, err := db.ExecContext(ctx, `UPDATE msgs_broadcast SET status = 'S', modified_on = now() WHERE id = $1`, id)
return errors.Wrapf(err, "error marking broadcast #%d as sent", id)
}

// MarkBroadcastFailed marks the given broadcast as failed
func MarkBroadcastFailed(ctx context.Context, db Queryer, id BroadcastID) error {
_, err := db.ExecContext(ctx, `UPDATE msgs_broadcast SET status = 'S', modified_on = now() WHERE id = $1`, id)
if err != nil {
return errors.Wrapf(err, "error setting broadcast with id %d as sent", id)
}
return nil
return errors.Wrapf(err, "error marking broadcast #%d as failed", id)
}

// NilID implementations
Expand Down
20 changes: 7 additions & 13 deletions core/models/starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ const (
StartStatusFailed = StartStatus("F")
)

// MarkStartComplete sets the status for the passed in flow start
func MarkStartComplete(ctx context.Context, db Queryer, startID StartID) error {
_, err := db.ExecContext(ctx, "UPDATE flows_flowstart SET status = 'C', modified_on = NOW() WHERE id = $1", startID)
if err != nil {
return errors.Wrapf(err, "error setting start as complete")
}
return nil
}

// MarkStartStarted sets the status for the passed in flow start to S and updates the contact count on it
func MarkStartStarted(ctx context.Context, db Queryer, startID StartID, contactCount int, createdContactIDs []ContactID) error {
_, err := db.ExecContext(ctx, "UPDATE flows_flowstart SET status = 'S', contact_count = $2, modified_on = NOW() WHERE id = $1", startID, contactCount)
Expand Down Expand Up @@ -83,13 +74,16 @@ func MarkStartStarted(ctx context.Context, db Queryer, startID StartID, contactC
return nil
}

// MarkStartComplete sets the status for the passed in flow start
func MarkStartComplete(ctx context.Context, db Queryer, startID StartID) error {
_, err := db.ExecContext(ctx, "UPDATE flows_flowstart SET status = 'C', modified_on = NOW() WHERE id = $1", startID)
return errors.Wrapf(err, "error marking flow start as complete")
}

// MarkStartFailed sets the status for the passed in flow start to F
func MarkStartFailed(ctx context.Context, db Queryer, startID StartID) error {
_, err := db.ExecContext(ctx, "UPDATE flows_flowstart SET status = 'F', modified_on = NOW() WHERE id = $1", startID)
if err != nil {
return errors.Wrapf(err, "error setting start as failed")
}
return nil
return errors.Wrapf(err, "error setting flow start as failed")
}

// FlowStartBatch represents a single flow batch that needs to be started
Expand Down
20 changes: 7 additions & 13 deletions core/tasks/ivr/cron_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package ivr_test

import (
"encoding/json"
"testing"

"github.com/nyaruka/gocommon/dbutil/assertdb"
"github.com/nyaruka/mailroom/core/ivr"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/core/tasks"
ivrtasks "github.com/nyaruka/mailroom/core/tasks/ivr"
"github.com/nyaruka/mailroom/core/tasks/starts"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestRetries(t *testing.T) {
Expand All @@ -32,21 +33,14 @@ func TestRetries(t *testing.T) {
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID})

// call our master starter
err := starts.CreateFlowBatches(ctx, rt, start)
assert.NoError(t, err)

// should have one task in our ivr queue
task, err := queue.PopNextTask(rc, queue.HandlerQueue)
assert.NoError(t, err)
batch := &models.FlowStartBatch{}
err = json.Unmarshal(task.Task, batch)
assert.NoError(t, err)
err := tasks.Queue(rc, queue.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queue.DefaultPriority)
require.NoError(t, err)

service.callError = nil
service.callID = ivr.CallID("call1")
err = ivrtasks.HandleFlowStartBatch(ctx, rt, batch)
assert.NoError(t, err)

testsuite.FlushTasks(t, rt)

assertdb.Query(t, db, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2 AND external_id = $3`,
testdata.Cathy.ID, models.CallStatusWired, "call1").Returns(1)

Expand Down
37 changes: 19 additions & 18 deletions core/tasks/ivr/start_ivr_flow_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@ import (
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/mailroom/core/ivr"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/core/tasks/starts"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestIVR(t *testing.T) {
ctx, rt, db, rp := testsuite.Get()
_, rt, db, rp := testsuite.Get()
rc := rp.Get()
defer rc.Close()

Expand All @@ -35,37 +37,36 @@ func TestIVR(t *testing.T) {
start := models.NewFlowStart(testdata.Org1.ID, models.StartTypeTrigger, models.FlowTypeVoice, testdata.IVRFlow.ID).
WithContactIDs([]models.ContactID{testdata.Cathy.ID})

// call our master starter
err := starts.CreateFlowBatches(ctx, rt, start)
assert.NoError(t, err)

service.callError = errors.Errorf("unable to create call")

// should have one IVR batch start task
taskCounts := testsuite.FlushTasks(t, rt)
assert.Equal(t, 1, taskCounts["start_ivr_flow_batch"])
err := tasks.Queue(rc, queue.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queue.DefaultPriority)
require.NoError(t, err)

testsuite.FlushTasks(t, rt)

// should have one call in a failed state
assertdb.Query(t, db, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2`, testdata.Cathy.ID, models.CallStatusFailed).Returns(1)

// re-create the batch and try again
// re-queue the start and try again
service.callError = nil
service.callID = ivr.CallID("call1")

err = starts.CreateFlowBatches(ctx, rt, start)
assert.NoError(t, err)
err = tasks.Queue(rc, queue.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queue.DefaultPriority)
require.NoError(t, err)

testsuite.FlushTasks(t, rt)

taskCounts = testsuite.FlushTasks(t, rt)
assert.Equal(t, 1, taskCounts["start_ivr_flow_batch"])
assertdb.Query(t, db, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2 AND external_id = $3`, testdata.Cathy.ID, models.CallStatusWired, "call1").Returns(1)

// trying again should put us in a throttled state (queued)
service.callError = nil
service.callID = ivr.CallID("call1")

err = starts.CreateFlowBatches(ctx, rt, start)
assert.NoError(t, err)
err = tasks.Queue(rc, queue.BatchQueue, testdata.Org1.ID, &starts.StartFlowTask{FlowStart: start}, queue.DefaultPriority)
require.NoError(t, err)

testsuite.FlushTasks(t, rt)

taskCounts = testsuite.FlushTasks(t, rt)
assert.Equal(t, 1, taskCounts["start_ivr_flow_batch"])
assertdb.Query(t, db, `SELECT COUNT(*) FROM ivr_call WHERE contact_id = $1 AND status = $2 AND next_attempt IS NOT NULL;`, testdata.Cathy.ID, models.CallStatusQueued).Returns(1)
}

Expand Down
59 changes: 36 additions & 23 deletions core/tasks/msgs/send_broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"time"

"github.com/nyaruka/goflow/contactql"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/core/search"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/runtime"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
)

const (
Expand Down Expand Up @@ -40,34 +41,46 @@ func (t *SendBroadcastTask) Timeout() time.Duration {

// Perform handles sending the broadcast by creating batches of broadcast sends for all the unique contacts
func (t *SendBroadcastTask) Perform(ctx context.Context, rt *runtime.Runtime, orgID models.OrgID) error {
oa, err := models.GetOrgAssets(ctx, rt, t.Broadcast.OrgID)
if err != nil {
return errors.Wrapf(err, "error getting org assets")
}
if err := createBroadcastBatches(ctx, rt, t.Broadcast); err != nil {
if t.Broadcast.ID != models.NilBroadcastID {
models.MarkBroadcastFailed(ctx, rt.DB, t.Broadcast.ID)
}

// we are building a set of contact ids, start with the explicit ones
contactIDs := make(map[models.ContactID]bool)
for _, id := range t.Broadcast.ContactIDs {
contactIDs[id] = true
// if error is user created query error.. don't escalate error to sentry
isQueryError, _ := contactql.IsQueryError(err)
if !isQueryError {
return err
}
}

groupContactIDs, err := models.ContactIDsForGroupIDs(ctx, rt.DB, t.Broadcast.GroupIDs)
if err != nil {
return errors.Wrap(err, "error resolving groups to contact ids")
}
return nil
}

for _, id := range groupContactIDs {
contactIDs[id] = true
func createBroadcastBatches(ctx context.Context, rt *runtime.Runtime, bcast *models.Broadcast) error {
oa, err := models.GetOrgAssets(ctx, rt, bcast.OrgID)
if err != nil {
return errors.Wrapf(err, "error getting org assets")
}

// get the contact ids for our URNs
urnMap, err := models.GetOrCreateContactIDsFromURNs(ctx, rt.DB, oa, t.Broadcast.URNs)
contactIDs, _, err := search.ResolveRecipients(ctx, rt, oa, &search.Recipients{
ContactIDs: bcast.ContactIDs,
GroupIDs: bcast.GroupIDs,
URNs: bcast.URNs,
Query: string(bcast.Query),
QueryLimit: -1,
ExcludeGroupIDs: nil,
})
if err != nil {
return errors.Wrap(err, "error resolving URNs to contact ids")
return errors.Wrap(err, "error resolving broadcast recipients")
}

for _, id := range urnMap {
contactIDs[id] = true
// if there are no contacts to send to, mark our broadcast as sent, we are done
if len(contactIDs) == 0 && bcast.ID != models.NilBroadcastID {
err = models.MarkBroadcastSent(ctx, rt.DB, bcast.ID)
if err != nil {
return errors.Wrapf(err, "error marking broadcast as sent")
}
return nil
}

// two or fewer contacts? queue to our handler queue for sending
Expand All @@ -80,12 +93,12 @@ func (t *SendBroadcastTask) Perform(ctx context.Context, rt *runtime.Runtime, or
defer rc.Close()

// create tasks for batches of contacts
idBatches := models.ChunkSlice(maps.Keys(contactIDs), 100)
idBatches := models.ChunkSlice(contactIDs, startBatchSize)
for i, idBatch := range idBatches {
isLast := (i == len(idBatches)-1)

batch := t.Broadcast.CreateBatch(idBatch, isLast)
err = tasks.Queue(rc, q, t.Broadcast.OrgID, &SendBroadcastBatchTask{BroadcastBatch: batch}, queue.DefaultPriority)
batch := bcast.CreateBatch(idBatch, isLast)
err = tasks.Queue(rc, q, bcast.OrgID, &SendBroadcastBatchTask{BroadcastBatch: batch}, queue.DefaultPriority)
if err != nil {
if i == 0 {
return errors.Wrap(err, "error queuing broadcast batch")
Expand Down
2 changes: 1 addition & 1 deletion core/tasks/msgs/send_broadcast_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (t *SendBroadcastBatchTask) Timeout() time.Duration {
func (t *SendBroadcastBatchTask) Perform(ctx context.Context, rt *runtime.Runtime, orgID models.OrgID) error {
// always set our broadcast as sent if it is our last
defer func() {
if t.BroadcastBatch.IsLast {
if t.BroadcastBatch.IsLast && t.BroadcastBatch.BroadcastID != models.NilBroadcastID {
err := models.MarkBroadcastSent(ctx, rt.DB, t.BroadcastBatch.BroadcastID)
if err != nil {
logrus.WithError(err).Error("error marking broadcast as sent")
Expand Down
43 changes: 17 additions & 26 deletions core/tasks/starts/start_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ func (t *StartFlowTask) Timeout() time.Duration {
}

func (t *StartFlowTask) Perform(ctx context.Context, rt *runtime.Runtime, orgID models.OrgID) error {
err := CreateFlowBatches(ctx, rt, t.FlowStart)
if err != nil {
if err := createFlowStartBatches(ctx, rt, t.FlowStart); err != nil {
models.MarkStartFailed(ctx, rt.DB, t.FlowStart.ID)

// if error is user created query error.. don't escalate error to sentry
Expand All @@ -55,8 +54,8 @@ func (t *StartFlowTask) Perform(ctx context.Context, rt *runtime.Runtime, orgID
return nil
}

// CreateFlowBatches takes our master flow start and creates batches of flow starts for all the unique contacts
func CreateFlowBatches(ctx context.Context, rt *runtime.Runtime, start *models.FlowStart) error {
// creates batches of flow starts for all the unique contacts
func createFlowStartBatches(ctx context.Context, rt *runtime.Runtime, start *models.FlowStart) error {
oa, err := models.GetOrgAssets(ctx, rt, start.OrgID)
if err != nil {
return errors.Wrap(err, "error loading org assets")
Expand Down Expand Up @@ -94,9 +93,6 @@ func CreateFlowBatches(ctx context.Context, rt *runtime.Runtime, start *models.F
}
}

rc := rt.RP.Get()
defer rc.Close()

// mark our start as starting, last task will mark as complete
err = models.MarkStartStarted(ctx, rt.DB, start.ID, len(contactIDs), createdContactIDs)
if err != nil {
Expand All @@ -118,9 +114,15 @@ func CreateFlowBatches(ctx context.Context, rt *runtime.Runtime, start *models.F
q = queue.HandlerQueue
}

contacts := make([]models.ContactID, 0, 100)
queueBatch := func(last bool) {
batch := start.CreateBatch(contacts, last, len(contactIDs))
rc := rt.RP.Get()
defer rc.Close()

// create tasks for batches of contacts
idBatches := models.ChunkSlice(contactIDs, startBatchSize)
for i, idBatch := range idBatches {
isLast := (i == len(idBatches)-1)

batch := start.CreateBatch(idBatch, isLast, len(contactIDs))

// task is different if we are an IVR flow
var batchTask tasks.Task
Expand All @@ -132,23 +134,12 @@ func CreateFlowBatches(ctx context.Context, rt *runtime.Runtime, start *models.F

err = tasks.Queue(rc, q, start.OrgID, batchTask, queue.DefaultPriority)
if err != nil {
// TODO: is continuing the right thing here? what do we do if redis is down? (panic!)
logrus.WithError(err).WithField("start_id", start.ID).Error("error while queuing start")
}
contacts = make([]models.ContactID, 0, 100)
}

// build up batches of contacts to start
for _, c := range contactIDs {
if len(contacts) == startBatchSize {
queueBatch(false)
if i == 0 {
return errors.Wrap(err, "error queuing flow start batch")
}
// if we've already queued other batches.. we don't want to error and have the task be retried
logrus.WithError(err).Error("error queuing flow start batch")
}
contacts = append(contacts, c)
}

// queue our last batch
if len(contacts) > 0 {
queueBatch(true)
}

return nil
Expand Down
Loading

0 comments on commit a1dfd25

Please sign in to comment.