Skip to content

Commit

Permalink
Merge pull request #121 from nyaruka/send_rework
Browse files Browse the repository at this point in the history
Rework message sending so that URNs are loaded when queueing
  • Loading branch information
rowanseymour authored Sep 11, 2023
2 parents 0008a31 + 353f51c commit 4d6ec09
Show file tree
Hide file tree
Showing 15 changed files with 233 additions and 169 deletions.
2 changes: 1 addition & 1 deletion core/handlers/webhook_called_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestUnhealthyWebhookCalls(t *testing.T) {
require.NoError(t, err)

env := envs.NewBuilder().Build()
_, cathy := testdata.Cathy.Load(rt, oa)
_, cathy, _ := testdata.Cathy.Load(rt, oa)

// webhook service with a 2 second delay
svc := &failingWebhookService{delay: 2 * time.Second}
Expand Down
111 changes: 54 additions & 57 deletions core/models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,37 +419,36 @@ func queryContactIDs(ctx context.Context, db Queryer, query string, args ...any)
}

type ContactURN struct {
ID URNID `json:"id" db:"id"`
Priority int `json:"priority" db:"priority"`
Scheme string `json:"scheme" db:"scheme"`
Path string `json:"path" db:"path"`
Display string `json:"display" db:"display"`
Auth string `json:"auth" db:"auth"`
ChannelID ChannelID `json:"channel_id" db:"channel_id"`
ID URNID `json:"id" db:"id"`
OrgID OrgID ` db:"org_id"`
ContactID ContactID ` db:"contact_id"`
Priority int `json:"priority" db:"priority"`
Identity urns.URN `json:"identity" db:"identity"`
Scheme string `json:"scheme" db:"scheme"`
Path string `json:"path" db:"path"`
Display null.String `json:"display" db:"display"`
Auth null.String `json:"auth" db:"auth"`
ChannelID ChannelID `json:"channel_id" db:"channel_id"`
}

