From 418db90c4eee1cbf76e2b2e08bd16292843bd9fa Mon Sep 17 00:00:00 2001 From: Paulo Bernardo Date: Tue, 19 Nov 2024 10:13:30 -0300 Subject: [PATCH] feat: add whatsapp broadcasts support --- core/models/contacts.go | 2 +- core/models/msgs.go | 340 +++++++++++++++++++++++++- core/queue/queue.go | 6 + core/tasks/msgs/send_wpp_broadcast.go | 171 +++++++++++++ 4 files changed, 517 insertions(+), 2 deletions(-) create mode 100644 core/tasks/msgs/send_wpp_broadcast.go diff --git a/core/models/contacts.go b/core/models/contacts.go index 0f574b988..8013d1b39 100644 --- a/core/models/contacts.go +++ b/core/models/contacts.go @@ -694,7 +694,7 @@ func contactIDsFromURNs(ctx context.Context, db Queryer, orgID OrgID, urnz []urn owners[identityToOriginal[urn]] = id } - if (orgConfig.Get("verify_ninth_digit", false) == true) { + if orgConfig.Get("verify_ninth_digit", false) == true { owners, err = checkNinthDigitContacts(ctx, owners, db, orgID) if err != nil { return nil, errors.Wrapf(err, "error while checking for ninth digit contacts") diff --git a/core/models/msgs.go b/core/models/msgs.go index 1b57f4160..26f824cd3 100644 --- a/core/models/msgs.go +++ b/core/models/msgs.go @@ -328,6 +328,16 @@ func GetMsgRepetitions(rp *redis.Pool, contactID ContactID, msg *flows.MsgOut) ( return redis.Int(msgRepetitionsScript.Do(rc, key, contactID, msg.Text())) } +// GetWppMsgRepetitions gets the number of repetitions of this msg text for the given contact in the current 5 minute window +func GetWppMsgRepetitions(rp *redis.Pool, contactID ContactID, msg *flows.MsgWppOut) (int, error) { + rc := rp.Get() + defer rc.Close() + + keyTime := dates.Now().UTC().Round(time.Minute * 5) + key := fmt.Sprintf("msg_repetitions:%s", keyTime.Format("2006-01-02T15:04")) + return redis.Int(msgRepetitionsScript.Do(rc, key, contactID, msg.Text())) +} + // NewOutgoingFlowMsg creates an outgoing message for the passed in flow message func NewOutgoingFlowMsg(rt *runtime.Runtime, org *Org, channel *Channel, session *Session, out *flows.MsgOut, createdOn time.Time) (*Msg, error) { return newOutgoingMsg(rt, org, channel, session.ContactID(), out, createdOn, session, NilBroadcastID) @@ -348,6 +358,10 @@ func NewOutgoingBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, co return newOutgoingMsg(rt, org, channel, contactID, out, createdOn, nil, broadcastID) } +func NewOutgoingWppBroadcastMsg(rt *runtime.Runtime, org *Org, channel *Channel, contactID ContactID, out *flows.MsgWppOut, createdOn time.Time, broadcastID BroadcastID) (*Msg, error) { + return newOutgoingMsgWpp(rt, org, channel, contactID, out, createdOn, nil, broadcastID) +} + // NewOutgoingMsg creates an outgoing message that does not belong to any flow or broadcast, it's used to the a direct message to the contact func NewOutgoingMsg(rt *runtime.Runtime, org *Org, channel *Channel, contactID ContactID, out *flows.MsgOut, createdOn time.Time) (*Msg, error) { return newOutgoingMsg(rt, org, channel, contactID, out, createdOn, nil, NilBroadcastID) @@ -552,6 +566,18 @@ func newOutgoingMsgWpp(rt *runtime.Runtime, org *Org, channel *Channel, contactI // if msg is missing the URN or channel, we also fail it m.Status = MsgStatusFailed m.FailedReason = MsgFailedNoDestination + } else { + // also fail right away if this looks like a loop + repetitions, err := GetWppMsgRepetitions(rt.RP, contactID, msgWpp) + if err != nil { + return nil, errors.Wrap(err, "error looking up msg repetitions") + } + if repetitions >= 20 { + m.Status = MsgStatusFailed + m.FailedReason = MsgFailedLooping + + logrus.WithFields(logrus.Fields{"contact_id": contactID, "text": msgWpp.Text(), "repetitions": repetitions}).Error("too many repetitions, failing message") + } } // if we have a session, set fields on the message from that @@ -573,7 +599,7 @@ func newOutgoingMsgWpp(rt *runtime.Runtime, org *Org, channel *Channel, contactI } } - if len(msgWpp.QuickReplies()) > 0 || len(msgWpp.ListMessage().ListItems) > 0 || msgWpp.Topic() != flows.NilMsgTopic || msgWpp.Text() != "" || msgWpp.Footer() != "" || msgWpp.HeaderType() != "" || msgWpp.InteractionType() != "" { + if len(msgWpp.QuickReplies()) > 0 || len(msgWpp.ListMessage().ListItems) > 0 || msgWpp.Topic() != flows.NilMsgTopic || msgWpp.Text() != "" || msgWpp.Footer() != "" || msgWpp.HeaderType() != "" || msgWpp.InteractionType() != "" || msgWpp.Templating() != nil { metadata := make(map[string]interface{}) if msgWpp.Topic() != flows.NilMsgTopic { metadata["topic"] = string(msgWpp.Topic()) @@ -628,6 +654,10 @@ func newOutgoingMsgWpp(rt *runtime.Runtime, org *Org, channel *Channel, contactI if msgWpp.TextLanguage != "" { metadata["text_language"] = msgWpp.TextLanguage } + if msgWpp.Templating() != nil { + metadata["templating"] = msgWpp.Templating() + m.Template = msgWpp.Templating().Template().Name + } m.Metadata = null.NewMap(metadata) } @@ -1402,6 +1432,314 @@ func CreateBroadcastMessages(ctx context.Context, rt *runtime.Runtime, oa *OrgAs return msgs, nil } +type WppBroadcastTemplate struct { + UUID assets.TemplateUUID `json:"uuid" validate:"required,uuid"` + Name string `json:"name" validate:"required"` + Variables []string `json:"variables,omitempty"` +} + +type WppBroadcastMessageHeader struct { + Type string `json:"type,omitempty"` + Text string `json:"text,omitempty"` +} + +type WppBroadcastMessage struct { + Text string `json:"text,omitempty"` + Header WppBroadcastMessageHeader `json:"header,omitempty"` + Footer string `json:"footer,omitempty"` + Attachments []utils.Attachment `json:"attachments,omitempty"` + QuickReplies []string `json:"quick_replies,omitempty"` + Template WppBroadcastTemplate `json:"template,omitempty"` + InteractionType string `json:"interaction_type,omitempty"` + OrderDetails flows.OrderDetailsMessage `json:"order_details,omitempty"` + FlowMessage flows.FlowMessage `json:"flow_message,omitempty"` + ListMessage flows.ListMessage `json:"list_message,omitempty"` + CTAMessage flows.CTAMessage `json:"cta_message,omitempty"` +} + +type WppBroadcast struct { + b struct { + BroadcastID BroadcastID `json:"broadcast_id,omitempty" db:"id"` + URNs []urns.URN `json:"urns,omitempty"` + ContactIDs []ContactID `json:"contact_ids,omitempty"` + GroupIDs []GroupID `json:"group_ids,omitempty"` + OrgID OrgID `json:"org_id" db:"org_id"` + ParentID BroadcastID `json:"parent_id,omitempty" db:"parent_id"` + Msg WppBroadcastMessage `json:"msg"` + } +} + +func (b *WppBroadcast) ID() BroadcastID { return b.b.BroadcastID } +func (b *WppBroadcast) OrgID() OrgID { return b.b.OrgID } +func (b *WppBroadcast) ContactIDs() []ContactID { return b.b.ContactIDs } +func (b *WppBroadcast) GroupIDs() []GroupID { return b.b.GroupIDs } +func (b *WppBroadcast) URNs() []urns.URN { return b.b.URNs } +func (b *WppBroadcast) Msg() WppBroadcastMessage { return b.b.Msg } + +func (b *WppBroadcast) MarshalJSON() ([]byte, error) { return json.Marshal(b.b) } +func (b *WppBroadcast) UnmarshalJSON(data []byte) error { return json.Unmarshal(data, &b.b) } + +func NewWppBroadcast(orgID OrgID, id BroadcastID, msg WppBroadcastMessage, urns []urns.URN, contactIDs []ContactID, groupIDs []GroupID) *WppBroadcast { + bcast := &WppBroadcast{} + bcast.b.OrgID = orgID + bcast.b.BroadcastID = id + bcast.b.Msg = msg + bcast.b.URNs = urns + bcast.b.ContactIDs = contactIDs + bcast.b.GroupIDs = groupIDs + + return bcast +} + +func (b *WppBroadcast) CreateBatch(contactIDs []ContactID) *WppBroadcastBatch { + batch := &WppBroadcastBatch{} + batch.b.BroadcastID = b.b.BroadcastID + batch.b.Msg = b.b.Msg + batch.b.OrgID = b.b.OrgID + batch.b.ContactIDs = contactIDs + return batch +} + +type WppBroadcastBatch struct { + b struct { + BroadcastID BroadcastID `json:"broadcast_id,omitempty"` + Msg WppBroadcastMessage `json:"msg"` + URNs map[ContactID]urns.URN `json:"urns,omitempty"` + ContactIDs []ContactID `json:"contact_ids,omitempty"` + IsLast bool `json:"is_last"` + OrgID OrgID `json:"org_id"` + } +} + +func (b *WppBroadcastBatch) BroadcastID() BroadcastID { return b.b.BroadcastID } +func (b *WppBroadcastBatch) ContactIDs() []ContactID { return b.b.ContactIDs } +func (b *WppBroadcastBatch) URNs() map[ContactID]urns.URN { return b.b.URNs } +func (b *WppBroadcastBatch) SetURNs(urns map[ContactID]urns.URN) { b.b.URNs = urns } +func (b *WppBroadcastBatch) OrgID() OrgID { return b.b.OrgID } +func (b *WppBroadcastBatch) Msg() WppBroadcastMessage { return b.b.Msg } + +func (b *WppBroadcastBatch) IsLast() bool { return b.b.IsLast } +func (b *WppBroadcastBatch) SetIsLast(last bool) { b.b.IsLast = last } + +func (b *WppBroadcastBatch) MarshalJSON() ([]byte, error) { return json.Marshal(b.b) } +func (b *WppBroadcastBatch) UnmarshalJSON(data []byte) error { return json.Unmarshal(data, &b.b) } + +func CreateWppBroadcastMessages(ctx context.Context, rt *runtime.Runtime, oa *OrgAssets, bcast *WppBroadcastBatch) ([]*Msg, error) { + repeatedContacts := make(map[ContactID]bool) + broadcastURNs := bcast.URNs() + + // build our list of contact ids + contactIDs := bcast.ContactIDs() + + // build a map of the contacts that are present both in our URN list and our contact id list + if broadcastURNs != nil { + for _, id := range contactIDs { + _, found := broadcastURNs[id] + if found { + repeatedContacts[id] = true + } + } + + // if we have URN we need to send to, add those contacts as well if not already repeated + for id := range broadcastURNs { + if !repeatedContacts[id] { + contactIDs = append(contactIDs, id) + } + } + } + + // load all our contacts + contacts, err := LoadContacts(ctx, rt.DB, oa, contactIDs) + if err != nil { + return nil, errors.Wrapf(err, "error loading contacts for broadcast") + } + + channels := oa.SessionAssets().Channels() + + // for each contact, build our message + msgs := make([]*Msg, 0, len(contacts)) + + // utility method to build up our message + buildMessage := func(c *Contact, forceURN urns.URN) (*Msg, error) { + if c.Status() != ContactStatusActive { + return nil, nil + } + + contact, err := c.FlowContact(oa) + if err != nil { + return nil, errors.Wrapf(err, "error creating flow contact") + } + + urn := urns.NilURN + var channel *Channel + + // we are forcing to send to a non-preferred URN, find the channel + if forceURN != urns.NilURN { + for _, u := range contact.URNs() { + if u.URN().Identity() == forceURN.Identity() { + c := channels.GetForURN(u, assets.ChannelRoleSend) + if c == nil { + return nil, nil + } + urn = u.URN() + channel = oa.ChannelByUUID(c.UUID()) + break + } + } + } else { + // no forced URN, find the first URN we can send to + for _, u := range contact.URNs() { + c := channels.GetForURN(u, assets.ChannelRoleSend) + if c != nil { + urn = u.URN() + channel = oa.ChannelByUUID(c.UUID()) + break + } + } + } + + // no urn and channel? move on + if channel == nil { + return nil, nil + } + + // evaluate our message fields + text := bcast.Msg().Text + attachments := bcast.Msg().Attachments + quickReplies := bcast.Msg().QuickReplies + headerType := bcast.Msg().Header.Type + headerText := bcast.Msg().Header.Text + footerText := bcast.Msg().Footer + var templating *flows.MsgTemplating = nil + templateVariables := bcast.Msg().Template.Variables + + ctaMessage := bcast.Msg().CTAMessage + listMessage := bcast.Msg().ListMessage + flowMessage := bcast.Msg().FlowMessage + orderDetails := bcast.Msg().OrderDetails + + // build up the minimum viable context for evaluation + evaluationCtx := types.NewXObject(map[string]types.XValue{ + "contact": flows.Context(oa.Env(), contact), + "fields": flows.Context(oa.Env(), contact.Fields()), + "globals": flows.Context(oa.Env(), oa.SessionAssets().Globals()), + "urns": flows.ContextFunc(oa.Env(), contact.URNs().MapContext), + }) + + // evaluate our text + text, _ = excellent.EvaluateTemplate(oa.Env(), evaluationCtx, text, nil) + + // evaluate our header text + headerText, _ = excellent.EvaluateTemplate(oa.Env(), evaluationCtx, headerText, nil) + + // evaluate our footer text + footerText, _ = excellent.EvaluateTemplate(oa.Env(), evaluationCtx, footerText, nil) + + // evaluate our quick replies + for i, qr := range quickReplies { + quickReplies[i], _ = excellent.EvaluateTemplate(oa.Env(), evaluationCtx, qr, nil) + } + + // evaluate our template + if bcast.Msg().Template.UUID != "" { + // load our template + var templateMatch assets.Template = nil + for _, t := range oa.templates { + if t.UUID() == bcast.Msg().Template.UUID { + templateMatch = t + break + } + } + if templateMatch == nil { + return nil, errors.Errorf("template not found: %s", bcast.Msg().Template.UUID) + } + + // looks for a translation in these locales + locales := []envs.Locale{ + contact.Locale(oa.Env()), + oa.Env().DefaultLocale(), + } + translation := oa.SessionAssets().Templates().FindTranslation(bcast.Msg().Template.UUID, channel.ChannelReference(), locales) + if translation != nil { + // evaluate our variables + evaluatedVariables := make([]string, len(templateVariables)) + for i, variable := range templateVariables { + sub, err := excellent.EvaluateTemplate(oa.Env(), evaluationCtx, variable, nil) + if err != nil { + return nil, errors.Wrapf(err, "failed to evaluate template variable") + } + evaluatedVariables[i] = sub + } + + text = translation.Substitute(evaluatedVariables) + var templateReference = assets.NewTemplateReference(bcast.Msg().Template.UUID, bcast.Msg().Template.Name) + templating = flows.NewMsgTemplating(templateReference, translation.Language(), translation.Country(), evaluatedVariables, translation.Namespace()) + } + } + + // don't do anything if we have no text or attachments + if text == "" && len(attachments) == 0 { + return nil, nil + } + + // create our outgoing message + out := flows.NewMsgWppOut(urn, channel.ChannelReference(), bcast.Msg().InteractionType, headerType, headerText, text, footerText, ctaMessage, listMessage, flowMessage, orderDetails, attachments, quickReplies, templating, flows.NilMsgTopic) + msg, err := NewOutgoingWppBroadcastMsg(rt, oa.Org(), channel, c.ID(), out, time.Now(), bcast.BroadcastID()) + if err != nil { + return nil, errors.Wrapf(err, "error creating outgoing message") + } + + return msg, nil + } + + // run through all our contacts to create our messages + for _, c := range contacts { + // use the preferred URN if present + urn := broadcastURNs[c.ID()] + msg, err := buildMessage(c, urn) + if err != nil { + return nil, errors.Wrapf(err, "error creating broadcast message") + } + if msg != nil { + msgs = append(msgs, msg) + } + + // if this is a contact that will receive two messages, calculate that one as well + if repeatedContacts[c.ID()] { + m2, err := buildMessage(c, urns.NilURN) + if err != nil { + return nil, errors.Wrapf(err, "error creating broadcast message") + } + + // add this message if it isn't a duplicate + if m2 != nil && m2.URN() != msg.URN() { + msgs = append(msgs, m2) + } + } + } + + // allocate a topup for these message if org uses topups + topup, err := AllocateTopups(ctx, rt.DB, rt.RP, oa.Org(), len(msgs)) + if err != nil { + return nil, errors.Wrapf(err, "error allocating topup for broadcast messages") + } + + // if we have an active topup, assign it to our messages + if topup != NilTopupID { + for _, m := range msgs { + m.SetTopup(topup) + } + } + + // insert them in a single request + err = InsertMessages(ctx, rt.DB, msgs) + if err != nil { + return nil, errors.Wrapf(err, "error inserting broadcast messages") + } + + return msgs, nil +} + const updateMsgForResendingSQL = ` UPDATE msgs_msg m diff --git a/core/queue/queue.go b/core/queue/queue.go index 0cc5e1b86..b38a6b90e 100644 --- a/core/queue/queue.go +++ b/core/queue/queue.go @@ -50,6 +50,12 @@ const ( // SendBroadcastBatch is our type for sending a broadcast batch SendBroadcastBatch = "send_broadcast_batch" + // SendTemplateBroadcast is our type for sending a template broadcast + SendWppBroadcast = "send_whatsapp_broadcast" + + // SendTemplateBroadcastBatch is our type for sending a template broadcast batch + SendWppBroadcastBatch = "send_whatsapp_broadcast_batch" + // HandleContactEvent is our task for event handling HandleContactEvent = "handle_contact_event" diff --git a/core/tasks/msgs/send_wpp_broadcast.go b/core/tasks/msgs/send_wpp_broadcast.go new file mode 100644 index 000000000..2ac362d54 --- /dev/null +++ b/core/tasks/msgs/send_wpp_broadcast.go @@ -0,0 +1,171 @@ +package msgs + +import ( + "context" + "encoding/json" + "time" + + "github.com/nyaruka/gocommon/urns" + "github.com/nyaruka/mailroom" + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/core/msgio" + "github.com/nyaruka/mailroom/core/queue" + "github.com/nyaruka/mailroom/runtime" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +func init() { + mailroom.AddTaskFunction(queue.SendWppBroadcast, handleSendWppBroadcast) + mailroom.AddTaskFunction(queue.SendWppBroadcastBatch, handleSendWppBroadcastBatch) +} + +func handleSendWppBroadcast(ctx context.Context, rt *runtime.Runtime, task *queue.Task) error { + ctx, cancel := context.WithTimeout(ctx, time.Minute*60) + defer cancel() + + // decode our task body + if task.Type != queue.SendWppBroadcast { + return errors.Errorf("unknown event type passed to send worker: %s", task.Type) + } + broadcast := &models.WppBroadcast{} + err := json.Unmarshal(task.Task, broadcast) + if err != nil { + return errors.Wrapf(err, "error unmarshalling broadcast: %s", string(task.Task)) + } + + return CreateWppBroadcastBatches(ctx, rt, broadcast) +} + +func CreateWppBroadcastBatches(ctx context.Context, rt *runtime.Runtime, bcast *models.WppBroadcast) error { + // we are building a set of contact ids, start with the explicit ones + contactIDs := make(map[models.ContactID]bool) + for _, id := range bcast.ContactIDs() { + contactIDs[id] = true + } + + groupContactIDs, err := models.ContactIDsForGroupIDs(ctx, rt.DB, bcast.GroupIDs()) + if err != nil { + return errors.Wrapf(err, "error getting contact ids for group ids") + } + + for _, id := range groupContactIDs { + contactIDs[id] = true + } + + oa, err := models.GetOrgAssets(ctx, rt, bcast.OrgID()) + if err != nil { + return errors.Wrapf(err, "error getting org assets") + } + + // get the contact ids for our URNs + urnMap, err := models.GetOrCreateContactIDsFromURNs(ctx, rt.DB, oa, bcast.URNs()) + if err != nil { + return errors.Wrapf(err, "error getting contact ids for urns") + } + + urnContacts := make(map[models.ContactID]urns.URN) + repeatedContacts := make(map[models.ContactID]urns.URN) + + q := queue.BatchQueue + + // two or fewer contacts? queue to our handler queue for sending + if len(contactIDs) <= 2 { + q = queue.HandlerQueue + } + + // we want to remove contacts that are also present in URN sends, these will be a special case in our last batch + for u, id := range urnMap { + if contactIDs[id] { + repeatedContacts[id] = u + delete(contactIDs, id) + } + urnContacts[id] = u + } + + rc := rt.RP.Get() + defer rc.Close() + + contacts := make([]models.ContactID, 0, 100) + + // utility functions for queueing the current set of contacts + queueBatch := func(isLast bool) { + // if this is our last batch include those contacts that overlap with our urns + if isLast { + for id := range repeatedContacts { + contacts = append(contacts, id) + } + } + + batch := bcast.CreateBatch(contacts) + + // also set our URNs + if isLast { + batch.SetIsLast(true) + batch.SetURNs(urnContacts) + } + + err = queue.AddTask(rc, q, queue.SendWppBroadcastBatch, int(bcast.OrgID()), batch, queue.DefaultPriority) + if err != nil { + logrus.WithError(err).Error("error while queuing wpp broadcast batch") + } + contacts = make([]models.ContactID, 0, 100) + } + + // build up batches of contacts to start + for c := range contactIDs { + if len(contacts) == startBatchSize { + queueBatch(false) + } + contacts = append(contacts, c) + } + + // queue our last batch + queueBatch(true) + + return nil +} + +func handleSendWppBroadcastBatch(ctx context.Context, rt *runtime.Runtime, task *queue.Task) error { + ctx, cancel := context.WithTimeout(ctx, time.Minute*60) + defer cancel() + + // decode our task body + if task.Type != queue.SendWppBroadcastBatch { + return errors.Errorf("unknown event type passed to send worker: %s", task.Type) + } + broadcast := &models.WppBroadcastBatch{} + err := json.Unmarshal(task.Task, broadcast) + if err != nil { + return errors.Wrapf(err, "error unmarshalling broadcast: %s", string(task.Task)) + } + + // try to send the batch + return SendWppBroadcastBatch(ctx, rt, broadcast) +} + +func SendWppBroadcastBatch(ctx context.Context, rt *runtime.Runtime, bcast *models.WppBroadcastBatch) error { + // always set our broadcast as sent if it is our last + defer func() { + if bcast.IsLast() { + err := models.MarkBroadcastSent(ctx, rt.DB, bcast.BroadcastID()) + if err != nil { + logrus.WithError(err).Error("error marking broadcast as sent") + } + } + }() + + oa, err := models.GetOrgAssets(ctx, rt, bcast.OrgID()) + if err != nil { + return errors.Wrapf(err, "error getting org assets") + } + + // create this batch of messages + msgs, err := models.CreateWppBroadcastMessages(ctx, rt, oa, bcast) + if err != nil { + return errors.Wrapf(err, "error creating broadcast messages") + } + + msgio.SendMessages(ctx, rt, rt.DB, nil, msgs) + return nil +}