Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Courier queuing for dummies #521

Merged
merged 1 commit into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 50 additions & 45 deletions core/msgio/courier.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package msgio

import (
"encoding/json"
"strconv"
"time"

"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/mailroom/core/models"

Expand All @@ -14,20 +15,61 @@ import (
)

const (
highPriority = 1
defaultPriority = 0
bulkPriority = 0
highPriority = 1
)

var queuePushScript = redis.NewScript(6, `
-- KEYS: [QueueType, QueueName, TPS, Priority, Items, EpochSecs]
local queueType, queueName, tps, priority, items, epochSecs = KEYS[1], KEYS[2], tonumber(KEYS[3]), KEYS[4], KEYS[5], KEYS[6]

-- first construct the base key for this queue from the type + name + tps, e.g. "msgs:0a77a158-1dcb-4c06-9aee-e15bdf64653e|10"
local queueKey = queueType .. ":" .. queueName .. "|" .. tps

-- each queue than has two sorted sets for bulk and high priority items, e.g. "msgs:0a77..653e|10/0" vs msgs:0a77..653e|10/1"
local priorityQueueKey = queueKey .. "/" .. priority

-- add the items to the sorted set using the full timestamp (e.g. 1636556789.123456) as the score
redis.call("ZADD", priorityQueueKey, epochSecs, items)

-- if we have a TPS limit, check the transaction counter for this epoch second to see if have already reached it
local curr = -1
if tps > 0 then
local tpsKey = queueKey .. ":tps:" .. math.floor(epochSecs) -- e.g. "msgs:0a77..4653e|10:tps:1636556789"
curr = tonumber(redis.call("GET", tpsKey))
end

-- if we haven't hit the limit, add this queue to set of active queues
if not curr or curr < tps then
redis.call("ZINCRBY", queueType .. ":active", 0, queueKey)
return 1
else
return 0
end
`)

// PushCourierBatch pushes a batch of messages for a single contact and channel onto the appropriate courier queue
func PushCourierBatch(rc redis.Conn, ch *models.Channel, batch []*models.Msg, timestamp string) error {
priority := bulkPriority
if batch[0].HighPriority() {
priority = highPriority
}
batchJSON := jsonx.MustMarshal(batch)

_, err := queuePushScript.Do(rc, "msgs", ch.UUID(), ch.TPS(), priority, batchJSON, timestamp)
return err
}

// QueueCourierMessages queues messages for a single contact to Courier
func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, msgs []*models.Msg) error {
if len(msgs) == 0 {
return nil
}

now := time.Now()
epochMS := strconv.FormatFloat(float64(now.UnixNano()/int64(time.Microsecond))/float64(1000000), 'f', 6, 64)

priority := defaultPriority
// get the time in seconds since the epoch as a floating point number
// e.g. 2021-11-10T15:10:49.123456+00:00 => "1636557205.123456"
now := dates.Now()
epochSeconds := strconv.FormatFloat(float64(now.UnixNano()/int64(time.Microsecond))/float64(1000000), 'f', 6, 64)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed for clarity.. it's the number of seconds


// we batch msgs by channel uuid
batch := make([]*models.Msg, 0, len(msgs))
Expand All @@ -36,17 +78,8 @@ func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, msgs []*mod
// commits our batch to redis
commitBatch := func() error {
if len(batch) > 0 {
priority = defaultPriority
if batch[0].HighPriority() {
priority = highPriority
}

batchJSON, err := json.Marshal(batch)
if err != nil {
return err
}
start := time.Now()
_, err = queueMsg.Do(rc, epochMS, "msgs", currentChannel.UUID(), currentChannel.TPS(), priority, batchJSON)
err := PushCourierBatch(rc, currentChannel, batch, epochSeconds)
if err != nil {
return err
}
Expand Down Expand Up @@ -101,31 +134,3 @@ func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, msgs []*mod
// any remaining in our batch, queue it up
return commitBatch()
}

var queueMsg = redis.NewScript(6, `
-- KEYS: [EpochMS, QueueType, QueueName, TPS, Priority, Value]

-- first push onto our specific queue
-- our queue name is built from the type, name and tps, usually something like: "msgs:uuid1-uuid2-uuid3-uuid4|tps"
local queueKey = KEYS[2] .. ":" .. KEYS[3] .. "|" .. KEYS[4]

-- our priority queue name also includes the priority of the message (we have one queue for default and one for bulk)
local priorityQueueKey = queueKey .. "/" .. KEYS[5]
redis.call("zadd", priorityQueueKey, KEYS[1], KEYS[6])
local tps = tonumber(KEYS[4])

-- if we have a TPS, check whether we are currently throttled
local curr = -1
if tps > 0 then
local tpsKey = queueKey .. ":tps:" .. math.floor(KEYS[1])
curr = tonumber(redis.call("get", tpsKey))
end

-- if we aren't then add to our active
if not curr or curr < tps then
redis.call("zincrby", KEYS[2] .. ":active", 0, queueKey)
return 1
else
return 0
end
`)
72 changes: 72 additions & 0 deletions core/msgio/courier_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package msgio_test

import (
"encoding/json"
"testing"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/msgio"
"github.com/nyaruka/mailroom/testsuite"
Expand Down Expand Up @@ -98,3 +101,72 @@ func TestQueueCourierMessages(t *testing.T) {
msgio.QueueCourierMessages(rc, testdata.Cathy.ID, []*models.Msg{ms.createMsg(t, rt, oa)})
})
}

func TestPushCourierBatch(t *testing.T) {
ctx, rt, _, rp := testsuite.Get()
rc := rp.Get()
defer rc.Close()

defer testsuite.Reset(testsuite.ResetData | testsuite.ResetRedis)

oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshChannels)
require.NoError(t, err)

channel := oa.ChannelByID(testdata.TwilioChannel.ID)

msg1 := (&msgSpec{ChannelID: testdata.TwilioChannel.ID, ContactID: testdata.Cathy.ID, URNID: testdata.Cathy.URNID}).createMsg(t, rt, oa)
msg2 := (&msgSpec{ChannelID: testdata.TwilioChannel.ID, ContactID: testdata.Cathy.ID, URNID: testdata.Cathy.URNID}).createMsg(t, rt, oa)

err = msgio.PushCourierBatch(rc, channel, []*models.Msg{msg1, msg2}, "1636557205.123456")
require.NoError(t, err)

// check that channel has been added to active list
msgsActive, err := redis.Strings(rc.Do("ZRANGE", "msgs:active", 0, -1))
assert.NoError(t, err)
assert.Equal(t, []string{"msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10"}, msgsActive)

// and that msgs were added as single batch to bulk priority (0) queue
queued, err := redis.ByteSlices(rc.Do("ZRANGE", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0", 0, -1))
assert.NoError(t, err)
assert.Equal(t, 1, len(queued))

unmarshaled, err := jsonx.DecodeGeneric(queued[0])
assert.NoError(t, err)
assert.Equal(t, 2, len(unmarshaled.([]interface{})))

item1ID, _ := unmarshaled.([]interface{})[0].(map[string]interface{})["id"].(json.Number).Int64()
item2ID, _ := unmarshaled.([]interface{})[1].(map[string]interface{})["id"].(json.Number).Int64()
assert.Equal(t, int64(msg1.ID()), item1ID)
assert.Equal(t, int64(msg2.ID()), item2ID)

// push another batch in the same epoch second with transaction counter still below limit
rc.Do("SET", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10:tps:1636557205", "5")

msg3 := (&msgSpec{ChannelID: testdata.TwilioChannel.ID, ContactID: testdata.Cathy.ID, URNID: testdata.Cathy.URNID}).createMsg(t, rt, oa)

err = msgio.PushCourierBatch(rc, channel, []*models.Msg{msg3}, "1636557205.234567")
require.NoError(t, err)

queued, err = redis.ByteSlices(rc.Do("ZRANGE", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0", 0, -1))
assert.NoError(t, err)
assert.Equal(t, 2, len(queued))

// simulate channel having been throttled
rc.Do("ZREM", "msgs:active", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10")
rc.Do("SET", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10:tps:1636557205", "11")

msg4 := (&msgSpec{ChannelID: testdata.TwilioChannel.ID, ContactID: testdata.Cathy.ID, URNID: testdata.Cathy.URNID}).createMsg(t, rt, oa)

err = msgio.PushCourierBatch(rc, channel, []*models.Msg{msg4}, "1636557205.345678")
require.NoError(t, err)

// check that channel has *not* been added to active list
msgsActive, err = redis.Strings(rc.Do("ZRANGE", "msgs:active", 0, -1))
assert.NoError(t, err)
assert.Equal(t, []string{}, msgsActive)

// but msg was still added to queue
queued, err = redis.ByteSlices(rc.Do("ZRANGE", "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0", 0, -1))
assert.NoError(t, err)
assert.Equal(t, 3, len(queued))
}
32 changes: 27 additions & 5 deletions core/msgio/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@ import (
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/null"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type msgSpec struct {
ChannelID models.ChannelID
ContactID models.ContactID
URNID models.URNID
Failed bool
ChannelID models.ChannelID
ContactID models.ContactID
URNID models.URNID
Failed bool
HighPriority bool
}

func (m *msgSpec) createMsg(t *testing.T, rt *runtime.Runtime, oa *models.OrgAssets) *models.Msg {
Expand All @@ -47,6 +49,10 @@ func (m *msgSpec) createMsg(t *testing.T, rt *runtime.Runtime, oa *models.OrgAss
msg, err := models.NewOutgoingMsg(rt.Config, oaOrg.Org(), channel, m.ContactID, flowMsg, time.Now())
require.NoError(t, err)

if m.HighPriority {
msg.SetResponseTo(models.NilMsgID, null.String("1234"))
}

models.InsertMessages(ctx, rt.DB, []*models.Msg{msg})
require.NoError(t, err)

Expand All @@ -58,6 +64,8 @@ func TestSendMessages(t *testing.T) {
rc := rp.Get()
defer rc.Close()

defer testsuite.Reset(testsuite.ResetData)

mockFCM := newMockFCMEndpoint("FCMID3")
defer mockFCM.Stop()

Expand All @@ -78,6 +86,13 @@ func TestSendMessages(t *testing.T) {
FCMTokensSynced []string
PendingMsgs int
}{
{
Description: "no messages",
Msgs: []msgSpec{},
QueueSizes: map[string][]int{},
FCMTokensSynced: []string{},
PendingMsgs: 0,
},
{
Description: "2 messages for Courier, and 1 Android",
Msgs: []msgSpec{
Expand All @@ -96,9 +111,16 @@ func TestSendMessages(t *testing.T) {
ContactID: testdata.Cathy.ID,
URNID: testdata.Cathy.URNID,
},
{
ChannelID: testdata.TwilioChannel.ID,
ContactID: testdata.Bob.ID,
URNID: testdata.Bob.URNID,
HighPriority: true,
},
},
QueueSizes: map[string][]int{
"msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": {2},
"msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": {2}, // 2 default priority messages for Cathy
"msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/1": {1}, // 1 high priority message for Bob
},
FCMTokensSynced: []string{"FCMID1"},
PendingMsgs: 0,
Expand Down