// AsURN returns a full URN representation including the query parameters needed by goflow and mailroom
func (u *ContactURN) AsURN(oa *OrgAssets) (urns.URN, error) {
// load any channel if present
var channel *Channel
if u.ChannelID != ChannelID(0) {
channel = oa.ChannelByID(u.ChannelID)
}

// we build our query from a combination of preferred channel and auth
// id needed to turn msg_created events into database messages
query := url.Values{
"id": []string{fmt.Sprintf("%d", u.ID)},
"priority": []string{fmt.Sprintf("%d", u.Priority)},
}
if channel != nil {
query["channel"] = []string{string(channel.UUID())}
}
if u.Auth != "" {
query["auth"] = []string{u.Auth}

// channel needed by goflow URN/channel selection
if u.ChannelID != NilChannelID {
channel := oa.ChannelByID(u.ChannelID)
if channel != nil {
query["channel"] = []string{string(channel.UUID())}
}
}

// create our URN
urn, err := urns.NewURNFromParts(u.Scheme, u.Path, query.Encode(), u.Display)
urn, err := urns.NewURNFromParts(u.Scheme, u.Path, query.Encode(), string(u.Display))
if err != nil {
return urns.NilURN, errors.Wrapf(err, "invalid URN %s:%s", u.Scheme, u.Path)
}
Expand Down Expand Up @@ -865,9 +864,9 @@ func insertContactAndURNs(ctx context.Context, db DBorTx, orgID OrgID, userID Us
}
} else {
_, err := db.ExecContext(ctx,
`INSERT INTO contacts_contacturn(org_id, identity, path, scheme, display, auth, priority, channel_id, contact_id)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
orgID, urn.Identity(), urn.Path(), urn.Scheme(), urn.Display(), GetURNAuth(urn), priority, channelID, contactID,
`INSERT INTO contacts_contacturn(org_id, identity, path, scheme, display, priority, channel_id, contact_id)
VALUES($1, $2, $3, $4, $5, $6, $7, $8)`,
orgID, urn.Identity(), urn.Path(), urn.Scheme(), urn.Display(), priority, channelID, contactID,
)
if err != nil {
return NilContactID, err
Expand Down Expand Up @@ -920,15 +919,14 @@ func GetOrCreateURN(ctx context.Context, db DBorTx, oa *OrgAssets, contactID Con
}

// otherwise we need to insert it
insert := &urnInsert{
insert := &ContactURN{
OrgID: oa.OrgID(),
ContactID: contactID,
Identity: u.Identity().String(),
Scheme: u.Scheme(),
Identity: u.Identity(),
Path: u.Path(),
Display: null.String(u.Display()),
Auth: GetURNAuth(u),
Scheme: u.Scheme(),
Priority: defaultURNPriority,
OrgID: oa.OrgID(),
}

_, err := db.NamedExecContext(ctx, sqlInsertContactURN, insert)
Expand Down Expand Up @@ -1104,27 +1102,39 @@ UPDATE contacts_contact
SET status = 'S', modified_on = NOW()
WHERE id = $1`

func GetURNInt(urn urns.URN, key string) int {
values, err := urn.Query()
const sqlSelectURNsByID = `
SELECT id, org_id, contact_id, identity, priority, scheme, path, display, auth, channel_id
FROM contacts_contacturn
WHERE id = ANY($1)`

// LoadContactURNs fetches contact URNs by their ids
func LoadContactURNs(ctx context.Context, db DBorTx, ids []URNID) ([]*ContactURN, error) {
rows, err := db.QueryxContext(ctx, sqlSelectURNsByID, pq.Array(ids))
if err != nil {
return 0
return nil, errors.Wrapf(err, "error querying URNs")
}
defer rows.Close()

value, _ := strconv.Atoi(values.Get(key))
return value
urns := make([]*ContactURN, 0)
for rows.Next() {
u := &ContactURN{}
err = rows.StructScan(&u)
if err != nil {
return nil, errors.Wrap(err, "error scanning URN row")
}
urns = append(urns, u)
}
return urns, nil
}

func GetURNAuth(urn urns.URN) null.String {
func GetURNInt(urn urns.URN, key string) int {
values, err := urn.Query()
if err != nil {
return null.NullString
return 0
}

value := values.Get("auth")
if value == "" {
return null.NullString
}
return null.String(value)
value, _ := strconv.Atoi(values.Get(key))
return value
}

func GetURNChannelID(oa *OrgAssets, urn urns.URN) ChannelID {
Expand Down Expand Up @@ -1235,15 +1245,14 @@ func UpdateContactURNs(ctx context.Context, db DBorTx, oa *OrgAssets, changes []
updatedURNIDs = append(updatedURNIDs, urnID)
} else {
// new URN, add it instead
inserts = append(inserts, &urnInsert{
inserts = append(inserts, &ContactURN{
OrgID: oa.OrgID(),
ContactID: change.ContactID,
Identity: urn.Identity().String(),
Identity: urn.Identity(),
Scheme: urn.Scheme(),
Path: urn.Path(),
Display: null.String(urn.Display()),
Auth: GetURNAuth(urn),
Scheme: urn.Scheme(),
Priority: priority,
OrgID: oa.OrgID(),
})

identities = append(identities, urn.Identity().String())
Expand Down Expand Up @@ -1320,18 +1329,6 @@ WHERE
u.id = r.id::int
`

// urnInsert is our object that represents a single contact URN addition
type urnInsert struct {
ContactID ContactID `db:"contact_id"`
Identity string `db:"identity"`
Path string `db:"path"`
Display null.String `db:"display"`
Auth null.String `db:"auth"`
Scheme string `db:"scheme"`
Priority int `db:"priority"`
OrgID OrgID `db:"org_id"`
}

const sqlInsertContactURN = `
INSERT INTO
contacts_contacturn(contact_id, identity, path, display, auth, scheme, priority, org_id)
Expand Down
14 changes: 13 additions & 1 deletion core/models/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func TestGetOrCreateContactIDsFromURNs(t *testing.T) {
// add an orphaned URN
testdata.InsertContactURN(rt, testdata.Org1, nil, urns.URN("telegram:200001"), 100)

cathy, _ := testdata.Cathy.Load(rt, oa)
cathy, _, _ := testdata.Cathy.Load(rt, oa)

tcs := []struct {
orgID models.OrgID
Expand Down Expand Up @@ -613,6 +613,18 @@ func TestUpdateContactURNs(t *testing.T) {
assertdb.Query(t, rt.DB, `SELECT count(*) FROM contacts_contacturn`).Returns(numInitialURNs + 3)
}

func TestLoadContactURNs(t *testing.T) {
ctx, rt := testsuite.Runtime()

oa := testdata.Org1.Load(rt)
_, _, cathyURNs := testdata.Cathy.Load(rt, oa)
_, _, bobURNs := testdata.Bob.Load(rt, oa)

urns, err := models.LoadContactURNs(ctx, rt.DB, []models.URNID{cathyURNs[0].ID, bobURNs[0].ID})
assert.NoError(t, err)
assert.ElementsMatch(t, []*models.ContactURN{cathyURNs[0], bobURNs[0]}, urns)
}

func TestLockContacts(t *testing.T) {
ctx, rt := testsuite.Runtime()

Expand Down
19 changes: 2 additions & 17 deletions core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ type Msg struct {
ChannelID ChannelID `db:"channel_id"`
ContactID ContactID `db:"contact_id"`
ContactURNID *URNID `db:"contact_urn_id"`
URN urns.URN `db:"urn_urn"`
URNAuth null.String `db:"urn_auth"`

SentOn *time.Time `db:"sent_on"`
QueuedOn time.Time `db:"queued_on"`
Expand Down Expand Up @@ -158,8 +156,6 @@ func (m *Msg) ExternalID() null.String { return m.m.ExternalID }
func (m *Msg) Metadata() map[string]any { return m.m.Metadata }
func (m *Msg) MsgCount() int { return m.m.MsgCount }
func (m *Msg) ChannelID() ChannelID { return m.m.ChannelID }
func (m *Msg) URN() urns.URN { return m.m.URN }
func (m *Msg) URNAuth() null.String { return m.m.URNAuth }
func (m *Msg) OrgID() OrgID { return m.m.OrgID }
func (m *Msg) FlowID() FlowID { return m.m.FlowID }
func (m *Msg) TicketID() TicketID { return m.m.TicketID }
Expand All @@ -180,8 +176,6 @@ func (m *Msg) SetURN(urn urns.URN) error {
return nil
}

m.m.URN = urn

// set our ID if we have one
urnInt := GetURNInt(urn, "id")
if urnInt == 0 {
Expand All @@ -190,7 +184,6 @@ func (m *Msg) SetURN(urn urns.URN) error {

urnID := URNID(urnInt)
m.m.ContactURNID = &urnID
m.m.URNAuth = GetURNAuth(urn)

return nil
}
Expand Down Expand Up @@ -252,8 +245,6 @@ func NewOutgoingIVR(cfg *runtime.Config, orgID OrgID, call *Call, out *flows.Msg
m.ContactURNID = &urnID
m.ChannelID = call.ChannelID()

m.URN = out.URN()

m.OrgID = orgID
m.CreatedOn = createdOn
m.SentOn = &createdOn
Expand Down Expand Up @@ -344,7 +335,7 @@ func newOutgoingTextMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact
}

// if we're sending to a phone, message may have to be sent in multiple parts
if m.URN.Scheme() == urns.TelScheme {
if out.URN().Scheme() == urns.TelScheme {
m.MsgCount = gsm7.Segments(m.Text) + len(m.Attachments)
}

Expand Down Expand Up @@ -490,13 +481,9 @@ SELECT
m.channel_id,
m.contact_id,
m.contact_urn_id,
m.org_id,
u.identity AS "urn_urn",
u.auth AS "urn_auth"
m.org_id
FROM
msgs_msg m
INNER JOIN
contacts_contacturn u ON u.id = m.contact_urn_id
INNER JOIN
channels_channel c ON c.id = m.channel_id
WHERE
Expand Down Expand Up @@ -651,8 +638,6 @@ func ResendMessages(ctx context.Context, rt *runtime.Runtime, oa *OrgAssets, msg
if err != nil {
return nil, errors.Wrap(err, "error loading URN")
}
msg.m.URN = urn // needs to be set for queueing to courier

contactURN, err := flows.ParseRawURN(channels, urn, assets.IgnoreMissing)
if err != nil {
return nil, errors.Wrap(err, "error parsing URN")
Expand Down
9 changes: 4 additions & 5 deletions core/models/msgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ func TestNewOutgoingFlowMsg(t *testing.T) {
assert.Equal(t, tc.QuickReplies, msg.QuickReplies())
assert.Equal(t, tc.Contact.ID, msg.ContactID())
assert.Equal(t, expectedChannelID, msg.ChannelID())
assert.Equal(t, tc.URN, msg.URN())
if tc.URNID != models.NilURNID {
assert.Equal(t, tc.URNID, *msg.ContactURNID())
} else {
Expand Down Expand Up @@ -379,8 +378,8 @@ func TestGetMsgRepetitions(t *testing.T) {
dates.SetNowSource(dates.NewFixedNowSource(time.Date(2021, 11, 18, 12, 13, 3, 234567, time.UTC)))

oa := testdata.Org1.Load(rt)
_, cathy := testdata.Cathy.Load(rt, oa)
_, george := testdata.George.Load(rt, oa)
_, cathy, _ := testdata.Cathy.Load(rt, oa)
_, george, _ := testdata.George.Load(rt, oa)

msg1 := flows.NewMsgOut(testdata.Cathy.URN, nil, "foo", nil, nil, nil, flows.NilMsgTopic, i18n.NilLocale, flows.NilUnsendableReason)
msg2 := flows.NewMsgOut(testdata.Cathy.URN, nil, "FOO", nil, nil, nil, flows.NilMsgTopic, i18n.NilLocale, flows.NilUnsendableReason)
Expand Down Expand Up @@ -510,7 +509,7 @@ func TestNewMsgOut(t *testing.T) {
oa, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID)
require.NoError(t, err)

_, cathy := testdata.Cathy.Load(rt, oa)
_, cathy, _ := testdata.Cathy.Load(rt, oa)

out, ch := models.NewMsgOut(oa, cathy, "hello", nil, nil, `eng-US`)
assert.Equal(t, "hello", out.Text())
Expand Down Expand Up @@ -543,7 +542,7 @@ func insertTestSession(t *testing.T, ctx context.Context, rt *runtime.Runtime, o
oa, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID)
require.NoError(t, err)

_, flowContact := contact.Load(rt, oa)
_, flowContact, _ := contact.Load(rt, oa)

session, err := models.FindWaitingSessionForContact(ctx, rt.DB, rt.SessionStorage, oa, models.FlowTypeMessaging, flowContact)
require.NoError(t, err)
Expand Down
14 changes: 7 additions & 7 deletions core/models/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestSessionCreationAndUpdating(t *testing.T) {
oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshFlows)
require.NoError(t, err)

modelContact, _ := testdata.Bob.Load(rt, oa)
modelContact, _, _ := testdata.Bob.Load(rt, oa)

sa, flowSession, sprint1 := test.NewSessionBuilder().WithAssets(oa.SessionAssets()).WithFlow(flow.UUID).
WithContact(testdata.Bob.UUID, flows.ContactID(testdata.Bob.ID), "Bob", "eng", "").MustBuild()
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestSessionCreationAndUpdating(t *testing.T) {
})

// reload contact and check current flow is set
modelContact, _ = testdata.Bob.Load(rt, oa)
modelContact, _, _ = testdata.Bob.Load(rt, oa)
assert.Equal(t, flow.ID, modelContact.CurrentFlowID())

flowSession, err = session.FlowSession(rt.Config, oa.SessionAssets(), oa.Env())
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestSessionCreationAndUpdating(t *testing.T) {
assertdb.Query(t, rt.DB, `SELECT current_flow_id FROM contacts_contact WHERE id = $1`, testdata.Bob.ID).Returns(nil)

// reload contact and check current flow is cleared
modelContact, _ = testdata.Bob.Load(rt, oa)
modelContact, _, _ = testdata.Bob.Load(rt, oa)
assert.Equal(t, models.NilFlowID, modelContact.CurrentFlowID())
}

Expand All @@ -140,7 +140,7 @@ func TestSingleSprintSession(t *testing.T) {
oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshFlows)
require.NoError(t, err)

modelContact, _ := testdata.Bob.Load(rt, oa)
modelContact, _, _ := testdata.Bob.Load(rt, oa)

_, flowSession, sprint1 := test.NewSessionBuilder().WithAssets(oa.SessionAssets()).WithFlow(flow.UUID).
WithContact(testdata.Bob.UUID, flows.ContactID(testdata.Bob.ID), "Bob", "eng", "").MustBuild()
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestSessionWithSubflows(t *testing.T) {
oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshFlows)
require.NoError(t, err)

modelContact, _ := testdata.Cathy.Load(rt, oa)
modelContact, _, _ := testdata.Cathy.Load(rt, oa)

sa, flowSession, sprint1 := test.NewSessionBuilder().WithAssets(oa.SessionAssets()).WithFlow(parent.UUID).
WithContact(testdata.Cathy.UUID, flows.ContactID(testdata.Cathy.ID), "Cathy", "eng", "").MustBuild()
Expand Down Expand Up @@ -261,7 +261,7 @@ func TestSessionFailedStart(t *testing.T) {
oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshFlows)
require.NoError(t, err)

modelContact, _ := testdata.Cathy.Load(rt, oa)
modelContact, _, _ := testdata.Cathy.Load(rt, oa)

_, flowSession, sprint1 := test.NewSessionBuilder().WithAssets(oa.SessionAssets()).WithFlow(ping.UUID).
WithContact(testdata.Cathy.UUID, flows.ContactID(testdata.Cathy.ID), "Cathy", "eng", "").MustBuild()
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestClearWaitTimeout(t *testing.T) {

oa := testdata.Org1.Load(rt)

_, cathy := testdata.Cathy.Load(rt, oa)
_, cathy, _ := testdata.Cathy.Load(rt, oa)

expiresOn := time.Now().Add(time.Hour)
timeoutOn := time.Now().Add(time.Minute)
Expand Down
Loading

0 comments on commit 4d6ec09

Please sign in to comment.