Skip to content

Commit

Permalink
Merge pull request #56 from nyaruka/msg_origin
Browse files Browse the repository at this point in the history
Add origin field to messages queued to courier
  • Loading branch information
rowanseymour authored Apr 17, 2023
2 parents 25dbd8e + 43179d4 commit 2eda3bc
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 11 deletions.
28 changes: 20 additions & 8 deletions core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Msg struct {
// origin
BroadcastID BroadcastID `db:"broadcast_id"`
FlowID FlowID `db:"flow_id"`
TicketID TicketID `db:"ticket_id"`
CreatedByID UserID `db:"created_by_id"`

// content
Expand Down Expand Up @@ -165,6 +166,7 @@ func (m *Msg) URN() urns.URN { return m.m.URN }
func (m *Msg) URNAuth() null.String { return m.m.URNAuth }
func (m *Msg) OrgID() OrgID { return m.m.OrgID }
func (m *Msg) FlowID() FlowID { return m.m.FlowID }
func (m *Msg) TicketID() TicketID { return m.m.TicketID }
func (m *Msg) ContactID() ContactID { return m.m.ContactID }
func (m *Msg) ContactURNID() *URNID { return m.m.ContactURNID }

Expand Down Expand Up @@ -270,26 +272,32 @@ func NewOutgoingIVR(cfg *runtime.Config, orgID OrgID, call *Call, out *flows.Msg

// NewOutgoingFlowMsg creates an outgoing message for the passed in flow message
func NewOutgoingFlowMsg(rt *runtime.Runtime, org *Org, channel *Channel, session *Session, flow *Flow, out *flows.MsgOut, createdOn time.Time) (*Msg, error) {
return newOutgoingTextMsg(rt, org, channel, session.Contact(), out, createdOn, session, flow, NilBroadcastID, NilUserID)
return newOutgoingTextMsg(rt, org, channel, session.Contact(), out, createdOn, session, flow, NilBroadcastID, NilTicketID, NilUserID)
}

// NewOutgoingBroadcastMsg creates an outgoing message which is part of a broadcast
func NewOutgoingBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time, bb *BroadcastBatch) (*Msg, error) {
return newOutgoingTextMsg(rt, org, channel, contact, out, createdOn, nil, nil, bb.BroadcastID, bb.CreatedByID)
return newOutgoingTextMsg(rt, org, channel, contact, out, createdOn, nil, nil, bb.BroadcastID, NilTicketID, bb.CreatedByID)
}

// NewOutgoingTicketMsg creates an outgoing message from a ticket
func NewOutgoingTicketMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time, ticketID TicketID, userID UserID) (*Msg, error) {
return newOutgoingTextMsg(rt, org, channel, contact, out, createdOn, nil, nil, NilBroadcastID, ticketID, userID)
}

// NewOutgoingChatMsg creates an outgoing message from chat
func NewOutgoingChatMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time, userID UserID) (*Msg, error) {
return newOutgoingTextMsg(rt, org, channel, contact, out, createdOn, nil, nil, NilBroadcastID, userID)
return newOutgoingTextMsg(rt, org, channel, contact, out, createdOn, nil, nil, NilBroadcastID, NilTicketID, userID)
}

func newOutgoingTextMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time, session *Session, flow *Flow, broadcastID BroadcastID, userID UserID) (*Msg, error) {
func newOutgoingTextMsg(rt *runtime.Runtime, org *Org, channel *Channel, contact *flows.Contact, out *flows.MsgOut, createdOn time.Time, session *Session, flow *Flow, broadcastID BroadcastID, ticketID TicketID, userID UserID) (*Msg, error) {
msg := &Msg{}
m := &msg.m
m.UUID = out.UUID()
m.OrgID = org.ID()
m.ContactID = ContactID(contact.ID())
m.BroadcastID = broadcastID
m.TicketID = ticketID
m.Text = out.Text()
m.QuickReplies = out.QuickReplies()
m.Locale = out.Locale()
Expand Down Expand Up @@ -427,8 +435,10 @@ func GetMsgRepetitions(rp *redis.Pool, contact *flows.Contact, msg *flows.MsgOut
var loadMessagesSQL = `
SELECT
id,
uuid,
broadcast_id,
uuid,
flow_id,
ticket_id,
text,
attachments,
quick_replies,
Expand Down Expand Up @@ -465,8 +475,10 @@ func GetMessagesByID(ctx context.Context, db Queryer, orgID OrgID, direction Msg
var loadMessagesForRetrySQL = `
SELECT
m.id,
m.broadcast_id,
m.uuid,
m.broadcast_id,
m.flow_id,
m.ticket_id,
m.text,
m.attachments,
m.quick_replies,
Expand Down Expand Up @@ -566,10 +578,10 @@ const sqlInsertMsgSQL = `
INSERT INTO
msgs_msg(uuid, text, attachments, quick_replies, locale, high_priority, created_on, modified_on, queued_on, sent_on, direction, status, metadata,
visibility, msg_type, msg_count, error_count, next_attempt, failed_reason, channel_id,
contact_id, contact_urn_id, org_id, flow_id, broadcast_id, created_by_id)
contact_id, contact_urn_id, org_id, flow_id, broadcast_id, ticket_id, created_by_id)
VALUES(:uuid, :text, :attachments, :quick_replies, :locale, :high_priority, :created_on, now(), now(), :sent_on, :direction, :status, :metadata,
:visibility, :msg_type, :msg_count, :error_count, :next_attempt, :failed_reason, :channel_id,
:contact_id, :contact_urn_id, :org_id, :flow_id, :broadcast_id, :created_by_id)
:contact_id, :contact_urn_id, :org_id, :flow_id, :broadcast_id, :ticket_id, :created_by_id)
RETURNING
id AS id,
modified_on AS modified_on,
Expand Down
19 changes: 19 additions & 0 deletions core/msgio/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,21 @@ const (
highPriority = 1
)

type MsgOrigin string

const (
MsgOriginFlow MsgOrigin = "flow"
MsgOriginBroadcast MsgOrigin = "broadcast"
MsgOriginTicket MsgOrigin = "ticket"
MsgOriginChat MsgOrigin = "chat"
)

// Msg is the format of a message queued to courier
type Msg struct {
ID flows.MsgID `json:"id"`
UUID flows.MsgUUID `json:"uuid"`
OrgID models.OrgID `json:"org_id"`
Origin MsgOrigin `json:"origin"`
Text string `json:"text"`
Attachments []utils.Attachment `json:"attachments,omitempty"`
QuickReplies []string `json:"quick_replies,omitempty"`
Expand All @@ -63,19 +73,28 @@ type Msg struct {

// NewCourierMsg creates a courier message in the format it's expecting to be queued
func NewCourierMsg(oa *models.OrgAssets, m *models.Msg, channel *models.Channel) (*Msg, error) {
origin := MsgOriginChat
var flowRef *assets.FlowReference

if m.FlowID() != models.NilFlowID {
origin = MsgOriginFlow

flow, err := oa.FlowByID(m.FlowID())
if err != nil {
return nil, errors.Wrap(err, "error loading message flow")
}
flowRef = flow.Reference()
} else if m.BroadcastID() != models.NilBroadcastID {
origin = MsgOriginBroadcast
} else if m.TicketID() != models.NilTicketID {
origin = MsgOriginTicket
}

return &Msg{
ID: m.ID(),
UUID: m.UUID(),
OrgID: m.OrgID(),
Origin: origin,
Text: m.Text(),
Attachments: m.Attachments(),
QuickReplies: m.QuickReplies(),
Expand Down
4 changes: 3 additions & 1 deletion core/msgio/courier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/nyaruka/null/v2"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -91,6 +90,7 @@ func TestNewCourierMsg(t *testing.T) {
"topic": "purchase"
},
"org_id": 1,
"origin": "flow",
"quick_replies": [
"yes",
"no"
Expand Down Expand Up @@ -136,6 +136,7 @@ func TestNewCourierMsg(t *testing.T) {
"high_priority": true,
"id": %d,
"org_id": 1,
"origin": "flow",
"session_id": %d,
"session_status": "W",
"text": "Hi there",
Expand Down Expand Up @@ -166,6 +167,7 @@ func TestNewCourierMsg(t *testing.T) {
"high_priority": false,
"id": %d,
"org_id": 1,
"origin": "broadcast",
"text": "Blast",
"tps_cost": 1,
"urn": "tel:+250700000001?id=10000",
Expand Down
Binary file modified mailroom_test.dump
Binary file not shown.
2 changes: 1 addition & 1 deletion services/tickets/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func SendReply(ctx context.Context, rt *runtime.Runtime, ticket *models.Ticket,
}

out, ch := models.NewMsgOut(oa, contact, text, attachments, nil, contact.Locale(oa.Env()))
msg, err := models.NewOutgoingChatMsg(rt, oa.Org(), ch, contact, out, dates.Now(), models.NilUserID)
msg, err := models.NewOutgoingTicketMsg(rt, oa.Org(), ch, contact, out, dates.Now(), ticket.ID(), models.NilUserID)
if err != nil {
return nil, errors.Wrap(err, "error creating outgoing message")
}
Expand Down
9 changes: 8 additions & 1 deletion web/msg/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,14 @@ func handleSend(ctx context.Context, rt *runtime.Runtime, r *sendRequest) (any,
}

out, ch := models.NewMsgOut(oa, contact, r.Text, r.Attachments, nil, contact.Locale(oa.Env()))
msg, err := models.NewOutgoingChatMsg(rt, oa.Org(), ch, contact, out, dates.Now(), r.UserID)
var msg *models.Msg

if r.TicketID != models.NilTicketID {
msg, err = models.NewOutgoingTicketMsg(rt, oa.Org(), ch, contact, out, dates.Now(), r.TicketID, r.UserID)
} else {
msg, err = models.NewOutgoingChatMsg(rt, oa.Org(), ch, contact, out, dates.Now(), r.UserID)
}

if err != nil {
return nil, 0, errors.Wrap(err, "error creating outgoing message")
}
Expand Down

0 comments on commit 2eda3bc

Please sign in to comment.