Skip to content

Commit

Permalink
Merge pull request #40 from nyaruka/msg_type_for_outgoing
Browse files Browse the repository at this point in the history
Use `msg_type = T|V` for outgoing messages
  • Loading branch information
rowanseymour authored Feb 20, 2023
2 parents 9a4eced + 8871f35 commit e6b079c
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 51 deletions.
2 changes: 1 addition & 1 deletion core/handlers/msg_received.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func handleMsgReceived(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa
"urn": event.Msg.URN(),
}).Debug("msg received event")

msg := models.NewIncomingMsg(rt.Config, oa.OrgID(), nil, scene.ContactID(), &event.Msg, event.CreatedOn())
msg := models.NewIncomingSurveyorMsg(rt.Config, oa.OrgID(), nil, scene.ContactID(), &event.Msg, event.CreatedOn())

// we'll commit this message with all the others
scene.AppendToEventPreCommitHook(hooks.CommitMessagesHook, msg)
Expand Down
100 changes: 51 additions & 49 deletions core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ const (
type MsgType string

const (
// need to rework RP side before we can stop using these for incoming messages
MsgTypeInbox = MsgType("I")
MsgTypeFlow = MsgType("F")
MsgTypeIVR = MsgType("V")
MsgTypeUSSD = MsgType("U")

MsgTypeText = MsgType("T")
MsgTypeVoice = MsgType("V")
)

type MsgStatus string
Expand Down Expand Up @@ -150,7 +152,7 @@ func (m *Msg) QueuedOn() time.Time { return m.m.QueuedOn }
func (m *Msg) Direction() MsgDirection { return m.m.Direction }
func (m *Msg) Status() MsgStatus { return m.m.Status }
func (m *Msg) Visibility() MsgVisibility { return m.m.Visibility }
func (m *Msg) MsgType() MsgType { return m.m.MsgType }
func (m *Msg) Type() MsgType { return m.m.MsgType }
func (m *Msg) ErrorCount() int { return m.m.ErrorCount }
func (m *Msg) NextAttempt() *time.Time { return m.m.NextAttempt }
func (m *Msg) FailedReason() MsgFailedReason { return m.m.FailedReason }
Expand Down Expand Up @@ -222,7 +224,7 @@ func NewIncomingIVR(cfg *runtime.Config, orgID OrgID, call *Call, in *flows.MsgI
m.Direction = DirectionIn
m.Status = MsgStatusHandled
m.Visibility = VisibilityVisible
m.MsgType = MsgTypeIVR
m.MsgType = MsgTypeVoice
m.ContactID = call.ContactID()

urnID := call.ContactURNID()
Expand Down Expand Up @@ -253,7 +255,7 @@ func NewOutgoingIVR(cfg *runtime.Config, orgID OrgID, call *Call, out *flows.Msg
m.Direction = DirectionOut
m.Status = MsgStatusWired
m.Visibility = VisibilityVisible
m.MsgType = MsgTypeIVR
m.MsgType = MsgTypeVoice
m.ContactID = call.ContactID()

urnID := call.ContactURNID()
Expand All @@ -274,59 +276,22 @@ func NewOutgoingIVR(cfg *runtime.Config, orgID OrgID, call *Call, out *flows.Msg
return msg
}

var msgRepetitionsScript = redis.NewScript(3, `
local key, contact_id, text = KEYS[1], KEYS[2], KEYS[3]
local count = 1
-- try to look up in window
local record = redis.call("HGET", key, contact_id)
if record then
local record_count = tonumber(string.sub(record, 1, 2))
local record_text = string.sub(record, 4, -1)
if record_text == text then
count = math.min(record_count + 1, 99)
else
count = 1
end
end
-- create our new record with our updated count
record = string.format("%02d:%s", count, text)
-- write our new record with updated count and set expiration
redis.call("HSET", key, contact_id, record)
redis.call("EXPIRE", key, 300)
return count
`)

// GetMsgRepetitions gets the number of repetitions of this msg text for the given contact in the current 5 minute window
func GetMsgRepetitions(rp *redis.Pool, contact *flows.Contact, msg *flows.MsgOut) (int, error) {
rc := rp.Get()
defer rc.Close()

keyTime := dates.Now().UTC().Round(time.Minute * 5)
key := fmt.Sprintf("msg_repetitions:%s", keyTime.Format("2006-01-02T15:04"))
return redis.Int(msgRepetitionsScript.Do(rc, key, contact.ID(), msg.Text()))
}

// NewOutgoingFlowMsg creates an outgoing message for the passed in flow message
func NewOutgoingFlowMsg(rt *runtime.Runtime, org *Org, channel *Channel, session *Session, flow *Flow, out *flows.MsgOut, createdOn time.Time) (*Msg, error) {
return newOutgoingMsg(rt, org, channel, session.Contact(), out, createdOn, session, flow, NilBroadcastID)
return newOutgoingTextMsg(rt, org, channel, session.Contact(), out, createdOn, session, flow, NilBroadcastID)
}

// NewOutgoingBroadcastMsg creates an outgoing message which is part of a broadcast
func NewOutgoingBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time, broadcastID BroadcastID) (*Msg, error) {
return newOutgoingMsg(rt, org, channel, contact, out, createdOn, nil, nil, broadcastID)
return newOutgoingTextMsg(rt, org, channel, contact, out, createdOn, nil, nil, broadcastID)
}

// NewOutgoingChatMsg creates an outgoing message from chat
func NewOutgoingChatMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time) (*Msg, error) {
return newOutgoingMsg(rt, org, channel, contact, out, createdOn, nil, nil, NilBroadcastID)
return newOutgoingTextMsg(rt, org, channel, contact, out, createdOn, nil, nil, NilBroadcastID)
}

func newOutgoingMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time, session *Session, flow *Flow, broadcastID BroadcastID) (*Msg, error) {
func newOutgoingTextMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time, session *Session, flow *Flow, broadcastID BroadcastID) (*Msg, error) {
msg := &Msg{}
m := &msg.m
m.UUID = out.UUID()
Expand All @@ -340,7 +305,7 @@ func newOutgoingMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *fl
m.Direction = DirectionOut
m.Status = MsgStatusQueued
m.Visibility = VisibilityVisible
m.MsgType = MsgTypeFlow
m.MsgType = MsgTypeText
m.MsgCount = 1
m.CreatedOn = createdOn
m.Metadata = null.Map(buildMsgMetadata(out))
Expand Down Expand Up @@ -412,8 +377,8 @@ func buildMsgMetadata(m *flows.MsgOut) map[string]interface{} {
return metadata
}

// NewIncomingMsg creates a new incoming message for the passed in text and attachment
func NewIncomingMsg(cfg *runtime.Config, orgID OrgID, channel *Channel, contactID ContactID, in *flows.MsgIn, createdOn time.Time) *Msg {
// NewIncomingSurveyorMsg creates a new incoming message for the passed in text and attachment
func NewIncomingSurveyorMsg(cfg *runtime.Config, orgID OrgID, channel *Channel, contactID ContactID, in *flows.MsgIn, createdOn time.Time) *Msg {
msg := &Msg{}

msg.SetChannel(channel)
Expand All @@ -438,6 +403,43 @@ func NewIncomingMsg(cfg *runtime.Config, orgID OrgID, channel *Channel, contactI
return msg
}

var msgRepetitionsScript = redis.NewScript(3, `
local key, contact_id, text = KEYS[1], KEYS[2], KEYS[3]
local count = 1
-- try to look up in window
local record = redis.call("HGET", key, contact_id)
if record then
local record_count = tonumber(string.sub(record, 1, 2))
local record_text = string.sub(record, 4, -1)
if record_text == text then
count = math.min(record_count + 1, 99)
else
count = 1
end
end
-- create our new record with our updated count
record = string.format("%02d:%s", count, text)
-- write our new record with updated count and set expiration
redis.call("HSET", key, contact_id, record)
redis.call("EXPIRE", key, 300)
return count
`)

// GetMsgRepetitions gets the number of repetitions of this msg text for the given contact in the current 5 minute window
func GetMsgRepetitions(rp *redis.Pool, contact *flows.Contact, msg *flows.MsgOut) (int, error) {
rc := rp.Get()
defer rc.Close()

keyTime := dates.Now().UTC().Round(time.Minute * 5)
key := fmt.Sprintf("msg_repetitions:%s", keyTime.Format("2006-01-02T15:04"))
return redis.Int(msgRepetitionsScript.Do(rc, key, contact.ID(), msg.Text()))
}

var loadMessagesSQL = `
SELECT
id,
Expand Down
2 changes: 2 additions & 0 deletions core/models/msgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func TestNewOutgoingFlowMsg(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, oa.OrgID(), msg.OrgID())
assert.Equal(t, tc.Text, msg.Text())
assert.Equal(t, models.MsgTypeText, msg.Type())
assert.Equal(t, expectedAttachments, msg.Attachments())
assert.Equal(t, tc.QuickReplies, msg.QuickReplies())
assert.Equal(t, tc.Contact.ID, msg.ContactID())
Expand Down Expand Up @@ -649,6 +650,7 @@ func TestNewOutgoingIVR(t *testing.T) {
dbMsg := models.NewOutgoingIVR(rt.Config, testdata.Org1.ID, conn, flowMsg, createdOn)

assert.Equal(t, flowMsg.UUID(), dbMsg.UUID())
assert.Equal(t, models.MsgTypeVoice, dbMsg.Type())
assert.Equal(t, "Hello", dbMsg.Text())
assert.Equal(t, []utils.Attachment{"audio:http://example.com/hi.mp3"}, dbMsg.Attachments())
assert.Equal(t, envs.Locale("eng-US"), dbMsg.Locale())
Expand Down
2 changes: 1 addition & 1 deletion core/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestBatchStart(t *testing.T) {

assertdb.Query(t, rt.DB,
`SELECT count(*) FROM msgs_msg WHERE contact_id = ANY($1) AND text = $2 AND org_id = 1 AND status = 'Q'
AND queued_on IS NOT NULL AND direction = 'O' AND msg_type = 'F' AND channel_id = $3`,
AND queued_on IS NOT NULL AND direction = 'O' AND msg_type = 'T' AND channel_id = $3`,
pq.Array(contactIDs), tc.Msg, testdata.TwilioChannel.ID).
Returns(tc.TotalCount, "%d: unexpected number of messages", i)

Expand Down

0 comments on commit e6b079c

Please sign in to comment.