Skip to content

Commit

Permalink
feat: add whatsapp broadcasts support
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobernardoaf committed Nov 19, 2024
1 parent a7c2492 commit 418db90
Show file tree
Hide file tree
Showing 4 changed files with 517 additions and 2 deletions.
2 changes: 1 addition & 1 deletion core/models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ func contactIDsFromURNs(ctx context.Context, db Queryer, orgID OrgID, urnz []urn
owners[identityToOriginal[urn]] = id
}

if (orgConfig.Get("verify_ninth_digit", false) == true) {
if orgConfig.Get("verify_ninth_digit", false) == true {
owners, err = checkNinthDigitContacts(ctx, owners, db, orgID)
if err != nil {
return nil, errors.Wrapf(err, "error while checking for ninth digit contacts")
Expand Down
340 changes: 339 additions & 1 deletion core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,16 @@ func GetMsgRepetitions(rp *redis.Pool, contactID ContactID, msg *flows.MsgOut) (
return redis.Int(msgRepetitionsScript.Do(rc, key, contactID, msg.Text()))
}

// GetWppMsgRepetitions gets the number of repetitions of this msg text for the given contact in the current 5 minute window
func GetWppMsgRepetitions(rp *redis.Pool, contactID ContactID, msg *flows.MsgWppOut) (int, error) {
rc := rp.Get()
defer rc.Close()

keyTime := dates.Now().UTC().Round(time.Minute * 5)
key := fmt.Sprintf("msg_repetitions:%s", keyTime.Format("2006-01-02T15:04"))
return redis.Int(msgRepetitionsScript.Do(rc, key, contactID, msg.Text()))
}

// NewOutgoingFlowMsg creates an outgoing message for the passed in flow message
func NewOutgoingFlowMsg(rt *runtime.Runtime, org *Org, channel *Channel, session *Session, out *flows.MsgOut, createdOn time.Time) (*Msg, error) {
return newOutgoingMsg(rt, org, channel, session.ContactID(), out, createdOn, session, NilBroadcastID)
Expand All @@ -348,6 +358,10 @@ func NewOutgoingBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, co
return newOutgoingMsg(rt, org, channel, contactID, out, createdOn, nil, broadcastID)
}

func NewOutgoingWppBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, contactID ContactID, out *flows.MsgWppOut, createdOn time.Time, broadcastID BroadcastID) (*Msg, error) {
return newOutgoingMsgWpp(rt, org, channel, contactID, out, createdOn, nil, broadcastID)
}

// NewOutgoingMsg creates an outgoing message that does not belong to any flow or broadcast, it's used to the a direct message to the contact
func NewOutgoingMsg(rt *runtime.Runtime, org *Org, channel *Channel, contactID ContactID, out *flows.MsgOut, createdOn time.Time) (*Msg, error) {
return newOutgoingMsg(rt, org, channel, contactID, out, createdOn, nil, NilBroadcastID)
Expand Down Expand Up @@ -552,6 +566,18 @@ func newOutgoingMsgWpp(rt *runtime.Runtime, org *Org, channel *Channel, contactI
// if msg is missing the URN or channel, we also fail it
m.Status = MsgStatusFailed
m.FailedReason = MsgFailedNoDestination
} else {
// also fail right away if this looks like a loop
repetitions, err := GetWppMsgRepetitions(rt.RP, contactID, msgWpp)
if err != nil {
return nil, errors.Wrap(err, "error looking up msg repetitions")
}
if repetitions >= 20 {
m.Status = MsgStatusFailed
m.FailedReason = MsgFailedLooping

logrus.WithFields(logrus.Fields{"contact_id": contactID, "text": msgWpp.Text(), "repetitions": repetitions}).Error("too many repetitions, failing message")
}
}

// if we have a session, set fields on the message from that
Expand All @@ -573,7 +599,7 @@ func newOutgoingMsgWpp(rt *runtime.Runtime, org *Org, channel *Channel, contactI
}
}

if len(msgWpp.QuickReplies()) > 0 || len(msgWpp.ListMessage().ListItems) > 0 || msgWpp.Topic() != flows.NilMsgTopic || msgWpp.Text() != "" || msgWpp.Footer() != "" || msgWpp.HeaderType() != "" || msgWpp.InteractionType() != "" {
if len(msgWpp.QuickReplies()) > 0 || len(msgWpp.ListMessage().ListItems) > 0 || msgWpp.Topic() != flows.NilMsgTopic || msgWpp.Text() != "" || msgWpp.Footer() != "" || msgWpp.HeaderType() != "" || msgWpp.InteractionType() != "" || msgWpp.Templating() != nil {
metadata := make(map[string]interface{})
if msgWpp.Topic() != flows.NilMsgTopic {
metadata["topic"] = string(msgWpp.Topic())
Expand Down Expand Up @@ -628,6 +654,10 @@ func newOutgoingMsgWpp(rt *runtime.Runtime, org *Org, channel *Channel, contactI
if msgWpp.TextLanguage != "" {
metadata["text_language"] = msgWpp.TextLanguage
}
if msgWpp.Templating() != nil {
metadata["templating"] = msgWpp.Templating()
m.Template = msgWpp.Templating().Template().Name
}

m.Metadata = null.NewMap(metadata)
}
Expand Down Expand Up @@ -1402,6 +1432,314 @@ func CreateBroadcastMessages(ctx context.Context, rt *runtime.Runtime, oa *OrgAs
return msgs, nil
}

type WppBroadcastTemplate struct {
UUID assets.TemplateUUID `json:"uuid" validate:"required,uuid"`
Name string `json:"name" validate:"required"`
Variables []string `json:"variables,omitempty"`
}

type WppBroadcastMessageHeader struct {
Type string `json:"type,omitempty"`
Text string `json:"text,omitempty"`
}

type WppBroadcastMessage struct {
Text string `json:"text,omitempty"`
Header WppBroadcastMessageHeader `json:"header,omitempty"`
Footer string `json:"footer,omitempty"`
Attachments []utils.Attachment `json:"attachments,omitempty"`
QuickReplies []string `json:"quick_replies,omitempty"`
Template WppBroadcastTemplate `json:"template,omitempty"`
InteractionType string `json:"interaction_type,omitempty"`
OrderDetails flows.OrderDetailsMessage `json:"order_details,omitempty"`
FlowMessage flows.FlowMessage `json:"flow_message,omitempty"`
ListMessage flows.ListMessage `json:"list_message,omitempty"`
CTAMessage flows.CTAMessage `json:"cta_message,omitempty"`
}

type WppBroadcast struct {
b struct {
BroadcastID BroadcastID `json:"broadcast_id,omitempty" db:"id"`
URNs []urns.URN `json:"urns,omitempty"`
ContactIDs []ContactID `json:"contact_ids,omitempty"`
GroupIDs []GroupID `json:"group_ids,omitempty"`
OrgID OrgID `json:"org_id" db:"org_id"`
ParentID BroadcastID `json:"parent_id,omitempty" db:"parent_id"`
Msg WppBroadcastMessage `json:"msg"`
}
}

func (b *WppBroadcast) ID() BroadcastID { return b.b.BroadcastID }
func (b *WppBroadcast) OrgID() OrgID { return b.b.OrgID }
func (b *WppBroadcast) ContactIDs() []ContactID { return b.b.ContactIDs }
func (b *WppBroadcast) GroupIDs() []GroupID { return b.b.GroupIDs }
func (b *WppBroadcast) URNs() []urns.URN { return b.b.URNs }
func (b *WppBroadcast) Msg() WppBroadcastMessage { return b.b.Msg }

func (b *WppBroadcast) MarshalJSON() ([]byte, error) { return json.Marshal(b.b) }
func (b *WppBroadcast) UnmarshalJSON(data []byte) error { return json.Unmarshal(data, &b.b) }

func NewWppBroadcast(orgID OrgID, id BroadcastID, msg WppBroadcastMessage, urns []urns.URN, contactIDs []ContactID, groupIDs []GroupID) *WppBroadcast {
bcast := &WppBroadcast{}
bcast.b.OrgID = orgID
bcast.b.BroadcastID = id
bcast.b.Msg = msg
bcast.b.URNs = urns
bcast.b.ContactIDs = contactIDs
bcast.b.GroupIDs = groupIDs

return bcast
}

func (b *WppBroadcast) CreateBatch(contactIDs []ContactID) *WppBroadcastBatch {
batch := &WppBroadcastBatch{}
batch.b.BroadcastID = b.b.BroadcastID
batch.b.Msg = b.b.Msg
batch.b.OrgID = b.b.OrgID
batch.b.ContactIDs = contactIDs
return batch
}

type WppBroadcastBatch struct {
b struct {
BroadcastID BroadcastID `json:"broadcast_id,omitempty"`
Msg WppBroadcastMessage `json:"msg"`
URNs map[ContactID]urns.URN `json:"urns,omitempty"`
ContactIDs []ContactID `json:"contact_ids,omitempty"`
IsLast bool `json:"is_last"`
OrgID OrgID `json:"org_id"`
}
}

func (b *WppBroadcastBatch) BroadcastID() BroadcastID { return b.b.BroadcastID }
func (b *WppBroadcastBatch) ContactIDs() []ContactID { return b.b.ContactIDs }
func (b *WppBroadcastBatch) URNs() map[ContactID]urns.URN { return b.b.URNs }
func (b *WppBroadcastBatch) SetURNs(urns map[ContactID]urns.URN) { b.b.URNs = urns }
func (b *WppBroadcastBatch) OrgID() OrgID { return b.b.OrgID }
func (b *WppBroadcastBatch) Msg() WppBroadcastMessage { return b.b.Msg }

func (b *WppBroadcastBatch) IsLast() bool { return b.b.IsLast }
func (b *WppBroadcastBatch) SetIsLast(last bool) { b.b.IsLast = last }

func (b *WppBroadcastBatch) MarshalJSON() ([]byte, error) { return json.Marshal(b.b) }
func (b *WppBroadcastBatch) UnmarshalJSON(data []byte) error { return json.Unmarshal(data, &b.b) }

func CreateWppBroadcastMessages(ctx context.Context, rt *runtime.Runtime, oa *OrgAssets, bcast *WppBroadcastBatch) ([]*Msg, error) {
repeatedContacts := make(map[ContactID]bool)
broadcastURNs := bcast.URNs()

// build our list of contact ids
contactIDs := bcast.ContactIDs()

// build a map of the contacts that are present both in our URN list and our contact id list
if broadcastURNs != nil {
for _, id := range contactIDs {
_, found := broadcastURNs[id]
if found {
repeatedContacts[id] = true
}
}

// if we have URN we need to send to, add those contacts as well if not already repeated
for id := range broadcastURNs {
if !repeatedContacts[id] {
contactIDs = append(contactIDs, id)
}
}
}

// load all our contacts
contacts, err := LoadContacts(ctx, rt.DB, oa, contactIDs)
if err != nil {
return nil, errors.Wrapf(err, "error loading contacts for broadcast")
}

channels := oa.SessionAssets().Channels()

// for each contact, build our message
msgs := make([]*Msg, 0, len(contacts))

// utility method to build up our message
buildMessage := func(c *Contact, forceURN urns.URN) (*Msg, error) {
if c.Status() != ContactStatusActive {
return nil, nil
}

contact, err := c.FlowContact(oa)
if err != nil {
return nil, errors.Wrapf(err, "error creating flow contact")
}

urn := urns.NilURN
var channel *Channel

// we are forcing to send to a non-preferred URN, find the channel
if forceURN != urns.NilURN {
for _, u := range contact.URNs() {
if u.URN().Identity() == forceURN.Identity() {
c := channels.GetForURN(u, assets.ChannelRoleSend)
if c == nil {
return nil, nil
}
urn = u.URN()
channel = oa.ChannelByUUID(c.UUID())
break
}
}
} else {
// no forced URN, find the first URN we can send to
for _, u := range contact.URNs() {
c := channels.GetForURN(u, assets.ChannelRoleSend)
if c != nil {
urn = u.URN()
channel = oa.ChannelByUUID(c.UUID())
break
}
}
}

// no urn and channel? move on
if channel == nil {
return nil, nil
}

// evaluate our message fields
text := bcast.Msg().Text
attachments := bcast.Msg().Attachments
quickReplies := bcast.Msg().QuickReplies
headerType := bcast.Msg().Header.Type
headerText := bcast.Msg().Header.Text
footerText := bcast.Msg().Footer
var templating *flows.MsgTemplating = nil
templateVariables := bcast.Msg().Template.Variables

ctaMessage := bcast.Msg().CTAMessage
listMessage := bcast.Msg().ListMessage
flowMessage := bcast.Msg().FlowMessage
orderDetails := bcast.Msg().OrderDetails

// build up the minimum viable context for evaluation
evaluationCtx := types.NewXObject(map[string]types.XValue{
"contact": flows.Context(oa.Env(), contact),
"fields": flows.Context(oa.Env(), contact.Fields()),
"globals": flows.Context(oa.Env(), oa.SessionAssets().Globals()),
"urns": flows.ContextFunc(oa.Env(), contact.URNs().MapContext),
})

// evaluate our text
text, _ = excellent.EvaluateTemplate(oa.Env(), evaluationCtx, text, nil)

// evaluate our header text
headerText, _ = excellent.EvaluateTemplate(oa.Env(), evaluationCtx, headerText, nil)

// evaluate our footer text
footerText, _ = excellent.EvaluateTemplate(oa.Env(), evaluationCtx, footerText, nil)

// evaluate our quick replies
for i, qr := range quickReplies {
quickReplies[i], _ = excellent.EvaluateTemplate(oa.Env(), evaluationCtx, qr, nil)
}

// evaluate our template
if bcast.Msg().Template.UUID != "" {
// load our template
var templateMatch assets.Template = nil
for _, t := range oa.templates {
if t.UUID() == bcast.Msg().Template.UUID {
templateMatch = t
break
}
}
if templateMatch == nil {
return nil, errors.Errorf("template not found: %s", bcast.Msg().Template.UUID)
}

// looks for a translation in these locales
locales := []envs.Locale{
contact.Locale(oa.Env()),
oa.Env().DefaultLocale(),
}
translation := oa.SessionAssets().Templates().FindTranslation(bcast.Msg().Template.UUID, channel.ChannelReference(), locales)
if translation != nil {
// evaluate our variables
evaluatedVariables := make([]string, len(templateVariables))
for i, variable := range templateVariables {
sub, err := excellent.EvaluateTemplate(oa.Env(), evaluationCtx, variable, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to evaluate template variable")
}
evaluatedVariables[i] = sub
}

text = translation.Substitute(evaluatedVariables)
var templateReference = assets.NewTemplateReference(bcast.Msg().Template.UUID, bcast.Msg().Template.Name)
templating = flows.NewMsgTemplating(templateReference, translation.Language(), translation.Country(), evaluatedVariables, translation.Namespace())
}
}

// don't do anything if we have no text or attachments
if text == "" && len(attachments) == 0 {
return nil, nil
}

// create our outgoing message
out := flows.NewMsgWppOut(urn, channel.ChannelReference(), bcast.Msg().InteractionType, headerType, headerText, text, footerText, ctaMessage, listMessage, flowMessage, orderDetails, attachments, quickReplies, templating, flows.NilMsgTopic)
msg, err := NewOutgoingWppBroadcastMsg(rt, oa.Org(), channel, c.ID(), out, time.Now(), bcast.BroadcastID())
if err != nil {
return nil, errors.Wrapf(err, "error creating outgoing message")
}

return msg, nil
}

// run through all our contacts to create our messages
for _, c := range contacts {
// use the preferred URN if present
urn := broadcastURNs[c.ID()]
msg, err := buildMessage(c, urn)
if err != nil {
return nil, errors.Wrapf(err, "error creating broadcast message")
}
if msg != nil {
msgs = append(msgs, msg)
}

// if this is a contact that will receive two messages, calculate that one as well
if repeatedContacts[c.ID()] {
m2, err := buildMessage(c, urns.NilURN)
if err != nil {
return nil, errors.Wrapf(err, "error creating broadcast message")
}

// add this message if it isn't a duplicate
if m2 != nil && m2.URN() != msg.URN() {
msgs = append(msgs, m2)
}
}
}

// allocate a topup for these message if org uses topups
topup, err := AllocateTopups(ctx, rt.DB, rt.RP, oa.Org(), len(msgs))
if err != nil {
return nil, errors.Wrapf(err, "error allocating topup for broadcast messages")
}

// if we have an active topup, assign it to our messages
if topup != NilTopupID {
for _, m := range msgs {
m.SetTopup(topup)
}
}

// insert them in a single request
err = InsertMessages(ctx, rt.DB, msgs)
if err != nil {
return nil, errors.Wrapf(err, "error inserting broadcast messages")
}

return msgs, nil
}

const updateMsgForResendingSQL = `
UPDATE
msgs_msg m
Expand Down
Loading

0 comments on commit 418db90

Please sign in to comment.