diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a26585f0c..2b32d9edd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,12 +2,13 @@ name: CI on: [push, pull_request] env: go-version: '1.14.x' + postgis-version: '2.5' jobs: test: name: Test strategy: matrix: - pg-version: ['10', '11'] + pg-version: ['11', '12'] runs-on: ubuntu-latest steps: - name: Checkout code @@ -19,18 +20,22 @@ jobs: redis version: '5' - name: Install PostgreSQL - uses: nyaruka/postgis-action@master + uses: nyaruka/postgis-action@v2 with: postgresql version: ${{ matrix.pg-version }} + postgis version: ${{ env.postgis-version }} + postgresql password: temba - name: Install Linux packages run: sudo apt install -y --no-install-recommends postgresql-client - name: Initialize database + # we create our test database with a different user so that we can drop everything owned by this user between tests run: | - psql -h localhost -U postgres -c "CREATE USER mailroom_test PASSWORD 'temba';" - psql -h localhost -U postgres -c "ALTER ROLE mailroom_test WITH SUPERUSER;" - psql -h localhost -U postgres -c "CREATE DATABASE mailroom_test;" + export PGPASSWORD=temba + psql -h localhost -U postgres --no-password -c "CREATE USER mailroom_test PASSWORD 'temba';" + psql -h localhost -U postgres --no-password -c "ALTER ROLE mailroom_test WITH SUPERUSER;" + psql -h localhost -U postgres --no-password -c "CREATE DATABASE mailroom_test;" - name: Install Go uses: actions/setup-go@v1 @@ -38,7 +43,9 @@ jobs: go-version: ${{ env.go-version }} - name: Run tests - run: go test -p=1 -coverprofile=coverage.text -covermode=atomic ./... + run: | + export PGPASSWORD=temba + go test -p=1 -coverprofile=coverage.text -covermode=atomic ./... - name: Upload coverage if: success() @@ -59,7 +66,7 @@ jobs: # for backward compatibility, English docs are copied to root of docs directory run: | GOFLOW_VERSION=$(grep goflow go.mod | cut -d" " -f2 | cut -c2-) - curl https://codeload.github.com/nyaruka/goflow/tar.gz/v$GOFLOW_VERSION | tar --wildcards --strip=1 -zx "goflow-${GOFLOW_VERSION}/docs/*" + curl -L https://github.com/nyaruka/goflow/releases/download/v${GOFLOW_VERSION}/docs.tar.gz | tar zxv cp ./docs/en-us/*.* docs/ - name: Install Go diff --git a/CHANGELOG.md b/CHANGELOG.md index 004a10f9f..86731b744 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,62 @@ +v6.1.18 +---------- + * Update to latest goflow + * Rename tickets/internal package + +v6.1.17 +---------- + * Should match referral trigger with case insensitive + +v6.1.16 +---------- + * Update to latest goflow + * Add link local ips to default disallowed networks config + +v6.1.15 +---------- + * Update phonenumbers lib + * Decrease locations cache timeout to 1 minute + +v6.1.14 +---------- + * Support ElasticSearch 7.2 (backwards incompatible to Elastic 6.*) + +v6.1.13 +---------- + * Update to latest goflow + +v6.1.12 +---------- + * Update to latest goflow v0.110.0 + +v6.1.11 +---------- + * Update to latest goflow v0.109.4 + +v6.1.10 +---------- + * Simplify FCM client code + * Fix updating message status when id column is bigint + * Ensure courier messages are always queued for a single contact + * Fix not triggering FCM syncs for broadcasts and ticket reply messages + +v6.1.9 +---------- + * Update to goflow v0.109.0 + +v6.1.8 +---------- + * Update to latest goflow 0.108.0 + +v6.1.7 +---------- + * Use background instead of passive + +v6.1.6 +---------- + * Update to latest goflow v0.107.2 + * Add support for passive flows + v6.1.5 ---------- * Update to goflow v0.107.1 diff --git a/cmd/mailroom/main.go b/cmd/mailroom/main.go index 66a782d92..b1681f854 100644 --- a/cmd/mailroom/main.go +++ b/cmd/mailroom/main.go @@ -27,6 +27,7 @@ import ( _ "github.com/nyaruka/mailroom/core/tasks/starts" _ "github.com/nyaruka/mailroom/core/tasks/stats" _ "github.com/nyaruka/mailroom/core/tasks/timeouts" + _ "github.com/nyaruka/mailroom/services/tickets/intern" _ "github.com/nyaruka/mailroom/services/tickets/mailgun" _ "github.com/nyaruka/mailroom/services/tickets/rocketchat" _ "github.com/nyaruka/mailroom/services/tickets/zendesk" diff --git a/config/config.go b/config/config.go index b569999f7..47f7de809 100644 --- a/config/config.go +++ b/config/config.go @@ -84,7 +84,7 @@ func NewMailroomConfig() *Config { WebhooksInitialBackoff: 5000, WebhooksBackoffJitter: 0.5, SMTPServer: "", - DisallowedNetworks: `127.0.0.1,::1,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16`, + DisallowedNetworks: `127.0.0.1,::1,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,169.254.0.0/16,fe80::/10`, MaxStepsPerSprint: 100, MaxValueLength: 640, diff --git a/config/config_test.go b/config/config_test.go index e313830a5..4bb476ba2 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -16,11 +16,14 @@ func TestParseDisallowedNetworks(t *testing.T) { privateNetwork2 := &net.IPNet{IP: net.IPv4(172, 16, 0, 0).To4(), Mask: net.CIDRMask(12, 32)} privateNetwork3 := &net.IPNet{IP: net.IPv4(192, 168, 0, 0).To4(), Mask: net.CIDRMask(16, 32)} + linkLocalIPv4 := &net.IPNet{IP: net.IPv4(169, 254, 0, 0).To4(), Mask: net.CIDRMask(16, 32)} + _, linkLocalIPv6, _ := net.ParseCIDR("fe80::/10") + // test with config defaults ips, ipNets, err := cfg.ParseDisallowedNetworks() assert.NoError(t, err) assert.Equal(t, []net.IP{net.IPv4(127, 0, 0, 1), net.ParseIP(`::1`)}, ips) - assert.Equal(t, []*net.IPNet{privateNetwork1, privateNetwork2, privateNetwork3}, ipNets) + assert.Equal(t, []*net.IPNet{privateNetwork1, privateNetwork2, privateNetwork3, linkLocalIPv4, linkLocalIPv6}, ipNets) // test with empty cfg.DisallowedNetworks = `` diff --git a/core/courier/courier_test.go b/core/courier/courier_test.go deleted file mode 100644 index ec1c19785..000000000 --- a/core/courier/courier_test.go +++ /dev/null @@ -1,168 +0,0 @@ -package courier_test - -import ( - "encoding/json" - "fmt" - "testing" - "time" - - "github.com/nyaruka/gocommon/urns" - "github.com/nyaruka/goflow/assets" - "github.com/nyaruka/goflow/flows" - "github.com/nyaruka/mailroom/core/courier" - "github.com/nyaruka/mailroom/core/models" - "github.com/nyaruka/mailroom/testsuite" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -type msgSpec struct { - ChannelUUID assets.ChannelUUID - Text string - ContactID models.ContactID - URN urns.URN - URNID models.URNID - Failed bool -} - -func createMsg(t *testing.T, m msgSpec) *models.Msg { - ctx := testsuite.CTX() - db := testsuite.DB() - db.MustExec(`UPDATE orgs_org SET is_suspended = $1 WHERE id = $2`, m.Failed, models.Org1) - - oa, err := models.GetOrgAssetsWithRefresh(ctx, db, models.Org1, models.RefreshOrg) - require.NoError(t, err) - - channel := oa.ChannelByUUID(m.ChannelUUID) - - flowMsg := flows.NewMsgOut(m.URN, channel.ChannelReference(), m.Text, nil, nil, nil, flows.NilMsgTopic) - msg, err := models.NewOutgoingMsg(oa.Org(), channel, m.ContactID, flowMsg, time.Now()) - require.NoError(t, err) - return msg -} - -func TestQueueMessages(t *testing.T) { - db := testsuite.DB() - rc := testsuite.RC() - testsuite.Reset() - models.FlushCache() - - // convert the Twitter channel to be an Android channel - db.MustExec(`UPDATE channels_channel SET name = 'Android', channel_type = 'A' WHERE id = $1`, models.TwitterChannelID) - androidChannelUUID := models.TwitterChannelUUID - - tests := []struct { - Description string - Msgs []msgSpec - QueueSizes map[string]int - }{ - { - Description: "2 queueable messages", - Msgs: []msgSpec{ - { - ChannelUUID: models.TwilioChannelUUID, - Text: "normal msg 1", - ContactID: models.CathyID, - URN: urns.URN(fmt.Sprintf("tel:+250700000001?id=%d", models.CathyURNID)), - URNID: models.CathyURNID, - }, - { - ChannelUUID: models.TwilioChannelUUID, - Text: "normal msg 2", - ContactID: models.CathyID, - URN: urns.URN(fmt.Sprintf("tel:+250700000001?id=%d", models.CathyURNID)), - URNID: models.CathyURNID, - }, - }, - QueueSizes: map[string]int{ - "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": 2, - }, - }, - { - Description: "1 queueable message and 1 failed", - Msgs: []msgSpec{ - { - ChannelUUID: models.TwilioChannelUUID, - Text: "normal msg 1", - ContactID: models.CathyID, - URN: urns.URN(fmt.Sprintf("tel:+250700000001?id=%d", models.CathyURNID)), - URNID: models.CathyURNID, - Failed: true, - }, - { - ChannelUUID: models.TwilioChannelUUID, - Text: "normal msg 1", - ContactID: models.CathyID, - URN: urns.URN(fmt.Sprintf("tel:+250700000001?id=%d", models.CathyURNID)), - URNID: models.CathyURNID, - }, - }, - QueueSizes: map[string]int{ - "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": 1, - }, - }, - { - Description: "1 queueable message and 1 Android", - Msgs: []msgSpec{ - { - ChannelUUID: androidChannelUUID, - Text: "normal msg 1", - ContactID: models.CathyID, - URN: urns.URN(fmt.Sprintf("tel:+250700000001?id=%d", models.CathyURNID)), - URNID: models.CathyURNID, - }, - { - ChannelUUID: models.TwilioChannelUUID, - Text: "normal msg 1", - ContactID: models.CathyID, - URN: urns.URN(fmt.Sprintf("tel:+250700000001?id=%d", models.CathyURNID)), - URNID: models.CathyURNID, - }, - }, - QueueSizes: map[string]int{ - "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": 1, - }, - }, - { - Description: "0 messages", - Msgs: []msgSpec{}, - QueueSizes: map[string]int{ - "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": 0, - }, - }, - } - - for _, tc := range tests { - msgs := make([]*models.Msg, len(tc.Msgs)) - for i := range tc.Msgs { - msgs[i] = createMsg(t, tc.Msgs[i]) - } - - rc.Do("FLUSHDB") - courier.QueueMessages(rc, msgs) - - for queueKey, size := range tc.QueueSizes { - if size == 0 { - result, err := rc.Do("ZCARD", queueKey) - require.NoError(t, err) - assert.Equal(t, size, int(result.(int64))) - } else { - result, err := rc.Do("ZPOPMAX", queueKey) - require.NoError(t, err) - - results := result.([]interface{}) - assert.Equal(t, 2, len(results)) // result is (item, score) - - batchJSON := results[0].([]byte) - var batch []map[string]interface{} - err = json.Unmarshal(batchJSON, &batch) - require.NoError(t, err) - - assert.Equal(t, size, len(batch)) - } - } - } - - testsuite.Reset() -} diff --git a/core/handlers/msg_created.go b/core/handlers/msg_created.go index f42092dd6..4a89874ef 100644 --- a/core/handlers/msg_created.go +++ b/core/handlers/msg_created.go @@ -25,7 +25,7 @@ func handlePreMsgCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *m event := e.(*events.MsgCreatedEvent) // we only clear timeouts on messaging flows - if scene.Session().SessionType() != models.MessagingFlow { + if scene.Session().SessionType() != models.FlowTypeMessaging { return nil } @@ -73,12 +73,12 @@ func handleMsgCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *mode // ignore events that don't have a channel or URN set // TODO: maybe we should create these messages in a failed state? - if scene.Session().SessionType() == models.MessagingFlow && (event.Msg.URN() == urns.NilURN || event.Msg.Channel() == nil) { + if scene.Session().SessionType() == models.FlowTypeMessaging && (event.Msg.URN() == urns.NilURN || event.Msg.Channel() == nil) { return nil } // messages in messaging flows must have urn id set on them, if not, go look it up - if scene.Session().SessionType() == models.MessagingFlow { + if scene.Session().SessionType() == models.FlowTypeMessaging { urn := event.Msg.URN() if models.GetURNInt(urn, "id") == 0 { urn, err := models.GetOrCreateURN(ctx, tx, oa, scene.ContactID(), event.Msg.URN()) @@ -114,7 +114,7 @@ func handleMsgCreated(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *mode scene.AppendToEventPreCommitHook(hooks.CommitMessagesHook, msg) // don't send messages for surveyor flows - if scene.Session().SessionType() != models.SurveyorFlow { + if scene.Session().SessionType() != models.FlowTypeSurveyor { scene.AppendToEventPostCommitHook(hooks.SendMessagesHook, msg) } diff --git a/core/handlers/msg_received.go b/core/handlers/msg_received.go index 076824894..cf3e4cc65 100644 --- a/core/handlers/msg_received.go +++ b/core/handlers/msg_received.go @@ -22,7 +22,7 @@ func handleMsgReceived(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *mod event := e.(*events.MsgReceivedEvent) // for surveyor sessions we need to actually create the message - if scene.Session() != nil && scene.Session().SessionType() == models.SurveyorFlow { + if scene.Session() != nil && scene.Session().SessionType() == models.FlowTypeSurveyor { logrus.WithFields(logrus.Fields{ "contact_uuid": scene.ContactUUID(), "session_id": scene.SessionID(), diff --git a/core/hooks/send_messages.go b/core/hooks/send_messages.go index 136125c6c..387c8b14d 100644 --- a/core/hooks/send_messages.go +++ b/core/hooks/send_messages.go @@ -2,17 +2,12 @@ package hooks import ( "context" - "time" - "github.com/nyaruka/mailroom/config" - "github.com/nyaruka/mailroom/core/courier" "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/core/msgio" - "github.com/apex/log" - "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" - "github.com/sirupsen/logrus" ) // SendMessagesHook is our hook for sending scene messages @@ -22,106 +17,24 @@ type sendMessagesHook struct{} // Apply sends all non-android messages to courier func (h *sendMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { - rc := rp.Get() - defer rc.Close() - - // messages that need to be marked as pending - pending := make([]*models.Msg, 0, 1) - - // android channels that need to be notified to sync - androidChannels := make(map[*models.Channel]bool) + msgs := make([]*models.Msg, 0, 1) // for each scene gather all our messages for s, args := range scenes { - // walk through our messages, separate by whether they're android or not - courierMsgs := make([]*models.Msg, 0, len(args)) + sceneMsgs := make([]*models.Msg, 0, 1) for _, m := range args { - msg := m.(*models.Msg) - channel := msg.Channel() - if channel != nil { - if channel.Type() == models.ChannelTypeAndroid { - androidChannels[channel] = true - } else { - courierMsgs = append(courierMsgs, msg) - } - } else { - pending = append(pending, msg) - } - } - - // if there are courier messages to send, do so - if len(courierMsgs) > 0 { - // if our scene has a timeout, set it on our last message - if s.Session().Timeout() != nil && s.Session().WaitStartedOn() != nil { - courierMsgs[len(courierMsgs)-1].SetTimeout(*s.Session().WaitStartedOn(), *s.Session().Timeout()) - } - - log := log.WithField("messages", courierMsgs).WithField("scene", s.SessionID) - - err := courier.QueueMessages(rc, courierMsgs) - - // not being able to queue a message isn't the end of the world, log but don't return an error - if err != nil { - log.WithError(err).Error("error queuing message") - - // in the case of errors we do want to change the messages back to pending however so they - // get queued later. (for the common case messages are only inserted and queued, without a status update) - for _, msg := range courierMsgs { - pending = append(pending, msg) - } - } - } - } - - // if we have any android messages, trigger syncs for the unique channels - for channel := range androidChannels { - // no FCM key for this rapidpro install? break out but log - if config.Mailroom.FCMKey == "" { - logrus.Error("cannot trigger sync for android channel, FCM Key unset") - break - } - - // no fcm id for this channel, noop, we can't trigger a sync - fcmID := channel.ConfigValue(models.ChannelConfigFCMID, "") - if fcmID == "" { - continue - } - - client, err := fcm.NewClient(config.Mailroom.FCMKey) - if err != nil { - logrus.WithError(err).Error("error initializing fcm client") - continue + sceneMsgs = append(sceneMsgs, m.(*models.Msg)) } - sync := &fcm.Message{ - Token: fcmID, - Priority: "high", - CollapseKey: "sync", - Data: map[string]interface{}{ - "msg": "sync", - }, + // if our scene has a timeout, set it on our last message + if len(sceneMsgs) > 0 && s.Session().Timeout() != nil && s.Session().WaitStartedOn() != nil { + sceneMsgs[len(sceneMsgs)-1].SetTimeout(*s.Session().WaitStartedOn(), *s.Session().Timeout()) } - start := time.Now() - _, err = client.Send(sync) - - if err != nil { - // log failures but continue, relayer will sync on its own - logrus.WithError(err).WithField("channel_uuid", channel.UUID()).Error("error syncing channel") - } else { - logrus.WithField("elapsed", time.Since(start)).WithField("channel_uuid", channel.UUID()).Debug("android sync complete") - } - } - - // any messages that didn't get sent should be moved back to pending (they are queued at creation to save an - // update in the common case) - if len(pending) > 0 { - err := models.MarkMessagesPending(ctx, tx, pending) - if err != nil { - log.WithError(err).Error("error marking message as pending") - } + msgs = append(msgs, sceneMsgs...) } + msgio.SendMessages(ctx, tx, rp, nil, msgs) return nil } diff --git a/core/ivr/ivr.go b/core/ivr/ivr.go index b26e1cf87..ca4468f75 100644 --- a/core/ivr/ivr.go +++ b/core/ivr/ivr.go @@ -400,7 +400,7 @@ func ResumeIVRFlow( return errors.Wrapf(err, "error creating flow contact") } - session, err := models.ActiveSessionForContact(ctx, db, oa, models.IVRFlow, contact) + session, err := models.ActiveSessionForContact(ctx, db, oa, models.FlowTypeVoice, contact) if err != nil { return errors.Wrapf(err, "error loading session for contact") } diff --git a/core/models/assets.go b/core/models/assets.go index 01122e213..cfbba1176 100644 --- a/core/models/assets.go +++ b/core/models/assets.go @@ -74,7 +74,7 @@ var ErrNotFound = errors.New("not found") var orgCache = cache.New(time.Minute*15, time.Minute*15) const cacheTimeout = time.Second * 5 -const locationCacheTimeout = time.Hour +const locationCacheTimeout = time.Second * 60 // FlushCache clears our entire org cache func FlushCache() { diff --git a/core/models/channel_connection.go b/core/models/channel_connection.go index 12c0d7682..e2ec1f31e 100644 --- a/core/models/channel_connection.go +++ b/core/models/channel_connection.go @@ -251,6 +251,9 @@ WHERE cc.channel_id = $1 AND cc.connection_type = $2 AND cc.external_id = $3 +ORDER BY + cc.id DESC +LIMIT 1 ` // SelectChannelConnectionByExternalID loads a channel connection by id diff --git a/core/models/flows.go b/core/models/flows.go index 0411a3cb5..b3bc3dfca 100644 --- a/core/models/flows.go +++ b/core/models/flows.go @@ -27,19 +27,26 @@ type FlowType string // flow type constants const ( - IVRFlow = FlowType("V") - MessagingFlow = FlowType("M") - SurveyorFlow = FlowType("S") + FlowTypeMessaging = FlowType("M") + FlowTypeBackground = FlowType("B") + FlowTypeSurveyor = FlowType("S") + FlowTypeVoice = FlowType("V") ) +// Interrupts returns whether this flow type interrupts existing sessions +func (t FlowType) Interrupts() bool { + return t != FlowTypeBackground +} + const ( flowConfigIVRRetryMinutes = "ivr_retry" ) var flowTypeMapping = map[flows.FlowType]FlowType{ - flows.FlowTypeMessaging: MessagingFlow, - flows.FlowTypeVoice: IVRFlow, - flows.FlowTypeMessagingOffline: SurveyorFlow, + flows.FlowTypeMessaging: FlowTypeMessaging, + flows.FlowTypeMessagingBackground: FlowTypeBackground, + flows.FlowTypeMessagingOffline: FlowTypeSurveyor, + flows.FlowTypeVoice: FlowTypeVoice, } // Flow is the mailroom type for a flow diff --git a/core/models/groups.go b/core/models/groups.go index 0190d3366..5a391c60d 100644 --- a/core/models/groups.go +++ b/core/models/groups.go @@ -10,7 +10,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/lib/pq" - "github.com/olivere/elastic" + "github.com/olivere/elastic/v7" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) diff --git a/core/models/groups_test.go b/core/models/groups_test.go index 7ae7f7cb5..1a214b2e6 100644 --- a/core/models/groups_test.go +++ b/core/models/groups_test.go @@ -11,7 +11,7 @@ import ( "github.com/nyaruka/mailroom/testsuite" "github.com/lib/pq" - "github.com/olivere/elastic" + "github.com/olivere/elastic/v7" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) diff --git a/core/models/msgs.go b/core/models/msgs.go index 7b9d1b86e..d5eea2cd5 100644 --- a/core/models/msgs.go +++ b/core/models/msgs.go @@ -23,7 +23,6 @@ import ( "github.com/nyaruka/null" "github.com/gomodule/redigo/redis" - "github.com/jmoiron/sqlx" "github.com/lib/pq" "github.com/lib/pq/hstore" "github.com/pkg/errors" @@ -460,17 +459,11 @@ func UpdateMessage(ctx context.Context, tx Queryer, msgID flows.MsgID, status Ms } // MarkMessagesPending marks the passed in messages as pending -func MarkMessagesPending(ctx context.Context, tx *sqlx.Tx, msgs []*Msg) error { - return updateMessageStatus(ctx, tx, msgs, MsgStatusPending) +func MarkMessagesPending(ctx context.Context, db Queryer, msgs []*Msg) error { + return updateMessageStatus(ctx, db, msgs, MsgStatusPending) } -// MarkMessagesQueued marks the passed in messages as queued -func MarkMessagesQueued(ctx context.Context, tx *sqlx.Tx, msgs []*Msg) error { - return updateMessageStatus(ctx, tx, msgs, MsgStatusQueued) -} - -// MarkMessagesQueued marks the passed in messages as queued -func updateMessageStatus(ctx context.Context, tx *sqlx.Tx, msgs []*Msg, status MsgStatus) error { +func updateMessageStatus(ctx context.Context, db Queryer, msgs []*Msg, status MsgStatus) error { is := make([]interface{}, len(msgs)) for i, msg := range msgs { m := &msg.m @@ -478,7 +471,7 @@ func updateMessageStatus(ctx context.Context, tx *sqlx.Tx, msgs []*Msg, status M is[i] = m } - return BulkQuery(ctx, "updating message status", tx, updateMsgStatusSQL, is) + return BulkQuery(ctx, "updating message status", db, updateMsgStatusSQL, is) } const updateMsgStatusSQL = ` @@ -491,7 +484,7 @@ FROM ( ) AS m(id, status) WHERE - msgs_msg.id = m.id::int + msgs_msg.id = m.id::bigint ` // GetMessageIDFromUUID gets the ID of a message from its UUID diff --git a/core/models/msgs_test.go b/core/models/msgs_test.go index 8a9b045a9..6f0478c51 100644 --- a/core/models/msgs_test.go +++ b/core/models/msgs_test.go @@ -166,3 +166,47 @@ func TestNormalizeAttachment(t *testing.T) { assert.Equal(t, tc.normalized, string(models.NormalizeAttachment(utils.Attachment(tc.raw)))) } } + +func TestMarkMessages(t *testing.T) { + ctx, db, _ := testsuite.Reset() + defer testsuite.Reset() + + oa, err := models.GetOrgAssetsWithRefresh(ctx, db, models.Org1, models.RefreshOrg) + require.NoError(t, err) + + channel := oa.ChannelByUUID(models.TwilioChannelUUID) + + insertMsg := func(text string) *models.Msg { + urn := urns.URN(fmt.Sprintf("tel:+250700000001?id=%d", models.CathyURNID)) + flowMsg := flows.NewMsgOut(urn, channel.ChannelReference(), text, nil, nil, nil, flows.NilMsgTopic) + msg, err := models.NewOutgoingMsg(oa.Org(), channel, models.CathyID, flowMsg, time.Now()) + require.NoError(t, err) + + err = models.InsertMessages(ctx, db, []*models.Msg{msg}) + require.NoError(t, err) + + return msg + } + + msg1 := insertMsg("Hello") + msg2 := insertMsg("Hola") + insertMsg("Howdy") + + models.MarkMessagesPending(ctx, db, []*models.Msg{msg1, msg2}) + + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P'`, nil, 2) + + // try running on database with BIGINT message ids + db.MustExec(`ALTER TABLE "msgs_msg" ALTER COLUMN "id" TYPE bigint USING "id"::bigint;`) + db.MustExec(`ALTER SEQUENCE "msgs_msg_id_seq" AS bigint;`) + db.MustExec(`ALTER SEQUENCE "msgs_msg_id_seq" RESTART WITH 3000000000;`) + db = testsuite.DB() // need new connection after changes + + msg4 := insertMsg("Big messages!") + assert.Equal(t, flows.MsgID(3000000000), msg4.ID()) + + err = models.MarkMessagesPending(ctx, db, []*models.Msg{msg4}) + assert.NoError(t, err) + + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P'`, nil, 3) +} diff --git a/core/models/orgs.go b/core/models/orgs.go index 1381148a8..ef2138762 100644 --- a/core/models/orgs.go +++ b/core/models/orgs.go @@ -264,7 +264,7 @@ SELECT ROW_TO_JSON(o) FROM (SELECT (SELECT CASE is_anon WHEN TRUE THEN 'urns' WHEN FALSE THEN 'none' END) AS redaction_policy, $2::int AS max_value_length, (SELECT iso_code FROM orgs_language WHERE id = o.primary_language_id) AS default_language, - (SELECT ARRAY_AGG(iso_code) FROM orgs_language WHERE org_id = o.id) AS allowed_languages, + (SELECT ARRAY_AGG(iso_code ORDER BY iso_code ASC) FROM orgs_language WHERE org_id = o.id) AS allowed_languages, COALESCE( ( SELECT diff --git a/core/models/schedules_test.go b/core/models/schedules_test.go index fb156e8ae..52cce3eb1 100644 --- a/core/models/schedules_test.go +++ b/core/models/schedules_test.go @@ -90,7 +90,7 @@ func TestGetExpired(t *testing.T) { assert.Nil(t, schedules[1].Broadcast()) start := schedules[1].FlowStart() assert.NotNil(t, start) - assert.Equal(t, MessagingFlow, start.FlowType()) + assert.Equal(t, FlowTypeMessaging, start.FlowType()) assert.Equal(t, FavoritesFlowID, start.FlowID()) assert.Equal(t, Org1, start.OrgID()) assert.Equal(t, []ContactID{CathyID, GeorgeID}, start.ContactIDs()) diff --git a/core/models/search.go b/core/models/search.go index e7365211c..1b6c849b8 100644 --- a/core/models/search.go +++ b/core/models/search.go @@ -11,7 +11,7 @@ import ( "github.com/nyaruka/goflow/contactql" "github.com/nyaruka/goflow/contactql/es" - "github.com/olivere/elastic" + "github.com/olivere/elastic/v7" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -77,7 +77,7 @@ func ContactIDsForQueryPage(ctx context.Context, client *elastic.Client, org *Or return nil, nil, 0, errors.Wrapf(err, "error parsing sort") } - s := client.Search("contacts").Routing(strconv.FormatInt(int64(org.OrgID()), 10)) + s := client.Search("contacts").TrackTotalHits(true).Routing(strconv.FormatInt(int64(org.OrgID()), 10)) s = s.Size(pageSize).From(offset).Query(eq).SortBy(fieldSort).FetchSource(false) results, err := s.Do(ctx) @@ -110,7 +110,7 @@ func ContactIDsForQueryPage(ctx context.Context, client *elastic.Client, org *Or "total_count": results.Hits.TotalHits, }).Debug("paged contact query complete") - return parsed, ids, results.Hits.TotalHits, nil + return parsed, ids, results.Hits.TotalHits.Value, nil } // ContactIDsForQuery returns the ids of all the contacts that match the passed in query diff --git a/core/models/search_test.go b/core/models/search_test.go index 97ae645ad..e6b0a9777 100644 --- a/core/models/search_test.go +++ b/core/models/search_test.go @@ -9,7 +9,7 @@ import ( "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/testsuite" - "github.com/olivere/elastic" + "github.com/olivere/elastic/v7" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -84,7 +84,8 @@ func TestContactIDsForQueryPage(t *testing.T) { "order": "desc" } } - ] + ], + "track_total_hits": true }`, MockedESResponse: fmt.Sprintf(`{ "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==", @@ -195,7 +196,8 @@ func TestContactIDsForQueryPage(t *testing.T) { "order": "desc" } } - ] + ], + "track_total_hits": true }`, MockedESResponse: fmt.Sprintf(`{ "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==", diff --git a/core/models/starts_test.go b/core/models/starts_test.go index 8db62b016..802ffb414 100644 --- a/core/models/starts_test.go +++ b/core/models/starts_test.go @@ -45,7 +45,7 @@ func TestStarts(t *testing.T) { assert.Equal(t, startID, start.ID()) assert.Equal(t, models.Org1, start.OrgID()) assert.Equal(t, models.SingleMessageFlowID, start.FlowID()) - assert.Equal(t, models.MessagingFlow, start.FlowType()) + assert.Equal(t, models.FlowTypeMessaging, start.FlowType()) assert.Equal(t, "", start.Query()) assert.Equal(t, models.DoRestartParticipants, start.RestartParticipants()) assert.Equal(t, models.DoIncludeActive, start.IncludeActive()) diff --git a/core/models/test_constants.go b/core/models/test_constants.go index 19ccfebaf..2a0275f7f 100644 --- a/core/models/test_constants.go +++ b/core/models/test_constants.go @@ -128,6 +128,9 @@ var ZendeskUUID = assets.TicketerUUID("4ee6d4f3-f92b-439b-9718-8da90c05490b") var RocketChatID = TicketerID(3) var RocketChatUUID = assets.TicketerUUID("6c50665f-b4ff-4e37-9625-bc464fe6a999") +var InternalID = TicketerID(4) +var InternalUUID = assets.TicketerUUID("8bd48029-6ca1-46a8-aa14-68f7213b82b3") + // constants for org 2, just a few here var Org2 = OrgID(2) diff --git a/core/models/triggers.go b/core/models/triggers.go index 2bdf17716..15d3b8509 100644 --- a/core/models/triggers.go +++ b/core/models/triggers.go @@ -184,7 +184,7 @@ func FindMatchingReferralTrigger(org *OrgAssets, channel *Channel, referrerID st for _, t := range org.Triggers() { if t.TriggerType() == ReferralTriggerType { // matches referrer id? that takes top precedence, return right away - if referrerID != "" && referrerID == t.ReferrerID() && (t.ChannelID() == NilChannelID || t.ChannelID() == channel.ID()) { + if strings.ToLower(referrerID) != "" && strings.ToLower(referrerID) == strings.ToLower(t.ReferrerID()) && (t.ChannelID() == NilChannelID || t.ChannelID() == channel.ID()) { return t } diff --git a/core/models/triggers_test.go b/core/models/triggers_test.go index 55f2aa9c7..0128db280 100644 --- a/core/models/triggers_test.go +++ b/core/models/triggers_test.go @@ -56,6 +56,7 @@ func TestChannelTriggers(t *testing.T) { {"", TwilioChannelID, NilTriggerID}, {"foo", TwilioChannelID, NilTriggerID}, {"foo", TwitterChannelID, fooID}, + {"FOO", TwitterChannelID, fooID}, {"bar", TwilioChannelID, barID}, {"bar", TwitterChannelID, barID}, {"zap", TwilioChannelID, NilTriggerID}, @@ -106,6 +107,7 @@ func TestTriggers(t *testing.T) { TriggerID TriggerID }{ {"join", cathy, joinID}, + {"JOIN", cathy, joinID}, {"join this", cathy, joinID}, {"resist", george, resistID}, {"resist", cathy, farmersID}, diff --git a/core/msgio/android.go b/core/msgio/android.go new file mode 100644 index 000000000..67c80db81 --- /dev/null +++ b/core/msgio/android.go @@ -0,0 +1,57 @@ +package msgio + +import ( + "time" + + "github.com/nyaruka/mailroom/config" + "github.com/nyaruka/mailroom/core/models" + "github.com/pkg/errors" + + "github.com/edganiukov/fcm" + "github.com/sirupsen/logrus" +) + +// SyncAndroidChannels tries to trigger syncs of the given Android channels via FCM +func SyncAndroidChannels(fc *fcm.Client, channels []*models.Channel) { + if fc == nil { + logrus.Warn("skipping Android sync as instance has not configured FCM") + return + } + + for _, channel := range channels { + // no FCM ID for this channel, noop, we can't trigger a sync + fcmID := channel.ConfigValue(models.ChannelConfigFCMID, "") + if fcmID == "" { + continue + } + + sync := &fcm.Message{ + Token: fcmID, + Priority: "high", + CollapseKey: "sync", + Data: map[string]interface{}{"msg": "sync"}, + } + + start := time.Now() + _, err := fc.Send(sync) + + if err != nil { + // log failures but continue, relayer will sync on its own + logrus.WithError(err).WithField("channel_uuid", channel.UUID()).Error("error syncing channel") + } else { + logrus.WithField("elapsed", time.Since(start)).WithField("channel_uuid", channel.UUID()).Debug("android sync complete") + } + } +} + +// CreateFCMClient creates an FCM client based on the configured FCM API key +func CreateFCMClient() *fcm.Client { + if config.Mailroom.FCMKey == "" { + return nil + } + client, err := fcm.NewClient(config.Mailroom.FCMKey) + if err != nil { + panic(errors.Wrap(err, "unable to create FCM client")) + } + return client +} diff --git a/core/msgio/android_test.go b/core/msgio/android_test.go new file mode 100644 index 000000000..a442763b9 --- /dev/null +++ b/core/msgio/android_test.go @@ -0,0 +1,107 @@ +package msgio_test + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/nyaruka/gocommon/jsonx" + "github.com/nyaruka/goflow/utils" + "github.com/nyaruka/mailroom/config" + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/core/msgio" + "github.com/nyaruka/mailroom/testsuite" + "github.com/nyaruka/mailroom/testsuite/testdata" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/edganiukov/fcm" +) + +type MockFCMEndpoint struct { + server *httptest.Server + tokens []string + + // log of messages sent to this endpoint + Messages []*fcm.Message +} + +func (m *MockFCMEndpoint) Handle(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + requestBody, _ := ioutil.ReadAll(r.Body) + + message := &fcm.Message{} + jsonx.Unmarshal(requestBody, message) + + m.Messages = append(m.Messages, message) + + w.Header().Set("Content-Type", "application/json") + + if utils.StringSliceContains(m.tokens, message.Token, false) { + w.WriteHeader(200) + w.Write([]byte(`{}`)) + } else { + w.WriteHeader(200) + w.Write([]byte(`{"error": "bad_token"}`)) + } +} + +func (m *MockFCMEndpoint) Stop() { + m.server.Close() +} + +func (m *MockFCMEndpoint) Client(apiKey string) *fcm.Client { + client, _ := fcm.NewClient("FCMKEY123", fcm.WithEndpoint(m.server.URL)) + return client +} + +func newMockFCMEndpoint(tokens ...string) *MockFCMEndpoint { + mock := &MockFCMEndpoint{tokens: tokens} + mock.server = httptest.NewServer(http.HandlerFunc(mock.Handle)) + return mock +} + +func TestSyncAndroidChannels(t *testing.T) { + ctx := testsuite.CTX() + db := testsuite.DB() + + mockFCM := newMockFCMEndpoint("FCMID3") + defer mockFCM.Stop() + + fc := mockFCM.Client("FCMKEY123") + + // create some Android channels + channel1ID := testdata.InsertChannel(t, db, models.Org1, "A", "Android 1", []string{"tel"}, "SR", map[string]interface{}{"FCM_ID": ""}) // no FCM ID + channel2ID := testdata.InsertChannel(t, db, models.Org1, "A", "Android 2", []string{"tel"}, "SR", map[string]interface{}{"FCM_ID": "FCMID2"}) // invalid FCM ID + channel3ID := testdata.InsertChannel(t, db, models.Org1, "A", "Android 3", []string{"tel"}, "SR", map[string]interface{}{"FCM_ID": "FCMID3"}) // valid FCM ID + + oa, err := models.GetOrgAssetsWithRefresh(ctx, db, models.Org1, models.RefreshChannels) + require.NoError(t, err) + + channel1 := oa.ChannelByID(channel1ID) + channel2 := oa.ChannelByID(channel2ID) + channel3 := oa.ChannelByID(channel3ID) + + msgio.SyncAndroidChannels(fc, []*models.Channel{channel1, channel2, channel3}) + + // check that we try to sync the 2 channels with FCM IDs, even tho one fails + assert.Equal(t, 2, len(mockFCM.Messages)) + assert.Equal(t, "FCMID2", mockFCM.Messages[0].Token) + assert.Equal(t, "FCMID3", mockFCM.Messages[1].Token) + + assert.Equal(t, "high", mockFCM.Messages[0].Priority) + assert.Equal(t, "sync", mockFCM.Messages[0].CollapseKey) + assert.Equal(t, map[string]interface{}{"msg": "sync"}, mockFCM.Messages[0].Data) +} + +func TestCreateFCMClient(t *testing.T) { + config.Mailroom.FCMKey = "1234" + + assert.NotNil(t, msgio.CreateFCMClient()) + + config.Mailroom.FCMKey = "" + + assert.Nil(t, msgio.CreateFCMClient()) +} diff --git a/core/courier/courier.go b/core/msgio/courier.go similarity index 88% rename from core/courier/courier.go rename to core/msgio/courier.go index 9536af5f6..f4d603d0c 100644 --- a/core/courier/courier.go +++ b/core/msgio/courier.go @@ -1,13 +1,14 @@ -package courier +package msgio import ( "encoding/json" "strconv" "time" - "github.com/gomodule/redigo/redis" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/mailroom/core/models" + + "github.com/gomodule/redigo/redis" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -17,8 +18,8 @@ const ( defaultPriority = 0 ) -// QueueMessages queues messages to courier, these should all be for the same contact -func QueueMessages(rc redis.Conn, msgs []*models.Msg) error { +// QueueCourierMessages queues messages for a single contact to Courier +func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, msgs []*models.Msg) error { if len(msgs) == 0 { return nil } @@ -51,6 +52,7 @@ func QueueMessages(rc redis.Conn, msgs []*models.Msg) error { } logrus.WithFields(logrus.Fields{ "msgs": len(batch), + "contact_id": contactID, "channel_uuid": currentChannel.UUID(), "elapsed": time.Since(start), }).Info("msgs queued to courier") @@ -59,6 +61,11 @@ func QueueMessages(rc redis.Conn, msgs []*models.Msg) error { } for _, msg := range msgs { + // android messages should never get in here + if msg.Channel() != nil && msg.Channel().Type() == models.ChannelTypeAndroid { + panic("trying to queue android messages to courier") + } + // ignore any message without a channel or already marked as failed (maybe org is suspended) if msg.ChannelUUID() == "" || msg.Status() == models.MsgStatusFailed { continue @@ -74,11 +81,6 @@ func QueueMessages(rc redis.Conn, msgs []*models.Msg) error { return errors.Errorf("msg passed with nil urn: %s", msg.URN()) } - // android channel? ignore - if msg.Channel().Type() == models.ChannelTypeAndroid { - continue - } - // same channel? add to batch if msg.Channel() == currentChannel { batch = append(batch, msg) diff --git a/core/msgio/courier_test.go b/core/msgio/courier_test.go new file mode 100644 index 000000000..65b236976 --- /dev/null +++ b/core/msgio/courier_test.go @@ -0,0 +1,131 @@ +package msgio_test + +import ( + "encoding/json" + "testing" + + "github.com/gomodule/redigo/redis" + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/core/msgio" + "github.com/nyaruka/mailroom/testsuite" + "github.com/nyaruka/mailroom/testsuite/testdata" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestQueueCourierMessages(t *testing.T) { + ctx := testsuite.CTX() + db := testsuite.DB() + rc := testsuite.RC() + testsuite.Reset() + models.FlushCache() + + defer rc.Close() + + // create an Andoid channel + androidChannelID := testdata.InsertChannel(t, db, models.Org1, "A", "Android 1", []string{"tel"}, "SR", map[string]interface{}{"FCM_ID": "FCMID"}) + + oa, err := models.GetOrgAssetsWithRefresh(ctx, db, models.Org1, models.RefreshOrg|models.RefreshChannels) + require.NoError(t, err) + + tests := []struct { + Description string + Msgs []msgSpec + QueueSizes map[string]int + }{ + { + Description: "2 queueable messages", + Msgs: []msgSpec{ + { + ChannelID: models.TwilioChannelID, + ContactID: models.CathyID, + URNID: models.CathyURNID, + }, + { + ChannelID: models.TwilioChannelID, + ContactID: models.CathyID, + URNID: models.CathyURNID, + }, + }, + QueueSizes: map[string]int{ + "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": 2, + }, + }, + { + Description: "1 queueable message and 1 failed", + Msgs: []msgSpec{ + { + ChannelID: models.TwilioChannelID, + ContactID: models.CathyID, + URNID: models.CathyURNID, + Failed: true, + }, + { + ChannelID: models.TwilioChannelID, + ContactID: models.CathyID, + URNID: models.CathyURNID, + }, + }, + QueueSizes: map[string]int{ + "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": 1, + }, + }, + { + Description: "0 messages", + Msgs: []msgSpec{}, + QueueSizes: map[string]int{ + "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": 0, + }, + }, + } + + for _, tc := range tests { + var contactID models.ContactID + msgs := make([]*models.Msg, len(tc.Msgs)) + for i, ms := range tc.Msgs { + msgs[i] = ms.createMsg(t, db, oa) + contactID = ms.ContactID + } + + rc.Do("FLUSHDB") + msgio.QueueCourierMessages(rc, contactID, msgs) + + assertCourierQueueSizes(t, rc, tc.QueueSizes, "courier queue sizes mismatch in '%s'", tc.Description) + } + + // check that trying to queue a courier message will panic + assert.Panics(t, func() { + ms := msgSpec{ + ChannelID: androidChannelID, + ContactID: models.CathyID, + URNID: models.CathyURNID, + } + msgio.QueueCourierMessages(rc, models.CathyID, []*models.Msg{ms.createMsg(t, db, oa)}) + }) + + testsuite.Reset() +} + +func assertCourierQueueSizes(t *testing.T, rc redis.Conn, sizes map[string]int, msgAndArgs ...interface{}) { + for queueKey, size := range sizes { + if size == 0 { + result, err := rc.Do("ZCARD", queueKey) + require.NoError(t, err) + assert.Equal(t, size, int(result.(int64))) + } else { + result, err := rc.Do("ZPOPMAX", queueKey) + require.NoError(t, err) + + results := result.([]interface{}) + assert.Equal(t, 2, len(results)) // result is (item, score) + + batchJSON := results[0].([]byte) + var batch []map[string]interface{} + err = json.Unmarshal(batchJSON, &batch) + require.NoError(t, err) + + assert.Equal(t, size, len(batch), msgAndArgs...) + } + } +} diff --git a/core/msgio/send.go b/core/msgio/send.go new file mode 100644 index 000000000..850886890 --- /dev/null +++ b/core/msgio/send.go @@ -0,0 +1,79 @@ +package msgio + +import ( + "context" + + "github.com/edganiukov/fcm" + "github.com/nyaruka/mailroom/core/models" + + "github.com/apex/log" + "github.com/gomodule/redigo/redis" +) + +// SendMessages tries to send the given messages via Courier or Android syncing +func SendMessages(ctx context.Context, db models.Queryer, rp *redis.Pool, fc *fcm.Client, msgs []*models.Msg) { + // messages to be sent by courier, organized by contact + courierMsgs := make(map[models.ContactID][]*models.Msg, 100) + + // android channels that need to be notified to sync + androidChannels := make([]*models.Channel, 0, 5) + androidChannelsSeen := make(map[*models.Channel]bool) + + // messages that need to be marked as pending + pending := make([]*models.Msg, 0, 1) + + // walk through our messages, separate by whether they have a channel and if it's Android + for _, msg := range msgs { + channel := msg.Channel() + if channel != nil { + if channel.Type() == models.ChannelTypeAndroid { + if !androidChannelsSeen[channel] { + androidChannels = append(androidChannels, channel) + } + androidChannelsSeen[channel] = true + } else { + courierMsgs[msg.ContactID()] = append(courierMsgs[msg.ContactID()], msg) + } + } else { + pending = append(pending, msg) + } + } + + // if there are courier messages to send, do so + if len(courierMsgs) > 0 { + rc := rp.Get() + defer rc.Close() + + for contactID, contactMsgs := range courierMsgs { + err := QueueCourierMessages(rc, contactID, contactMsgs) + + // not being able to queue a message isn't the end of the world, log but don't return an error + if err != nil { + log.WithField("messages", contactMsgs).WithField("contact", contactID).WithError(err).Error("error queuing messages") + + // in the case of errors we do want to change the messages back to pending however so they + // get queued later. (for the common case messages are only inserted and queued, without a status update) + for _, msg := range contactMsgs { + pending = append(pending, msg) + } + } + } + } + + // if we have any android messages, trigger syncs for the unique channels + if len(androidChannels) > 0 { + if fc == nil { + fc = CreateFCMClient() + } + SyncAndroidChannels(fc, androidChannels) + } + + // any messages that didn't get sent should be moved back to pending (they are queued at creation to save an + // update in the common case) + if len(pending) > 0 { + err := models.MarkMessagesPending(ctx, db, pending) + if err != nil { + log.WithError(err).Error("error marking message as pending") + } + } +} diff --git a/core/msgio/send_test.go b/core/msgio/send_test.go new file mode 100644 index 000000000..ec60a6d2c --- /dev/null +++ b/core/msgio/send_test.go @@ -0,0 +1,168 @@ +package msgio_test + +import ( + "fmt" + "testing" + "time" + + "github.com/nyaruka/gocommon/urns" + "github.com/nyaruka/goflow/assets" + "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/core/msgio" + "github.com/nyaruka/mailroom/testsuite" + "github.com/nyaruka/mailroom/testsuite/testdata" + + "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type msgSpec struct { + ChannelID models.ChannelID + ContactID models.ContactID + URNID models.URNID + Failed bool +} + +func (m *msgSpec) createMsg(t *testing.T, db *sqlx.DB, oa *models.OrgAssets) *models.Msg { + // Only way to create a failed outgoing message is to suspend the org and reload the org. + // However the channels have to be fetched from the same org assets thus why this uses its + // own org assets instance. + ctx := testsuite.CTX() + db.MustExec(`UPDATE orgs_org SET is_suspended = $1 WHERE id = $2`, m.Failed, models.Org1) + oaOrg, _ := models.GetOrgAssetsWithRefresh(ctx, db, models.Org1, models.RefreshOrg) + + var channel *models.Channel + var channelRef *assets.ChannelReference + + if m.ChannelID != models.NilChannelID { + channel = oa.ChannelByID(m.ChannelID) + channelRef = channel.ChannelReference() + } + urn := urns.URN(fmt.Sprintf("tel:+250700000001?id=%d", m.URNID)) + + flowMsg := flows.NewMsgOut(urn, channelRef, "Hello", nil, nil, nil, flows.NilMsgTopic) + msg, err := models.NewOutgoingMsg(oaOrg.Org(), channel, m.ContactID, flowMsg, time.Now()) + require.NoError(t, err) + + models.InsertMessages(ctx, db, []*models.Msg{msg}) + require.NoError(t, err) + + return msg +} + +func TestSendMessages(t *testing.T) { + ctx := testsuite.CTX() + db := testsuite.DB() + rp := testsuite.RP() + rc := rp.Get() + defer rc.Close() + + mockFCM := newMockFCMEndpoint("FCMID3") + defer mockFCM.Stop() + + fc := mockFCM.Client("FCMKEY123") + + // create some Andoid channels + androidChannel1ID := testdata.InsertChannel(t, db, models.Org1, "A", "Android 1", []string{"tel"}, "SR", map[string]interface{}{"FCM_ID": "FCMID1"}) + androidChannel2ID := testdata.InsertChannel(t, db, models.Org1, "A", "Android 2", []string{"tel"}, "SR", map[string]interface{}{"FCM_ID": "FCMID2"}) + testdata.InsertChannel(t, db, models.Org1, "A", "Android 3", []string{"tel"}, "SR", map[string]interface{}{"FCM_ID": "FCMID3"}) + + oa, err := models.GetOrgAssetsWithRefresh(ctx, db, models.Org1, models.RefreshChannels) + require.NoError(t, err) + + tests := []struct { + Description string + Msgs []msgSpec + QueueSizes map[string]int + FCMTokensSynced []string + PendingMsgs int + }{ + { + Description: "2 messages for Courier, and 1 Android", + Msgs: []msgSpec{ + { + ChannelID: models.TwilioChannelID, + ContactID: models.CathyID, + URNID: models.CathyURNID, + }, + { + ChannelID: androidChannel1ID, + ContactID: models.BobID, + URNID: models.BobURNID, + }, + { + ChannelID: models.TwilioChannelID, + ContactID: models.CathyID, + URNID: models.CathyURNID, + }, + }, + QueueSizes: map[string]int{ + "msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": 2, + }, + FCMTokensSynced: []string{"FCMID1"}, + PendingMsgs: 0, + }, + { + Description: "each Android channel synced once", + Msgs: []msgSpec{ + { + ChannelID: androidChannel1ID, + ContactID: models.CathyID, + URNID: models.CathyURNID, + }, + { + ChannelID: androidChannel2ID, + ContactID: models.BobID, + URNID: models.BobURNID, + }, + { + ChannelID: androidChannel1ID, + ContactID: models.CathyID, + URNID: models.CathyURNID, + }, + }, + QueueSizes: map[string]int{}, + FCMTokensSynced: []string{"FCMID1", "FCMID2"}, + PendingMsgs: 0, + }, + { + Description: "messages without channels set to PENDING", + Msgs: []msgSpec{ + { + ChannelID: models.NilChannelID, + ContactID: models.CathyID, + URNID: models.CathyURNID, + }, + }, + QueueSizes: map[string]int{}, + FCMTokensSynced: []string{}, + PendingMsgs: 1, + }, + } + + for _, tc := range tests { + msgs := make([]*models.Msg, len(tc.Msgs)) + for i, ms := range tc.Msgs { + msgs[i] = ms.createMsg(t, db, oa) + } + + rc.Do("FLUSHDB") + mockFCM.Messages = nil + + msgio.SendMessages(ctx, db, rp, fc, msgs) + + assertCourierQueueSizes(t, rc, tc.QueueSizes, "courier queue sizes mismatch in '%s'", tc.Description) + + // check the FCM tokens that were synced + actualTokens := make([]string, len(mockFCM.Messages)) + for i := range mockFCM.Messages { + actualTokens[i] = mockFCM.Messages[i].Token + } + + assert.Equal(t, tc.FCMTokensSynced, actualTokens, "FCM tokens mismatch in '%s'", tc.Description) + + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'P'`, nil, tc.PendingMsgs, `pending messages mismatch in '%s'`, tc.Description) + } +} diff --git a/core/runner/runner.go b/core/runner/runner.go index a873074a8..20a8e06b4 100644 --- a/core/runner/runner.go +++ b/core/runner/runner.go @@ -226,7 +226,7 @@ func StartFlowBatch( options := NewStartOptions() options.RestartParticipants = batch.RestartParticipants() options.IncludeActive = batch.IncludeActive() - options.Interrupt = true + options.Interrupt = flow.FlowType().Interrupts() options.TriggerBuilder = triggerBuilder options.CommitHook = updateStartID @@ -315,7 +315,7 @@ func FireCampaignEvents( } // if this is an ivr flow, we need to create a task to perform the start there - if dbFlow.FlowType() == models.IVRFlow { + if dbFlow.FlowType() == models.FlowTypeVoice { // Trigger our IVR flow start err := TriggerIVRFlow(ctx, db, rp, oa.OrgID(), dbFlow.ID(), contactIDs, func(ctx context.Context, tx *sqlx.Tx) error { return models.MarkEventsFired(ctx, tx, fires, time.Now(), models.FireResultFired) @@ -712,7 +712,7 @@ func TriggerIVRFlow(ctx context.Context, db *sqlx.DB, rp *redis.Pool, orgID mode tx, _ := db.BeginTxx(ctx, nil) // create our start - start := models.NewFlowStart(orgID, models.StartTypeTrigger, models.IVRFlow, flowID, models.DoRestartParticipants, models.DoIncludeActive). + start := models.NewFlowStart(orgID, models.StartTypeTrigger, models.FlowTypeVoice, flowID, models.DoRestartParticipants, models.DoIncludeActive). WithContactIDs(contactIDs) // insert it diff --git a/core/runner/runner_test.go b/core/runner/runner_test.go index 0e75e4a08..7c2479cb8 100644 --- a/core/runner/runner_test.go +++ b/core/runner/runner_test.go @@ -137,7 +137,7 @@ func TestBatchStart(t *testing.T) { last := time.Now() for i, tc := range tcs { - start := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, models.MessagingFlow, tc.Flow, tc.Restart, tc.IncludeActive). + start := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, models.FlowTypeMessaging, tc.Flow, tc.Restart, tc.IncludeActive). WithContactIDs(contactIDs). WithExtra(tc.Extra) batch := start.CreateBatch(contactIDs, true, len(contactIDs)) diff --git a/core/tasks/broadcasts/worker.go b/core/tasks/broadcasts/worker.go index 6f825aea8..a7c5364eb 100644 --- a/core/tasks/broadcasts/worker.go +++ b/core/tasks/broadcasts/worker.go @@ -9,8 +9,8 @@ import ( "github.com/jmoiron/sqlx" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/mailroom" - "github.com/nyaruka/mailroom/core/courier" "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/core/msgio" "github.com/nyaruka/mailroom/core/queue" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -171,14 +171,6 @@ func SendBroadcastBatch(ctx context.Context, db *sqlx.DB, rp *redis.Pool, bcast return errors.Wrapf(err, "error creating broadcast messages") } - // and queue them to courier for sending - rc := rp.Get() - defer rc.Close() - - err = courier.QueueMessages(rc, msgs) - if err != nil { - return errors.Wrapf(err, "error queuing broadcast messages") - } - + msgio.SendMessages(ctx, db, rp, nil, msgs) return nil } diff --git a/core/tasks/groups/populate_dynamic_group_test.go b/core/tasks/groups/populate_dynamic_group_test.go index 3aa77e42b..a397abbb9 100644 --- a/core/tasks/groups/populate_dynamic_group_test.go +++ b/core/tasks/groups/populate_dynamic_group_test.go @@ -11,7 +11,7 @@ import ( "github.com/nyaruka/mailroom/testsuite" "github.com/nyaruka/mailroom/testsuite/testdata" - "github.com/olivere/elastic" + "github.com/olivere/elastic/v7" "github.com/stretchr/testify/require" ) diff --git a/core/tasks/handler/worker.go b/core/tasks/handler/worker.go index 4e7bb224a..a2fdb2764 100644 --- a/core/tasks/handler/worker.go +++ b/core/tasks/handler/worker.go @@ -251,7 +251,7 @@ func handleTimedEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, eventTyp } // get the active session for this contact - session, err := models.ActiveSessionForContact(ctx, db, oa, models.MessagingFlow, contact) + session, err := models.ActiveSessionForContact(ctx, db, oa, models.FlowTypeMessaging, contact) if err != nil { return errors.Wrapf(err, "error loading active session for contact") } @@ -407,7 +407,7 @@ func HandleChannelEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, eventT } // if this is an IVR flow, we need to trigger that start (which happens in a different queue) - if flow.FlowType() == models.IVRFlow && conn == nil { + if flow.FlowType() == models.FlowTypeVoice && conn == nil { err = runner.TriggerIVRFlow(ctx, db, rp, oa.OrgID(), flow.ID(), []models.ContactID{modelContact.ID()}, nil) if err != nil { return nil, errors.Wrapf(err, "error while triggering ivr flow") @@ -585,7 +585,7 @@ func handleMsgEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, event *Msg trigger := models.FindMatchingMsgTrigger(oa, contact, event.Text) // get any active session for this contact - session, err := models.ActiveSessionForContact(ctx, db, oa, models.MessagingFlow, contact) + session, err := models.ActiveSessionForContact(ctx, db, oa, models.FlowTypeMessaging, contact) if err != nil { return errors.Wrapf(err, "error loading active session for contact") } @@ -637,7 +637,7 @@ func handleMsgEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, event *Msg // trigger flow is still active, start it if flow != nil { // if this is an IVR flow, we need to trigger that start (which happens in a different queue) - if flow.FlowType() == models.IVRFlow { + if flow.FlowType() == models.FlowTypeVoice { err = runner.TriggerIVRFlow(ctx, db, rp, oa.OrgID(), flow.ID(), []models.ContactID{modelContact.ID()}, func(ctx context.Context, tx *sqlx.Tx) error { return models.UpdateMessage(ctx, tx, event.MsgID, models.MsgStatusHandled, models.VisibilityVisible, models.TypeFlow, topupID) }) diff --git a/core/tasks/interrupts/interrupt_sessions_test.go b/core/tasks/interrupts/interrupt_sessions_test.go index 3e0fbb788..6d271f3c5 100644 --- a/core/tasks/interrupts/interrupt_sessions_test.go +++ b/core/tasks/interrupts/interrupt_sessions_test.go @@ -34,8 +34,8 @@ func TestInterrupts(t *testing.T) { insertSession := func(orgID models.OrgID, contactID models.ContactID, connectionID models.ConnectionID, currentFlowID models.FlowID) models.SessionID { var sessionID models.SessionID err := db.Get(&sessionID, - `INSERT INTO flows_flowsession(uuid, status, responded, created_on, org_id, contact_id, connection_id, current_flow_id) - VALUES($1, 'W', false, NOW(), $2, $3, $4, $5) RETURNING id`, + `INSERT INTO flows_flowsession(uuid, status, responded, created_on, org_id, contact_id, connection_id, current_flow_id, session_type) + VALUES($1, 'W', false, NOW(), $2, $3, $4, $5, 'M') RETURNING id`, uuids.New(), orgID, contactID, connectionID, currentFlowID) assert.NoError(t, err) diff --git a/core/tasks/ivr/cron_test.go b/core/tasks/ivr/cron_test.go index f10a21132..c37a5b62e 100644 --- a/core/tasks/ivr/cron_test.go +++ b/core/tasks/ivr/cron_test.go @@ -27,7 +27,7 @@ func TestRetries(t *testing.T) { db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, models.TwilioChannelID) // create a flow start for cathy - start := models.NewFlowStart(models.Org1, models.StartTypeTrigger, models.IVRFlow, models.IVRFlowID, models.DoRestartParticipants, models.DoIncludeActive). + start := models.NewFlowStart(models.Org1, models.StartTypeTrigger, models.FlowTypeVoice, models.IVRFlowID, models.DoRestartParticipants, models.DoIncludeActive). WithContactIDs([]models.ContactID{models.CathyID}) // call our master starter diff --git a/core/tasks/ivr/worker.go b/core/tasks/ivr/worker.go index c83d46c6f..c4a00858b 100644 --- a/core/tasks/ivr/worker.go +++ b/core/tasks/ivr/worker.go @@ -57,7 +57,7 @@ func HandleFlowStartBatch(bg context.Context, config *config.Config, db *sqlx.DB // filter out our list of contacts to only include those that should be started if !batch.IncludeActive() { // find all participants active in other sessions - active, err := models.FindActiveSessionOverlap(ctx, db, models.IVRFlow, batch.ContactIDs()) + active, err := models.FindActiveSessionOverlap(ctx, db, models.FlowTypeVoice, batch.ContactIDs()) if err != nil { return errors.Wrapf(err, "error finding other active sessions: %d", batch.FlowID()) } diff --git a/core/tasks/ivr/worker_test.go b/core/tasks/ivr/worker_test.go index e001dd541..140e45e46 100644 --- a/core/tasks/ivr/worker_test.go +++ b/core/tasks/ivr/worker_test.go @@ -36,7 +36,7 @@ func TestIVR(t *testing.T) { db.MustExec(`UPDATE channels_channel SET channel_type = 'ZZ', config = '{"max_concurrent_events": 1}' WHERE id = $1`, models.TwilioChannelID) // create a flow start for cathy - start := models.NewFlowStart(models.Org1, models.StartTypeTrigger, models.IVRFlow, models.IVRFlowID, models.DoRestartParticipants, models.DoIncludeActive). + start := models.NewFlowStart(models.Org1, models.StartTypeTrigger, models.FlowTypeVoice, models.IVRFlowID, models.DoRestartParticipants, models.DoIncludeActive). WithContactIDs([]models.ContactID{models.CathyID}) // call our master starter diff --git a/core/tasks/starts/worker.go b/core/tasks/starts/worker.go index ab8eb982f..4d4f19d9f 100644 --- a/core/tasks/starts/worker.go +++ b/core/tasks/starts/worker.go @@ -15,7 +15,7 @@ import ( "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/lib/pq" - "github.com/olivere/elastic" + "github.com/olivere/elastic/v7" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -153,7 +153,7 @@ func CreateFlowBatches(ctx context.Context, db *sqlx.DB, rp *redis.Pool, ec *ela // task is different if we are an IVR flow taskType := queue.StartFlowBatch - if start.FlowType() == models.IVRFlow { + if start.FlowType() == models.FlowTypeVoice { taskType = queue.StartIVRFlowBatch } diff --git a/core/tasks/starts/worker_test.go b/core/tasks/starts/worker_test.go index aa602a215..3c5500ce3 100644 --- a/core/tasks/starts/worker_test.go +++ b/core/tasks/starts/worker_test.go @@ -14,7 +14,7 @@ import ( "github.com/nyaruka/mailroom/core/runner" "github.com/nyaruka/mailroom/testsuite" - "github.com/olivere/elastic" + "github.com/olivere/elastic/v7" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -39,119 +39,131 @@ func TestStarts(t *testing.T) { mr := &mailroom.Mailroom{Config: config.Mailroom, DB: db, RP: rp, ElasticClient: es} + // convert our single message flow to an actual background flow that shouldn't interrupt + db.MustExec(`UPDATE flows_flow SET flow_type = 'B' WHERE id = $1`, models.SingleMessageFlowID) + // insert a flow run for one of our contacts // TODO: can be replaced with a normal flow start of another flow once we support flows with waits db.MustExec( `INSERT INTO flows_flowrun(uuid, status, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id) - VALUES($1, 'W', TRUE, now(), now(), FALSE, $2, $3, 1);`, uuids.New(), models.GeorgeID, models.SingleMessageFlowID) + VALUES($1, 'W', TRUE, now(), now(), FALSE, $2, $3, 1);`, uuids.New(), models.GeorgeID, models.FavoritesFlowID) tcs := []struct { - Label string - FlowID models.FlowID - GroupIDs []models.GroupID - ContactIDs []models.ContactID - CreateContact bool - Query string - QueryResponse string - RestartParticipants models.RestartParticipants - IncludeActive models.IncludeActive - Queue string - ContactCount int - BatchCount int - TotalCount int - Status models.StartStatus + label string + flowID models.FlowID + groupIDs []models.GroupID + contactIDs []models.ContactID + createContact bool + query string + queryResponse string + restartParticipants models.RestartParticipants + includeActive models.IncludeActive + queue string + expectedContactCount int + expectedBatchCount int + expectedTotalCount int + expectedStatus models.StartStatus + expectedActiveRuns map[models.FlowID]int }{ { - Label: "empty flow start", - FlowID: models.SingleMessageFlowID, - Queue: queue.BatchQueue, - ContactCount: 0, - BatchCount: 0, - TotalCount: 0, - Status: models.StartStatusComplete, + label: "Empty flow start", + flowID: models.FavoritesFlowID, + queue: queue.BatchQueue, + expectedContactCount: 0, + expectedBatchCount: 0, + expectedTotalCount: 0, + expectedStatus: models.StartStatusComplete, + expectedActiveRuns: map[models.FlowID]int{models.FavoritesFlowID: 1, models.PickNumberFlowID: 0, models.SingleMessageFlowID: 0}, }, { - Label: "Single group", - FlowID: models.SingleMessageFlowID, - GroupIDs: []models.GroupID{models.DoctorsGroupID}, - Queue: queue.BatchQueue, - ContactCount: 121, - BatchCount: 2, - TotalCount: 121, - Status: models.StartStatusComplete, + label: "Single group", + flowID: models.FavoritesFlowID, + groupIDs: []models.GroupID{models.DoctorsGroupID}, + queue: queue.BatchQueue, + expectedContactCount: 121, + expectedBatchCount: 2, + expectedTotalCount: 121, + expectedStatus: models.StartStatusComplete, + expectedActiveRuns: map[models.FlowID]int{models.FavoritesFlowID: 122, models.PickNumberFlowID: 0, models.SingleMessageFlowID: 0}, }, { - Label: "Group and Contact (but all already active)", - FlowID: models.SingleMessageFlowID, - GroupIDs: []models.GroupID{models.DoctorsGroupID}, - ContactIDs: []models.ContactID{models.CathyID}, - Queue: queue.BatchQueue, - ContactCount: 121, - BatchCount: 2, - TotalCount: 0, - Status: models.StartStatusComplete, + label: "Group and Contact (but all already active)", + flowID: models.FavoritesFlowID, + groupIDs: []models.GroupID{models.DoctorsGroupID}, + contactIDs: []models.ContactID{models.CathyID}, + queue: queue.BatchQueue, + expectedContactCount: 121, + expectedBatchCount: 2, + expectedTotalCount: 0, + expectedStatus: models.StartStatusComplete, + expectedActiveRuns: map[models.FlowID]int{models.FavoritesFlowID: 122, models.PickNumberFlowID: 0, models.SingleMessageFlowID: 0}, }, { - Label: "Contact restart", - FlowID: models.SingleMessageFlowID, - ContactIDs: []models.ContactID{models.CathyID}, - RestartParticipants: true, - IncludeActive: true, - Queue: queue.HandlerQueue, - ContactCount: 1, - BatchCount: 1, - TotalCount: 1, - Status: models.StartStatusComplete, + label: "Contact restart", + flowID: models.FavoritesFlowID, + contactIDs: []models.ContactID{models.CathyID}, + restartParticipants: true, + includeActive: true, + queue: queue.HandlerQueue, + expectedContactCount: 1, + expectedBatchCount: 1, + expectedTotalCount: 1, + expectedStatus: models.StartStatusComplete, + expectedActiveRuns: map[models.FlowID]int{models.FavoritesFlowID: 122, models.PickNumberFlowID: 0, models.SingleMessageFlowID: 0}, }, { - Label: "Previous group and one new contact", - FlowID: models.SingleMessageFlowID, - GroupIDs: []models.GroupID{models.DoctorsGroupID}, - ContactIDs: []models.ContactID{models.BobID}, - Queue: queue.BatchQueue, - ContactCount: 122, - BatchCount: 2, - TotalCount: 1, - Status: models.StartStatusComplete, + label: "Previous group and one new contact", + flowID: models.FavoritesFlowID, + groupIDs: []models.GroupID{models.DoctorsGroupID}, + contactIDs: []models.ContactID{models.BobID}, + queue: queue.BatchQueue, + expectedContactCount: 122, + expectedBatchCount: 2, + expectedTotalCount: 1, + expectedStatus: models.StartStatusComplete, + expectedActiveRuns: map[models.FlowID]int{models.FavoritesFlowID: 123, models.PickNumberFlowID: 0, models.SingleMessageFlowID: 0}, }, { - Label: "Single contact, no restart", - FlowID: models.SingleMessageFlowID, - ContactIDs: []models.ContactID{models.BobID}, - Queue: queue.HandlerQueue, - ContactCount: 1, - BatchCount: 1, - TotalCount: 0, - Status: models.StartStatusComplete, + label: "Single contact, no restart", + flowID: models.FavoritesFlowID, + contactIDs: []models.ContactID{models.BobID}, + queue: queue.HandlerQueue, + expectedContactCount: 1, + expectedBatchCount: 1, + expectedTotalCount: 0, + expectedStatus: models.StartStatusComplete, + expectedActiveRuns: map[models.FlowID]int{models.FavoritesFlowID: 123, models.PickNumberFlowID: 0, models.SingleMessageFlowID: 0}, }, { - Label: "Single contact, include active, but no restart", - FlowID: models.SingleMessageFlowID, - ContactIDs: []models.ContactID{models.BobID}, - IncludeActive: true, - Queue: queue.HandlerQueue, - ContactCount: 1, - BatchCount: 1, - TotalCount: 0, - Status: models.StartStatusComplete, + label: "Single contact, include active, but no restart", + flowID: models.FavoritesFlowID, + contactIDs: []models.ContactID{models.BobID}, + includeActive: true, + queue: queue.HandlerQueue, + expectedContactCount: 1, + expectedBatchCount: 1, + expectedTotalCount: 0, + expectedStatus: models.StartStatusComplete, + expectedActiveRuns: map[models.FlowID]int{models.FavoritesFlowID: 123, models.PickNumberFlowID: 0, models.SingleMessageFlowID: 0}, }, { - Label: "Single contact, include active and restart", - FlowID: models.SingleMessageFlowID, - ContactIDs: []models.ContactID{models.BobID}, - RestartParticipants: true, - IncludeActive: true, - Queue: queue.HandlerQueue, - ContactCount: 1, - BatchCount: 1, - TotalCount: 1, - Status: models.StartStatusComplete, + label: "Single contact, include active and restart", + flowID: models.FavoritesFlowID, + contactIDs: []models.ContactID{models.BobID}, + restartParticipants: true, + includeActive: true, + queue: queue.HandlerQueue, + expectedContactCount: 1, + expectedBatchCount: 1, + expectedTotalCount: 1, + expectedStatus: models.StartStatusComplete, + expectedActiveRuns: map[models.FlowID]int{models.FavoritesFlowID: 123, models.PickNumberFlowID: 0, models.SingleMessageFlowID: 0}, }, { - Label: "Query start", - FlowID: models.SingleMessageFlowID, - Query: "bob", - QueryResponse: fmt.Sprintf(`{ + label: "Query start", + flowID: models.FavoritesFlowID, + query: "bob", + queryResponse: fmt.Sprintf(`{ "_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==", "took": 2, "timed_out": false, @@ -178,47 +190,74 @@ func TestStarts(t *testing.T) { ] } }`, models.BobID), - RestartParticipants: true, - IncludeActive: true, - Queue: queue.HandlerQueue, - ContactCount: 1, - BatchCount: 1, - TotalCount: 1, - Status: models.StartStatusComplete, + restartParticipants: true, + includeActive: true, + queue: queue.HandlerQueue, + expectedContactCount: 1, + expectedBatchCount: 1, + expectedTotalCount: 1, + expectedStatus: models.StartStatusComplete, + expectedActiveRuns: map[models.FlowID]int{models.FavoritesFlowID: 123, models.PickNumberFlowID: 0, models.SingleMessageFlowID: 0}, + }, + { + label: "Query start with invalid query", + flowID: models.FavoritesFlowID, + query: "xyz = 45", + restartParticipants: true, + includeActive: true, + queue: queue.HandlerQueue, + expectedContactCount: 0, + expectedBatchCount: 0, + expectedTotalCount: 0, + expectedStatus: models.StartStatusFailed, + expectedActiveRuns: map[models.FlowID]int{models.FavoritesFlowID: 123, models.PickNumberFlowID: 0, models.SingleMessageFlowID: 0}, }, { - Label: "Query start with invalid query", - FlowID: models.SingleMessageFlowID, - Query: "xyz = 45", - RestartParticipants: true, - IncludeActive: true, - Queue: queue.HandlerQueue, - ContactCount: 0, - BatchCount: 0, - TotalCount: 0, - Status: models.StartStatusFailed, + label: "New Contact", + flowID: models.FavoritesFlowID, + createContact: true, + queue: queue.HandlerQueue, + expectedContactCount: 1, + expectedBatchCount: 1, + expectedTotalCount: 1, + expectedStatus: models.StartStatusComplete, + expectedActiveRuns: map[models.FlowID]int{models.FavoritesFlowID: 124, models.PickNumberFlowID: 0, models.SingleMessageFlowID: 0}, }, { - Label: "New Contact", - FlowID: models.SingleMessageFlowID, - CreateContact: true, - Queue: queue.HandlerQueue, - ContactCount: 1, - BatchCount: 1, - TotalCount: 1, - Status: models.StartStatusComplete, + label: "Other messaging flow", + flowID: models.PickNumberFlowID, + contactIDs: []models.ContactID{models.BobID}, + includeActive: true, + queue: queue.HandlerQueue, + expectedContactCount: 1, + expectedBatchCount: 1, + expectedTotalCount: 1, + expectedStatus: models.StartStatusComplete, + expectedActiveRuns: map[models.FlowID]int{models.FavoritesFlowID: 123, models.PickNumberFlowID: 1, models.SingleMessageFlowID: 0}, + }, + { + label: "Background flow", + flowID: models.SingleMessageFlowID, + contactIDs: []models.ContactID{models.BobID}, + includeActive: true, + queue: queue.HandlerQueue, + expectedContactCount: 1, + expectedBatchCount: 1, + expectedTotalCount: 1, + expectedStatus: models.StartStatusComplete, + expectedActiveRuns: map[models.FlowID]int{models.FavoritesFlowID: 123, models.PickNumberFlowID: 1, models.SingleMessageFlowID: 0}, }, } for _, tc := range tcs { - mes.NextResponse = tc.QueryResponse + mes.NextResponse = tc.queryResponse // handle our start task - start := models.NewFlowStart(models.Org1, models.StartTypeManual, models.MessagingFlow, tc.FlowID, tc.RestartParticipants, tc.IncludeActive). - WithGroupIDs(tc.GroupIDs). - WithContactIDs(tc.ContactIDs). - WithQuery(tc.Query). - WithCreateContact(tc.CreateContact) + start := models.NewFlowStart(models.Org1, models.StartTypeManual, models.FlowTypeMessaging, tc.flowID, tc.restartParticipants, tc.includeActive). + WithGroupIDs(tc.groupIDs). + WithContactIDs(tc.contactIDs). + WithQuery(tc.query). + WithCreateContact(tc.createContact) err := models.InsertFlowStarts(ctx, db, []*models.FlowStart{start}) assert.NoError(t, err) @@ -233,7 +272,7 @@ func TestStarts(t *testing.T) { var task *queue.Task count := 0 for { - task, err = queue.PopNextTask(rc, tc.Queue) + task, err = queue.PopNextTask(rc, tc.queue) assert.NoError(t, err) if task == nil { break @@ -250,20 +289,25 @@ func TestStarts(t *testing.T) { } // assert our count of batches - assert.Equal(t, tc.BatchCount, count, "unexpected batch count in '%s'", tc.Label) + assert.Equal(t, tc.expectedBatchCount, count, "unexpected batch count in '%s'", tc.label) // assert our count of total flow runs created - testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun where flow_id = $1 AND start_id = $2 AND is_active = FALSE`, - []interface{}{tc.FlowID, start.ID()}, tc.TotalCount, "unexpected total run count in '%s'", tc.Label) + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE flow_id = $1 AND start_id = $2`, + []interface{}{tc.flowID, start.ID()}, tc.expectedTotalCount, "unexpected total run count in '%s'", tc.label) // assert final status testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowstart where status = $2 AND id = $1`, - []interface{}{start.ID(), tc.Status}, 1, "status mismatch in '%s'", tc.Label) + []interface{}{start.ID(), tc.expectedStatus}, 1, "status mismatch in '%s'", tc.label) // assert final contact count - if tc.Status != models.StartStatusFailed { + if tc.expectedStatus != models.StartStatusFailed { testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowstart where contact_count = $2 AND id = $1`, - []interface{}{start.ID(), tc.ContactCount}, 1, "contact count mismatch in '%s'", tc.Label) + []interface{}{start.ID(), tc.expectedContactCount}, 1, "contact count mismatch in '%s'", tc.label) + } + + // assert count of active runs by flow + for flowID, activeRuns := range tc.expectedActiveRuns { + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE status = 'W' AND flow_id = $1`, []interface{}{flowID}, activeRuns, "active runs mismatch for flow #%d in '%s'", flowID, tc.label) } } } diff --git a/go.mod b/go.mod index f553d8636..2207aee05 100644 --- a/go.mod +++ b/go.mod @@ -18,11 +18,12 @@ require ( github.com/mattn/go-sqlite3 v1.10.0 // indirect github.com/nyaruka/ezconf v0.2.1 github.com/nyaruka/gocommon v1.7.1 - github.com/nyaruka/goflow v0.107.1 + github.com/nyaruka/goflow v0.113.0 github.com/nyaruka/librato v1.0.0 github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d github.com/nyaruka/null v1.2.0 - github.com/olivere/elastic v6.2.35+incompatible + github.com/nyaruka/phonenumbers v1.0.65 // indirect + github.com/olivere/elastic/v7 v7.0.22 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pkg/errors v0.9.1 github.com/prometheus/client_model v0.2.0 diff --git a/go.sum b/go.sum index 4bcb58fa2..237e1bd63 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -15,6 +17,8 @@ github.com/aws/aws-sdk-go v1.20.6 h1:kmy4Gvdlyez1fV4kw5RYxZzWKVyuHZHgPWeU/YvRsV4 github.com/aws/aws-sdk-go v1.20.6/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.34.31 h1:408wh5EHKzxyby8JpYfnn1w3fsF26AIU0o1kbJoRy7E= github.com/aws/aws-sdk-go v1.34.31/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= +github.com/aws/aws-sdk-go v1.35.20 h1:Hs7x9Czh+MMPnZLQqHhsuZKeNFA3Vuf7pdy2r5QlVb0= +github.com/aws/aws-sdk-go v1.35.20/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k= github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59/go.mod h1:q/89r3U2H7sSsE2t6Kca0lfwTK8JdoNGS/yzM/4iH5I= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -24,6 +28,7 @@ github.com/buger/jsonparser v1.0.0 h1:etJTGF5ESxjI0Ic2UaLQs2LQQpa8G9ykQScukbh4L8 github.com/buger/jsonparser v1.0.0/go.mod h1:tgcrVJ81GPSF0mz+0nu1Xaz0fazGPrmmJfJtxjbHhUQ= github.com/certifi/gocertifi v0.0.0-20200211180108-c7c1fbc02894 h1:JLaf/iINcLyjwbtTsCJjc6rtlASgHeIJPrB6QmwURnA= github.com/certifi/gocertifi v0.0.0-20200211180108-c7c1fbc02894/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -58,6 +63,9 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me github.com/gofrs/uuid v3.3.0+incompatible h1:8K4tyRfvU1CYPgJsveYFQMhpFd/wXNM7iK6rR7UHz84= github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -134,8 +142,12 @@ github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0= github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw= github.com/nyaruka/gocommon v1.7.1 h1:1x3v1L69svMnmD38pPvEb1QOyMBkP0QKk8WLDCRJW0w= github.com/nyaruka/gocommon v1.7.1/go.mod h1:r5UqoAdoP9VLb/wmtF1O0v73PQc79tZaVjbXlO16PUA= -github.com/nyaruka/goflow v0.107.1 h1:7JorXBo1xSeSwiohiD4PgWdUS2nuFSHgj8npgXvF+24= -github.com/nyaruka/goflow v0.107.1/go.mod h1:VZ+2MbwOhUApS3+FbwlRA84GpaHtpeVB3xvvPsiHlvg= +github.com/nyaruka/goflow v0.112.1 h1:kM7gsXxiP2QvPv+/kduXAzcOX+KJB63l1RpqFZQfccc= +github.com/nyaruka/goflow v0.112.1/go.mod h1:FBksI24mzBjwMzUd/xkr1gulklP6bbTKcaEq6qKNQgE= +github.com/nyaruka/goflow v0.112.2 h1:vizMqpxhDkCv2i9wJjyaHECOaIvvxfPiyvYwfXRnIMw= +github.com/nyaruka/goflow v0.112.2/go.mod h1:FBksI24mzBjwMzUd/xkr1gulklP6bbTKcaEq6qKNQgE= +github.com/nyaruka/goflow v0.113.0 h1:oAYtBlClCjWVUNq/0h5AmBCgE/CrBT7tHzHQtFAy9N8= +github.com/nyaruka/goflow v0.113.0/go.mod h1:FBksI24mzBjwMzUd/xkr1gulklP6bbTKcaEq6qKNQgE= github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0= github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg= github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc= @@ -144,10 +156,13 @@ github.com/nyaruka/null v1.2.0 h1:uEbkyy4Z+zPB2Pr3ryQh/0N2965I9kEsXq/cGpyJ7PA= github.com/nyaruka/null v1.2.0/go.mod h1:HSAFbLNOaEhHnoU0VCveCPz0GDtJ3GEtFWhvnBNkhPE= github.com/nyaruka/phonenumbers v1.0.58 h1:IAlGDA4wuGQXe2lwOQvkZfBvA1DlAik+MX5k9k5C2IU= github.com/nyaruka/phonenumbers v1.0.58/go.mod h1:sDaTZ/KPX5f8qyV9qN+hIm+4ZBARJrupC6LuhshJq1U= -github.com/olivere/elastic v6.2.35+incompatible h1:MMklYDy2ySi01s123CB2WLBuDMzFX4qhFcA5tKWJPgM= -github.com/olivere/elastic v6.2.35+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8= +github.com/nyaruka/phonenumbers v1.0.65 h1:xey76OEQu7loamZ/hCWe77SBPQu0dpI8ibMWfSBuawk= +github.com/nyaruka/phonenumbers v1.0.65/go.mod h1:sDaTZ/KPX5f8qyV9qN+hIm+4ZBARJrupC6LuhshJq1U= +github.com/olivere/elastic/v7 v7.0.22 h1:esBA6JJwvYgfms0EVlH7Z+9J4oQ/WUADF2y/nCNDw7s= +github.com/olivere/elastic/v7 v7.0.22/go.mod h1:VDexNy9NjmtAkrjNoI7tImv7FR4tf5zUA3ickqu5Pc8= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= @@ -181,8 +196,10 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/sirupsen/logrus v1.5.0 h1:1N5EYkVAPEywqZRJd7cwnRtCb6xJx7NH3T3WUTF980Q= github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo= github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM= +github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM= github.com/smartystreets/gunit v1.0.0/go.mod h1:qwPWnhz6pn0NnRBP++URONOVyNkPyr4SauJk4cUOwJs= +github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= @@ -191,27 +208,40 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0= github.com/tj/go-elastic v0.0.0-20171221160941-36157cbbebc2/go.mod h1:WjeM0Oo1eNAjXGDx2yma7uG2XoyRZTq1uv3M/o7imD0= github.com/tj/go-kinesis v0.0.0-20171128231115-08b17f58cb1b/go.mod h1:/yhzCV0xPfx6jb1bBgRFjl5lytqVqZXEaeqWP8lTEao= github.com/tj/go-spin v1.1.0/go.mod h1:Mg1mzmePZm4dva8Qz60H2lHwmJ2loum4VIrLgVnKwh4= +go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200925080053-05aa5d4ee321 h1:lleNcKRbcaC8MqgLwghIkzZ2JBQAb7QQ9MiwRt1BisA= golang.org/x/net v0.0.0-20200925080053-05aa5d4ee321/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -220,6 +250,7 @@ golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= @@ -229,8 +260,17 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -261,3 +301,4 @@ gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/goreleaser.yml b/goreleaser.yml index 42344b733..69f927f48 100644 --- a/goreleaser.yml +++ b/goreleaser.yml @@ -12,6 +12,4 @@ archives: - LICENSE - README.md - docs/* - - docs/completion.json - - docs/functions.json - docs/**/* diff --git a/mailroom.go b/mailroom.go index 3661e82a9..91fc2ae69 100644 --- a/mailroom.go +++ b/mailroom.go @@ -17,7 +17,7 @@ import ( "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/nyaruka/librato" - "github.com/olivere/elastic" + "github.com/olivere/elastic/v7" "github.com/sirupsen/logrus" ) @@ -192,6 +192,11 @@ func (mr *Mailroom) Start() error { log.Info("elastic ok") } + // warn if we won't be doing FCM syncing + if config.Mailroom.FCMKey == "" { + logrus.Error("fcm not configured, no syncing of android channels") + } + for _, initFunc := range initFunctions { initFunc(mr) } diff --git a/services/tickets/internal/service.go b/services/tickets/intern/service.go similarity index 98% rename from services/tickets/internal/service.go rename to services/tickets/intern/service.go index 42a52510b..4bd11db29 100644 --- a/services/tickets/internal/service.go +++ b/services/tickets/intern/service.go @@ -1,4 +1,4 @@ -package internal +package intern import ( "net/http" diff --git a/services/tickets/internal/service_test.go b/services/tickets/intern/service_test.go similarity index 82% rename from services/tickets/internal/service_test.go rename to services/tickets/intern/service_test.go index 6bcdc461b..108bd8b97 100644 --- a/services/tickets/internal/service_test.go +++ b/services/tickets/intern/service_test.go @@ -1,4 +1,4 @@ -package internal_test +package intern_test import ( "net/http" @@ -12,7 +12,7 @@ import ( "github.com/nyaruka/goflow/test" "github.com/nyaruka/goflow/utils" "github.com/nyaruka/mailroom/core/models" - "github.com/nyaruka/mailroom/services/tickets/internal" + intern "github.com/nyaruka/mailroom/services/tickets/intern" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -27,7 +27,7 @@ func TestOpenAndForward(t *testing.T) { ticketer := flows.NewTicketer(types.NewTicketer(assets.TicketerUUID(uuids.New()), "Support", "internal")) - svc, err := internal.NewService( + svc, err := intern.NewService( http.DefaultClient, nil, ticketer, @@ -49,7 +49,7 @@ func TestOpenAndForward(t *testing.T) { assert.Equal(t, 0, len(logger.Logs)) - dbTicket := models.NewTicket(ticket.UUID, models.Org1, models.CathyID, models.MailgunID, "", "Need help", "Where are my cookies?", nil) + dbTicket := models.NewTicket(ticket.UUID, models.Org1, models.CathyID, models.InternalID, "", "Need help", "Where are my cookies?", nil) logger = &flows.HTTPLogger{} err = svc.Forward( @@ -69,13 +69,13 @@ func TestCloseAndReopen(t *testing.T) { defer uuids.SetGenerator(uuids.DefaultGenerator) uuids.SetGenerator(uuids.NewSeededGenerator(12345)) - ticketer := flows.NewTicketer(types.NewTicketer(assets.TicketerUUID(uuids.New()), "Support", "mailgun")) - svc, err := internal.NewService(http.DefaultClient, nil, ticketer, nil) + ticketer := flows.NewTicketer(types.NewTicketer(assets.TicketerUUID(uuids.New()), "Support", "internal")) + svc, err := intern.NewService(http.DefaultClient, nil, ticketer, nil) require.NoError(t, err) logger := &flows.HTTPLogger{} - ticket1 := models.NewTicket("88bfa1dc-be33-45c2-b469-294ecb0eba90", models.Org1, models.CathyID, models.ZendeskID, "12", "New ticket", "Where my cookies?", nil) - ticket2 := models.NewTicket("645eee60-7e84-4a9e-ade3-4fce01ae28f1", models.Org1, models.BobID, models.ZendeskID, "14", "Second ticket", "Where my shoes?", nil) + ticket1 := models.NewTicket("88bfa1dc-be33-45c2-b469-294ecb0eba90", models.Org1, models.CathyID, models.InternalID, "12", "New ticket", "Where my cookies?", nil) + ticket2 := models.NewTicket("645eee60-7e84-4a9e-ade3-4fce01ae28f1", models.Org1, models.BobID, models.InternalID, "14", "Second ticket", "Where my shoes?", nil) err = svc.Close([]*models.Ticket{ticket1, ticket2}, logger.Log) diff --git a/services/tickets/utils.go b/services/tickets/utils.go index 89c7aed08..7f45f4f4c 100644 --- a/services/tickets/utils.go +++ b/services/tickets/utils.go @@ -17,8 +17,8 @@ import ( "github.com/nyaruka/goflow/envs" "github.com/nyaruka/goflow/flows" "github.com/nyaruka/goflow/utils" - "github.com/nyaruka/mailroom/core/courier" "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/core/msgio" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" @@ -110,17 +110,8 @@ func SendReply(ctx context.Context, db *sqlx.DB, rp *redis.Pool, store storage.S return nil, errors.Wrapf(err, "error creating message batch") } - msg := msgs[0] - - // queue our message - rc := rp.Get() - defer rc.Close() - - err = courier.QueueMessages(rc, []*models.Msg{msg}) - if err != nil { - return msg, errors.Wrapf(err, "error queuing ticket reply") - } - return msg, nil + msgio.SendMessages(ctx, db, rp, nil, msgs) + return msgs[0], nil } var retries = httpx.NewFixedRetries(time.Second*5, time.Second*10) diff --git a/testsuite/testdata/channels.go b/testsuite/testdata/channels.go new file mode 100644 index 000000000..bc41db26b --- /dev/null +++ b/testsuite/testdata/channels.go @@ -0,0 +1,24 @@ +package testdata + +import ( + "testing" + + "github.com/nyaruka/gocommon/uuids" + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/null" + + "github.com/jmoiron/sqlx" + "github.com/lib/pq" + "github.com/stretchr/testify/require" +) + +// InsertChannel inserts a channel +func InsertChannel(t *testing.T, db *sqlx.DB, orgID models.OrgID, channelType, name string, schemes []string, role string, config map[string]interface{}) models.ChannelID { + var id models.ChannelID + err := db.Get(&id, + `INSERT INTO channels_channel(uuid, org_id, channel_type, name, schemes, role, config, last_seen, is_active, created_on, modified_on, created_by_id, modified_by_id) + VALUES($1, $2, $3, $4, $5, $6, $7, NOW(), TRUE, NOW(), NOW(), 1, 1) RETURNING id`, uuids.New(), orgID, channelType, name, pq.Array(schemes), role, null.NewMap(config), + ) + require.NoError(t, err) + return id +} diff --git a/testsuite/testdata/flows.go b/testsuite/testdata/flows.go index ca4c8e810..9ae8fdff6 100644 --- a/testsuite/testdata/flows.go +++ b/testsuite/testdata/flows.go @@ -33,8 +33,8 @@ func InsertFlowStart(t *testing.T, db *sqlx.DB, orgID models.OrgID, flowID model func InsertFlowSession(t *testing.T, db *sqlx.DB, uuid flows.SessionUUID, orgID models.OrgID, contactID models.ContactID, status models.SessionStatus, timeoutOn *time.Time) models.SessionID { var id models.SessionID err := db.Get(&id, - `INSERT INTO flows_flowsession(uuid, org_id, contact_id, status, responded, created_on, timeout_on) - VALUES($1, $2, $3, $4, TRUE, NOW(), $5) RETURNING id`, uuid, orgID, contactID, status, timeoutOn, + `INSERT INTO flows_flowsession(uuid, org_id, contact_id, status, responded, created_on, timeout_on, session_type) + VALUES($1, $2, $3, $4, TRUE, NOW(), $5, 'M') RETURNING id`, uuid, orgID, contactID, status, timeoutOn, ) require.NoError(t, err) return id diff --git a/web/contact/search_test.go b/web/contact/search_test.go index 12979307f..180adcfb6 100644 --- a/web/contact/search_test.go +++ b/web/contact/search_test.go @@ -18,7 +18,7 @@ import ( "github.com/nyaruka/mailroom/testsuite" "github.com/nyaruka/mailroom/web" - "github.com/olivere/elastic" + "github.com/olivere/elastic/v7" "github.com/stretchr/testify/assert" ) @@ -172,7 +172,8 @@ func TestSearch(t *testing.T) { "order": "desc" } } - ] + ], + "track_total_hits": true }`, }, { diff --git a/web/contact/testdata/modify.json b/web/contact/testdata/modify.json index a0c689999..b5f77f969 100644 --- a/web/contact/testdata/modify.json +++ b/web/contact/testdata/modify.json @@ -1018,9 +1018,9 @@ }, "events": [ { + "type": "error", "created_on": "2018-07-06T12:30:00.123456789Z", - "text": "'xyz:12345' is not valid URN", - "type": "error" + "text": "'xyz:12345' is not valid URN" } ] } diff --git a/web/contact/testdata/resolve.json b/web/contact/testdata/resolve.json index 1c256411b..d7fbec34b 100644 --- a/web/contact/testdata/resolve.json +++ b/web/contact/testdata/resolve.json @@ -46,11 +46,11 @@ } } }, + "created": false, "urn": { "id": 10001, "identity": "tel:+16055742222" - }, - "created": false + } }, "db_assertions": [ { @@ -80,11 +80,11 @@ "tel:+16055747777?channel=74729f45-7f29-4868-9dc4-90e491e3c7d8&id=20121&priority=1000" ] }, + "created": true, "urn": { "id": 20121, "identity": "tel:+16055747777" - }, - "created": true + } }, "db_assertions": [ { diff --git a/web/ivr/ivr_test.go b/web/ivr/ivr_test.go index e82daafac..be2e73f2b 100644 --- a/web/ivr/ivr_test.go +++ b/web/ivr/ivr_test.go @@ -70,7 +70,7 @@ func TestTwilioIVR(t *testing.T) { // create a flow start for cathy and george parentSummary := json.RawMessage(`{"flow": {"name": "IVR Flow", "uuid": "2f81d0ea-4d75-4843-9371-3f7465311cce"}, "uuid": "8bc73097-ac57-47fb-82e5-184f8ec6dbef", "status": "active", "contact": {"id": 10000, "name": "Cathy", "urns": ["tel:+16055741111?id=10000&priority=50"], "uuid": "6393abc0-283d-4c9b-a1b3-641a035c34bf", "fields": {"gender": {"text": "F"}}, "groups": [{"name": "Doctors", "uuid": "c153e265-f7c9-4539-9dbc-9b358714b638"}], "timezone": "America/Los_Angeles", "created_on": "2019-07-23T09:35:01.439614-07:00"}, "results": {}}`) - start := models.NewFlowStart(models.Org1, models.StartTypeTrigger, models.IVRFlow, models.IVRFlowID, models.DoRestartParticipants, models.DoIncludeActive). + start := models.NewFlowStart(models.Org1, models.StartTypeTrigger, models.FlowTypeVoice, models.IVRFlowID, models.DoRestartParticipants, models.DoIncludeActive). WithContactIDs([]models.ContactID{models.CathyID, models.GeorgeID}). WithParentSummary(parentSummary) @@ -374,7 +374,7 @@ func TestNexmoIVR(t *testing.T) { // create a flow start for cathy and george extra := json.RawMessage(`{"ref_id":"123"}`) - start := models.NewFlowStart(models.Org1, models.StartTypeTrigger, models.IVRFlow, models.IVRFlowID, models.DoRestartParticipants, models.DoIncludeActive). + start := models.NewFlowStart(models.Org1, models.StartTypeTrigger, models.FlowTypeVoice, models.IVRFlowID, models.DoRestartParticipants, models.DoIncludeActive). WithContactIDs([]models.ContactID{models.CathyID, models.GeorgeID}). WithExtra(extra) models.InsertFlowStarts(ctx, db, []*models.FlowStart{start}) diff --git a/web/server.go b/web/server.go index 21db50dfd..3fa07747b 100644 --- a/web/server.go +++ b/web/server.go @@ -17,7 +17,7 @@ import ( "github.com/go-chi/chi/middleware" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" - "github.com/olivere/elastic" + "github.com/olivere/elastic/v7" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) diff --git a/web/ticket/ticket_test.go b/web/ticket/ticket_test.go index ee2c2239c..37615c5b1 100644 --- a/web/ticket/ticket_test.go +++ b/web/ticket/ticket_test.go @@ -17,7 +17,7 @@ func TestTicketClose(t *testing.T) { testsuite.Reset() db := testsuite.DB() - // create 2 open tickets and 1 closed one for Cathy + // create 2 open tickets and 1 closed one for Cathy across two different ticketers testdata.InsertOpenTicket(t, db, models.Org1, models.CathyID, models.MailgunID, flows.TicketUUID(uuids.New()), "Need help", "Have you seen my cookies?", "17") testdata.InsertOpenTicket(t, db, models.Org1, models.CathyID, models.ZendeskID, flows.TicketUUID(uuids.New()), "More help", "Have you seen my cookies?", "21") testdata.InsertClosedTicket(t, db, models.Org1, models.CathyID, models.ZendeskID, flows.TicketUUID(uuids.New()), "Old question", "Have you seen my cookies?", "34")