From 34e10e51d2b0d28a9af46aa9fbebc09f5f367aa6 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 14 Jan 2021 10:59:23 -0500 Subject: [PATCH 1/2] Refactor of FCM client usage so it's a property of ther mailroom instance and not a global --- core/handlers/base_test.go | 6 ++-- core/hooks/commit_added_labels.go | 3 +- core/hooks/commit_field_changes.go | 8 ++++-- core/hooks/commit_group_changes.go | 3 +- core/hooks/commit_ivr.go | 3 +- core/hooks/commit_language_changes.go | 8 ++++-- core/hooks/commit_messages.go | 3 +- core/hooks/commit_name_changes.go | 8 ++++-- core/hooks/commit_status_changes.go | 3 +- core/hooks/commit_urn_changes.go | 3 +- core/hooks/contact_last_seen.go | 3 +- core/hooks/contact_modified.go | 3 +- core/hooks/insert_airtime_transfers.go | 3 +- core/hooks/insert_http_logs.go | 3 +- core/hooks/insert_start.go | 3 +- core/hooks/insert_tickets.go | 3 +- core/hooks/insert_webhook_event.go | 3 +- core/hooks/insert_webhook_result.go | 3 +- core/hooks/send_messages.go | 5 ++-- core/hooks/start_broadcasts.go | 3 +- core/hooks/start_start.go | 3 +- core/hooks/unsubscribe_resthook.go | 3 +- core/hooks/update_campaign_events.go | 8 ++++-- core/ivr/ivr.go | 9 +++--- core/models/events.go | 21 +++++++------- core/models/imports.go | 2 +- core/models/runs.go | 9 +++--- core/msgio/android.go | 32 ++------------------- core/msgio/android_test.go | 4 +-- core/msgio/send.go | 5 ++-- core/msgio/send_test.go | 4 +-- core/runner/runner.go | 29 ++++++++++--------- core/runner/runner_test.go | 8 +++--- core/tasks/broadcasts/worker.go | 7 +++-- core/tasks/broadcasts/worker_test.go | 4 +-- core/tasks/campaigns/fire_campaign_event.go | 2 +- core/tasks/handler/cron_test.go | 2 +- core/tasks/handler/handler_test.go | 14 ++++----- core/tasks/handler/worker.go | 31 ++++++++++---------- core/tasks/starts/worker.go | 2 +- core/tasks/starts/worker_test.go | 2 +- mailroom.go | 14 +++++++-- services/tickets/mailgun/web.go | 2 +- services/tickets/rocketchat/web.go | 5 ++-- services/tickets/utils.go | 5 ++-- services/tickets/utils_test.go | 4 +-- services/tickets/zendesk/web.go | 2 +- web/contact/contact.go | 4 +-- web/contact/search_test.go | 2 +- web/ivr/ivr.go | 8 +++--- web/ivr/ivr_test.go | 4 +-- web/org/metrics_test.go | 2 +- web/server.go | 8 ++++-- web/simulation/simulation_test.go | 2 +- web/surveyor/surveyor.go | 4 +-- web/surveyor/surveyor_test.go | 2 +- web/testing.go | 2 +- 57 files changed, 187 insertions(+), 164 deletions(-) diff --git a/core/handlers/base_test.go b/core/handlers/base_test.go index 4dc1c5de3..0d38eb8c4 100644 --- a/core/handlers/base_test.go +++ b/core/handlers/base_test.go @@ -199,7 +199,7 @@ func RunTestCases(t *testing.T, tcs []TestCase) { return triggers.NewBuilder(oa.Env(), testFlow.Reference(), contact).Msg(msg).Build() } - _, err = runner.StartFlow(ctx, db, rp, oa, flow.(*models.Flow), []models.ContactID{models.CathyID, models.BobID, models.GeorgeID, models.AlexandriaID}, options) + _, err = runner.StartFlow(ctx, db, rp, nil, oa, flow.(*models.Flow), []models.ContactID{models.CathyID, models.BobID, models.GeorgeID, models.AlexandriaID}, options) assert.NoError(t, err) results := make(map[models.ContactID]modifyResult) @@ -240,7 +240,7 @@ func RunTestCases(t *testing.T, tcs []TestCase) { assert.NoError(t, err) } - err = models.ApplyEventPreCommitHooks(ctx, tx, rp, oa, scenes) + err = models.ApplyEventPreCommitHooks(ctx, tx, rp, nil, oa, scenes) assert.NoError(t, err) err = tx.Commit() @@ -249,7 +249,7 @@ func RunTestCases(t *testing.T, tcs []TestCase) { tx, err = db.BeginTxx(ctx, nil) assert.NoError(t, err) - err = models.ApplyEventPostCommitHooks(ctx, tx, rp, oa, scenes) + err = models.ApplyEventPostCommitHooks(ctx, tx, rp, nil, oa, scenes) assert.NoError(t, err) err = tx.Commit() diff --git a/core/hooks/commit_added_labels.go b/core/hooks/commit_added_labels.go index bd9a6cbe0..1ab86fe3c 100644 --- a/core/hooks/commit_added_labels.go +++ b/core/hooks/commit_added_labels.go @@ -6,6 +6,7 @@ import ( "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" ) @@ -16,7 +17,7 @@ var CommitAddedLabelsHook models.EventCommitHook = &commitAddedLabelsHook{} type commitAddedLabelsHook struct{} // Apply applies our input labels added, committing them in a single batch -func (h *commitAddedLabelsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *commitAddedLabelsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { // build our list of msg label adds, we dedupe these so we never double add in the same transaction seen := make(map[string]bool) adds := make([]*models.MsgLabelAdd, 0, len(scenes)) diff --git a/core/hooks/commit_field_changes.go b/core/hooks/commit_field_changes.go index 1d9dde335..0038f9c83 100644 --- a/core/hooks/commit_field_changes.go +++ b/core/hooks/commit_field_changes.go @@ -4,12 +4,14 @@ import ( "context" "encoding/json" - "github.com/gomodule/redigo/redis" - "github.com/jmoiron/sqlx" "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/flows" "github.com/nyaruka/goflow/flows/events" "github.com/nyaruka/mailroom/core/models" + + "github.com/edganiukov/fcm" + "github.com/gomodule/redigo/redis" + "github.com/jmoiron/sqlx" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -20,7 +22,7 @@ var CommitFieldChangesHook models.EventCommitHook = &commitFieldChangesHook{} type commitFieldChangesHook struct{} // Apply squashes and writes all the field updates for the contacts -func (h *commitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *commitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { // our list of updates fieldUpdates := make([]interface{}, 0, len(scenes)) fieldDeletes := make(map[assets.FieldUUID][]interface{}) diff --git a/core/hooks/commit_group_changes.go b/core/hooks/commit_group_changes.go index 26324d00a..fb1c3764e 100644 --- a/core/hooks/commit_group_changes.go +++ b/core/hooks/commit_group_changes.go @@ -5,6 +5,7 @@ import ( "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -16,7 +17,7 @@ var CommitGroupChangesHook models.EventCommitHook = &commitGroupChangesHook{} type commitGroupChangesHook struct{} // Apply squashes and adds or removes all our contact groups -func (h *commitGroupChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *commitGroupChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { // build up our list of all adds and removes adds := make([]*models.GroupAdd, 0, len(scenes)) removes := make([]*models.GroupRemove, 0, len(scenes)) diff --git a/core/hooks/commit_ivr.go b/core/hooks/commit_ivr.go index e8916e48c..116d502c8 100644 --- a/core/hooks/commit_ivr.go +++ b/core/hooks/commit_ivr.go @@ -5,6 +5,7 @@ import ( "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -16,7 +17,7 @@ var CommitIVRHook models.EventCommitHook = &commitIVRHook{} type commitIVRHook struct{} // Apply takes care of inserting all the messages in the passed in scene assigning topups to them as needed. -func (h *commitIVRHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *commitIVRHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { msgs := make([]*models.Msg, 0, len(scenes)) for _, s := range scenes { for _, m := range s { diff --git a/core/hooks/commit_language_changes.go b/core/hooks/commit_language_changes.go index debc37efe..8a2bcea02 100644 --- a/core/hooks/commit_language_changes.go +++ b/core/hooks/commit_language_changes.go @@ -3,10 +3,12 @@ package hooks import ( "context" - "github.com/gomodule/redigo/redis" - "github.com/jmoiron/sqlx" "github.com/nyaruka/goflow/flows/events" "github.com/nyaruka/mailroom/core/models" + + "github.com/edganiukov/fcm" + "github.com/gomodule/redigo/redis" + "github.com/jmoiron/sqlx" "github.com/nyaruka/null" ) @@ -16,7 +18,7 @@ var CommitLanguageChangesHook models.EventCommitHook = &commitLanguageChangesHoo type commitLanguageChangesHook struct{} // Apply applies our contact language change before our commit -func (h *commitLanguageChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *commitLanguageChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { // build up our list of pairs of contact id and language name updates := make([]interface{}, 0, len(scenes)) for s, e := range scenes { diff --git a/core/hooks/commit_messages.go b/core/hooks/commit_messages.go index 8405c45a4..b12a1716f 100644 --- a/core/hooks/commit_messages.go +++ b/core/hooks/commit_messages.go @@ -5,6 +5,7 @@ import ( "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -16,7 +17,7 @@ var CommitMessagesHook models.EventCommitHook = &commitMessagesHook{} type commitMessagesHook struct{} // Apply takes care of inserting all the messages in the passed in scene assigning topups to them as needed. -func (h *commitMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *commitMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { msgs := make([]*models.Msg, 0, len(scenes)) for _, s := range scenes { for _, m := range s { diff --git a/core/hooks/commit_name_changes.go b/core/hooks/commit_name_changes.go index 876392a88..c7f131bac 100644 --- a/core/hooks/commit_name_changes.go +++ b/core/hooks/commit_name_changes.go @@ -4,10 +4,12 @@ import ( "context" "fmt" - "github.com/gomodule/redigo/redis" - "github.com/jmoiron/sqlx" "github.com/nyaruka/goflow/flows/events" "github.com/nyaruka/mailroom/core/models" + + "github.com/edganiukov/fcm" + "github.com/gomodule/redigo/redis" + "github.com/jmoiron/sqlx" "github.com/nyaruka/null" ) @@ -17,7 +19,7 @@ var CommitNameChangesHook models.EventCommitHook = &commitNameChangesHook{} type commitNameChangesHook struct{} // Apply commits our contact name changes as a bulk update for the passed in map of scene -func (h *commitNameChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *commitNameChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { // build up our list of pairs of contact id and contact name updates := make([]interface{}, 0, len(scenes)) for s, e := range scenes { diff --git a/core/hooks/commit_status_changes.go b/core/hooks/commit_status_changes.go index a55186a04..e24f236a5 100644 --- a/core/hooks/commit_status_changes.go +++ b/core/hooks/commit_status_changes.go @@ -6,6 +6,7 @@ import ( "github.com/nyaruka/goflow/flows/events" "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -17,7 +18,7 @@ var CommitStatusChangesHook models.EventCommitHook = &commitStatusChangesHook{} type commitStatusChangesHook struct{} // Apply commits our contact status change -func (h *commitStatusChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *commitStatusChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { statusChanges := make([]*models.ContactStatusChange, 0, len(scenes)) for scene, es := range scenes { diff --git a/core/hooks/commit_urn_changes.go b/core/hooks/commit_urn_changes.go index 55a24d863..1c0b6f4f0 100644 --- a/core/hooks/commit_urn_changes.go +++ b/core/hooks/commit_urn_changes.go @@ -5,6 +5,7 @@ import ( "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -16,7 +17,7 @@ var CommitURNChangesHook models.EventCommitHook = &commitURNChangesHook{} type commitURNChangesHook struct{} // Apply adds all our URNS in a batch -func (h *commitURNChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *commitURNChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { // gather all our urn changes, we only care about the last change for each scene changes := make([]*models.ContactURNsChanged, 0, len(scenes)) for _, sessionChanges := range scenes { diff --git a/core/hooks/contact_last_seen.go b/core/hooks/contact_last_seen.go index e2d923418..2bd47f9db 100644 --- a/core/hooks/contact_last_seen.go +++ b/core/hooks/contact_last_seen.go @@ -6,6 +6,7 @@ import ( "github.com/nyaruka/goflow/flows" "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -17,7 +18,7 @@ var ContactLastSeenHook models.EventCommitHook = &contactLastSeenHook{} type contactLastSeenHook struct{} // Apply squashes and updates modified_on on all the contacts passed in -func (h *contactLastSeenHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *contactLastSeenHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { for scene, evts := range scenes { lastEvent := evts[len(evts)-1].(flows.Event) diff --git a/core/hooks/contact_modified.go b/core/hooks/contact_modified.go index 69cb05556..0c12d6af7 100644 --- a/core/hooks/contact_modified.go +++ b/core/hooks/contact_modified.go @@ -5,6 +5,7 @@ import ( "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -16,7 +17,7 @@ var ContactModifiedHook models.EventCommitHook = &contactModifiedHook{} type contactModifiedHook struct{} // Apply squashes and updates modified_on on all the contacts passed in -func (h *contactModifiedHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *contactModifiedHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { // our lists of contact ids contactIDs := make([]models.ContactID, 0, len(scenes)) diff --git a/core/hooks/insert_airtime_transfers.go b/core/hooks/insert_airtime_transfers.go index d6c72fdf7..72c38118f 100644 --- a/core/hooks/insert_airtime_transfers.go +++ b/core/hooks/insert_airtime_transfers.go @@ -5,6 +5,7 @@ import ( "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -16,7 +17,7 @@ var InsertAirtimeTransfersHook models.EventCommitHook = &insertAirtimeTransfersH type insertAirtimeTransfersHook struct{} // Apply inserts all the airtime transfers that were created -func (h *insertAirtimeTransfersHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *insertAirtimeTransfersHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { // gather all our transfers transfers := make([]*models.AirtimeTransfer, 0, len(scenes)) diff --git a/core/hooks/insert_http_logs.go b/core/hooks/insert_http_logs.go index eeace18e1..2074714b5 100644 --- a/core/hooks/insert_http_logs.go +++ b/core/hooks/insert_http_logs.go @@ -5,6 +5,7 @@ import ( "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -16,7 +17,7 @@ var InsertHTTPLogsHook models.EventCommitHook = &insertHTTPLogsHook{} type insertHTTPLogsHook struct{} // Apply inserts all the classifier logs that were created -func (h *insertHTTPLogsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *insertHTTPLogsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { // gather all our logs logs := make([]*models.HTTPLog, 0, len(scenes)) for _, ls := range scenes { diff --git a/core/hooks/insert_start.go b/core/hooks/insert_start.go index 6982b7f9a..164859e18 100644 --- a/core/hooks/insert_start.go +++ b/core/hooks/insert_start.go @@ -7,6 +7,7 @@ import ( "github.com/nyaruka/goflow/flows/events" "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -18,7 +19,7 @@ var InsertStartHook models.EventCommitHook = &insertStartHook{} type insertStartHook struct{} // Apply inserts our starts -func (h *insertStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *insertStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { rc := rp.Get() defer rc.Close() diff --git a/core/hooks/insert_tickets.go b/core/hooks/insert_tickets.go index b6bf164bf..924514cf6 100644 --- a/core/hooks/insert_tickets.go +++ b/core/hooks/insert_tickets.go @@ -5,6 +5,7 @@ import ( "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -16,7 +17,7 @@ var InsertTicketsHook models.EventCommitHook = &insertTicketsHook{} type insertTicketsHook struct{} // Apply inserts all the airtime transfers that were created -func (h *insertTicketsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *insertTicketsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { // gather all our tickets tickets := make([]*models.Ticket, 0, len(scenes)) diff --git a/core/hooks/insert_webhook_event.go b/core/hooks/insert_webhook_event.go index 9a980bdd6..b7f06e62a 100644 --- a/core/hooks/insert_webhook_event.go +++ b/core/hooks/insert_webhook_event.go @@ -5,6 +5,7 @@ import ( "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -16,7 +17,7 @@ var InsertWebhookEventHook models.EventCommitHook = &insertWebhookEventHook{} type insertWebhookEventHook struct{} // Apply inserts all the webook events that were created -func (h *insertWebhookEventHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *insertWebhookEventHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { events := make([]*models.WebhookEvent, 0, len(scenes)) for _, rs := range scenes { for _, r := range rs { diff --git a/core/hooks/insert_webhook_result.go b/core/hooks/insert_webhook_result.go index bc7f1a583..a757ae86d 100644 --- a/core/hooks/insert_webhook_result.go +++ b/core/hooks/insert_webhook_result.go @@ -5,6 +5,7 @@ import ( "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -16,7 +17,7 @@ var InsertWebhookResultHook models.EventCommitHook = &insertWebhookResultHook{} type insertWebhookResultHook struct{} // Apply inserts all the webook results that were created -func (h *insertWebhookResultHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *insertWebhookResultHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { // gather all our results results := make([]*models.WebhookResult, 0, len(scenes)) for _, rs := range scenes { diff --git a/core/hooks/send_messages.go b/core/hooks/send_messages.go index eb9a2fc94..7f940c682 100644 --- a/core/hooks/send_messages.go +++ b/core/hooks/send_messages.go @@ -6,6 +6,7 @@ import ( "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/core/msgio" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" ) @@ -16,7 +17,7 @@ var SendMessagesHook models.EventCommitHook = &sendMessagesHook{} type sendMessagesHook struct{} // Apply sends all non-android messages to courier -func (h *sendMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *sendMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { msgs := make([]*models.Msg, 0, 1) // for each scene gather all our messages @@ -35,6 +36,6 @@ func (h *sendMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Poo msgs = append(msgs, sceneMsgs...) } - msgio.SendMessages(ctx, tx, rp, msgs) + msgio.SendMessages(ctx, tx, rp, fc, msgs) return nil } diff --git a/core/hooks/start_broadcasts.go b/core/hooks/start_broadcasts.go index 6f267d160..7e7b9dc34 100644 --- a/core/hooks/start_broadcasts.go +++ b/core/hooks/start_broadcasts.go @@ -7,6 +7,7 @@ import ( "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/core/queue" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -18,7 +19,7 @@ var StartBroadcastsHook models.EventCommitHook = &startBroadcastsHook{} type startBroadcastsHook struct{} // Apply queues up our broadcasts for sending -func (h *startBroadcastsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *startBroadcastsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { rc := rp.Get() defer rc.Close() diff --git a/core/hooks/start_start.go b/core/hooks/start_start.go index 60278ed78..369f5577f 100644 --- a/core/hooks/start_start.go +++ b/core/hooks/start_start.go @@ -6,6 +6,7 @@ import ( "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/core/queue" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -17,7 +18,7 @@ var StartStartHook models.EventCommitHook = &startStartHook{} type startStartHook struct{} // Apply queues up our flow starts -func (h *startStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *startStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { rc := rp.Get() defer rc.Close() diff --git a/core/hooks/unsubscribe_resthook.go b/core/hooks/unsubscribe_resthook.go index fddfd0f06..e782cfd5d 100644 --- a/core/hooks/unsubscribe_resthook.go +++ b/core/hooks/unsubscribe_resthook.go @@ -5,6 +5,7 @@ import ( "github.com/nyaruka/mailroom/core/models" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -16,7 +17,7 @@ var UnsubscribeResthookHook models.EventCommitHook = &unsubscribeResthookHook{} type unsubscribeResthookHook struct{} // Apply squashes and applies all our resthook unsubscriptions -func (h *unsubscribeResthookHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scene map[*models.Scene][]interface{}) error { +func (h *unsubscribeResthookHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scene map[*models.Scene][]interface{}) error { // gather all our unsubscribes unsubs := make([]*models.ResthookUnsubscribe, 0, len(scene)) for _, us := range scene { diff --git a/core/hooks/update_campaign_events.go b/core/hooks/update_campaign_events.go index 26de31c2c..6742cb92f 100644 --- a/core/hooks/update_campaign_events.go +++ b/core/hooks/update_campaign_events.go @@ -4,10 +4,12 @@ import ( "context" "time" - "github.com/gomodule/redigo/redis" - "github.com/jmoiron/sqlx" "github.com/nyaruka/goflow/flows/events" "github.com/nyaruka/mailroom/core/models" + + "github.com/edganiukov/fcm" + "github.com/gomodule/redigo/redis" + "github.com/jmoiron/sqlx" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -18,7 +20,7 @@ var UpdateCampaignEventsHook models.EventCommitHook = &updateCampaignEventsHook{ type updateCampaignEventsHook struct{} // Apply will update all the campaigns for the passed in scene, minimizing the number of queries to do so -func (h *updateCampaignEventsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { +func (h *updateCampaignEventsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error { // these are all the events we need to delete unfired fires for deletes := make([]*models.FireDelete, 0, 5) diff --git a/core/ivr/ivr.go b/core/ivr/ivr.go index ca4468f75..193e79064 100644 --- a/core/ivr/ivr.go +++ b/core/ivr/ivr.go @@ -9,6 +9,7 @@ import ( "strconv" "time" + "github.com/edganiukov/fcm" "github.com/nyaruka/gocommon/httpx" "github.com/nyaruka/gocommon/storage" "github.com/nyaruka/gocommon/urns" @@ -296,7 +297,7 @@ func WriteErrorResponse(ctx context.Context, db *sqlx.DB, client Client, conn *m // StartIVRFlow takes care of starting the flow in the passed in start for the passed in contact and URN func StartIVRFlow( - ctx context.Context, db *sqlx.DB, rp *redis.Pool, client Client, resumeURL string, oa *models.OrgAssets, + ctx context.Context, db *sqlx.DB, rp *redis.Pool, fc *fcm.Client, client Client, resumeURL string, oa *models.OrgAssets, channel *models.Channel, conn *models.ChannelConnection, c *models.Contact, urn urns.URN, startID models.StartID, r *http.Request, w http.ResponseWriter) error { @@ -370,7 +371,7 @@ func StartIVRFlow( } // start our flow - sessions, err := runner.StartFlowForContacts(ctx, db, rp, oa, flow, []flows.Trigger{trigger}, hook, true) + sessions, err := runner.StartFlowForContacts(ctx, db, rp, fc, oa, flow, []flows.Trigger{trigger}, hook, true) if err != nil { return errors.Wrapf(err, "error starting flow") } @@ -390,7 +391,7 @@ func StartIVRFlow( // ResumeIVRFlow takes care of resuming the flow in the passed in start for the passed in contact and URN func ResumeIVRFlow( - ctx context.Context, config *config.Config, db *sqlx.DB, rp *redis.Pool, store storage.Storage, + ctx context.Context, config *config.Config, db *sqlx.DB, rp *redis.Pool, fc *fcm.Client, store storage.Storage, resumeURL string, client Client, oa *models.OrgAssets, channel *models.Channel, conn *models.ChannelConnection, c *models.Contact, urn urns.URN, r *http.Request, w http.ResponseWriter) error { @@ -523,7 +524,7 @@ func ResumeIVRFlow( } } - session, err = runner.ResumeFlow(ctx, db, rp, oa, session, resume, hook) + session, err = runner.ResumeFlow(ctx, db, rp, fc, oa, session, resume, hook) if err != nil { return errors.Wrapf(err, "error resuming ivr flow") } diff --git a/core/models/events.go b/core/models/events.go index 572386b93..94d780620 100644 --- a/core/models/events.go +++ b/core/models/events.go @@ -3,6 +3,7 @@ package models import ( "context" + "github.com/edganiukov/fcm" "github.com/nyaruka/goflow/flows" "github.com/gomodule/redigo/redis" @@ -128,11 +129,11 @@ func ApplyPreWriteEvent(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *O // EventCommitHook defines a callback that will accept a certain type of events across session, either before or after committing type EventCommitHook interface { - Apply(context.Context, *sqlx.Tx, *redis.Pool, *OrgAssets, map[*Scene][]interface{}) error + Apply(context.Context, *sqlx.Tx, *redis.Pool, *fcm.Client, *OrgAssets, map[*Scene][]interface{}) error } // ApplyEventPreCommitHooks runs through all the pre event hooks for the passed in sessions and applies their events -func ApplyEventPreCommitHooks(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAssets, scenes []*Scene) error { +func ApplyEventPreCommitHooks(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, org *OrgAssets, scenes []*Scene) error { // gather all our hook events together across our sessions preHooks := make(map[EventCommitHook]map[*Scene][]interface{}) for _, s := range scenes { @@ -148,7 +149,7 @@ func ApplyEventPreCommitHooks(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, // now fire each of our hooks for hook, args := range preHooks { - err := hook.Apply(ctx, tx, rp, org, args) + err := hook.Apply(ctx, tx, rp, fc, org, args) if err != nil { return errors.Wrapf(err, "error applying pre commit hook: %T", hook) } @@ -158,7 +159,7 @@ func ApplyEventPreCommitHooks(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, } // ApplyEventPostCommitHooks runs through all the post event hooks for the passed in sessions and applies their events -func ApplyEventPostCommitHooks(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAssets, scenes []*Scene) error { +func ApplyEventPostCommitHooks(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, org *OrgAssets, scenes []*Scene) error { // gather all our hook events together across our sessions postHooks := make(map[EventCommitHook]map[*Scene][]interface{}) for _, s := range scenes { @@ -174,7 +175,7 @@ func ApplyEventPostCommitHooks(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, // now fire each of our hooks for hook, args := range postHooks { - err := hook.Apply(ctx, tx, rp, org, args) + err := hook.Apply(ctx, tx, rp, fc, org, args) if err != nil { return errors.Wrapf(err, "error applying post commit hook: %v", hook) } @@ -184,7 +185,7 @@ func ApplyEventPostCommitHooks(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, } // HandleAndCommitEvents takes a set of contacts and events, handles the events and applies any hooks, and commits everything -func HandleAndCommitEvents(ctx context.Context, db QueryerWithTx, rp *redis.Pool, oa *OrgAssets, contactEvents map[*flows.Contact][]flows.Event) error { +func HandleAndCommitEvents(ctx context.Context, db QueryerWithTx, rp *redis.Pool, fc *fcm.Client, oa *OrgAssets, contactEvents map[*flows.Contact][]flows.Event) error { // create scenes for each contact scenes := make([]*Scene, 0, len(contactEvents)) for contact := range contactEvents { @@ -207,7 +208,7 @@ func HandleAndCommitEvents(ctx context.Context, db QueryerWithTx, rp *redis.Pool } // gather all our pre commit events, group them by hook and apply them - err = ApplyEventPreCommitHooks(ctx, tx, rp, oa, scenes) + err = ApplyEventPreCommitHooks(ctx, tx, rp, fc, oa, scenes) if err != nil { return errors.Wrapf(err, "error applying pre commit hooks") } @@ -224,7 +225,7 @@ func HandleAndCommitEvents(ctx context.Context, db QueryerWithTx, rp *redis.Pool } // apply the post commit hooks - err = ApplyEventPostCommitHooks(ctx, tx, rp, oa, scenes) + err = ApplyEventPostCommitHooks(ctx, tx, rp, fc, oa, scenes) if err != nil { return errors.Wrapf(err, "error applying post commit hooks") } @@ -237,7 +238,7 @@ func HandleAndCommitEvents(ctx context.Context, db QueryerWithTx, rp *redis.Pool } // ApplyModifiers modifies contacts by applying modifiers and handling the resultant events -func ApplyModifiers(ctx context.Context, db QueryerWithTx, rp *redis.Pool, oa *OrgAssets, modifiersByContact map[*flows.Contact][]flows.Modifier) (map[*flows.Contact][]flows.Event, error) { +func ApplyModifiers(ctx context.Context, db QueryerWithTx, rp *redis.Pool, fc *fcm.Client, oa *OrgAssets, modifiersByContact map[*flows.Contact][]flows.Modifier) (map[*flows.Contact][]flows.Event, error) { // create an environment instance with location support env := flows.NewEnvironment(oa.Env(), oa.SessionAssets().Locations()) @@ -252,7 +253,7 @@ func ApplyModifiers(ctx context.Context, db QueryerWithTx, rp *redis.Pool, oa *O eventsByContact[contact] = events } - err := HandleAndCommitEvents(ctx, db, rp, oa, eventsByContact) + err := HandleAndCommitEvents(ctx, db, rp, fc, oa, eventsByContact) if err != nil { return nil, errors.Wrap(err, "error commiting events") } diff --git a/core/models/imports.go b/core/models/imports.go index bd91d3a66..2d979e390 100644 --- a/core/models/imports.go +++ b/core/models/imports.go @@ -113,7 +113,7 @@ func (b *ContactImportBatch) tryImport(ctx context.Context, db *sqlx.DB, orgID O } // and apply in bulk - _, err = ApplyModifiers(ctx, db, nil, oa, modifiersByContact) + _, err = ApplyModifiers(ctx, db, nil, nil, oa, modifiersByContact) if err != nil { return errors.Wrap(err, "error applying modifiers") } diff --git a/core/models/runs.go b/core/models/runs.go index 06ce92034..c115b239f 100644 --- a/core/models/runs.go +++ b/core/models/runs.go @@ -8,6 +8,7 @@ import ( "fmt" "time" + "github.com/edganiukov/fcm" "github.com/nyaruka/gocommon/uuids" "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/envs" @@ -458,7 +459,7 @@ func (s *Session) calculateTimeout(fs flows.Session, sprint flows.Sprint) { } // WriteUpdatedSession updates the session based on the state passed in from our engine session, this also takes care of applying any event hooks -func (s *Session) WriteUpdatedSession(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAssets, fs flows.Session, sprint flows.Sprint, hook SessionCommitHook) error { +func (s *Session) WriteUpdatedSession(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, org *OrgAssets, fs flows.Session, sprint flows.Sprint, hook SessionCommitHook) error { // make sure we have our seen runs if s.seenRuns == nil { return errors.Errorf("missing seen runs, cannot update session") @@ -588,7 +589,7 @@ func (s *Session) WriteUpdatedSession(ctx context.Context, tx *sqlx.Tx, rp *redi } // gather all our pre commit events, group them by hook and apply them - err = ApplyEventPreCommitHooks(ctx, tx, rp, org, []*Scene{s.scene}) + err = ApplyEventPreCommitHooks(ctx, tx, rp, fc, org, []*Scene{s.scene}) if err != nil { return errors.Wrapf(err, "error applying pre commit hook: %T", hook) } @@ -636,7 +637,7 @@ WHERE // WriteSessions writes the passed in session to our database, writes any runs that need to be created // as well as appying any events created in the session -func WriteSessions(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAssets, ss []flows.Session, sprints []flows.Sprint, hook SessionCommitHook) ([]*Session, error) { +func WriteSessions(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, org *OrgAssets, ss []flows.Session, sprints []flows.Sprint, hook SessionCommitHook) ([]*Session, error) { if len(ss) == 0 { return nil, nil } @@ -733,7 +734,7 @@ func WriteSessions(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAss } // gather all our pre commit events, group them by hook - err = ApplyEventPreCommitHooks(ctx, tx, rp, org, scenes) + err = ApplyEventPreCommitHooks(ctx, tx, rp, fc, org, scenes) if err != nil { return nil, errors.Wrapf(err, "error applying pre commit hook: %T", hook) } diff --git a/core/msgio/android.go b/core/msgio/android.go index 0917214fc..8401802d8 100644 --- a/core/msgio/android.go +++ b/core/msgio/android.go @@ -1,38 +1,17 @@ package msgio import ( - "sync" "time" - "github.com/nyaruka/mailroom/config" "github.com/nyaruka/mailroom/core/models" - "github.com/pkg/errors" "github.com/edganiukov/fcm" "github.com/sirupsen/logrus" ) -var clientInit sync.Once -var fcmClient *fcm.Client - -func init() { - clientInit.Do(func() { - if config.Mailroom.FCMKey == "" { - logrus.Error("fcm not configured, no syncing of android channels") - return - } - - var err error - fcmClient, err = fcm.NewClient(config.Mailroom.FCMKey) - if err != nil { - panic(errors.Wrap(err, "unable to create FCM client")) - } - }) -} - // SyncAndroidChannels tries to trigger syncs of the given Android channels via FCM -func SyncAndroidChannels(channels []*models.Channel) { - if fcmClient == nil { +func SyncAndroidChannels(fc *fcm.Client, channels []*models.Channel) { + if fc == nil { logrus.Warn("skipping Android sync as instance has not configured FCM") return } @@ -52,7 +31,7 @@ func SyncAndroidChannels(channels []*models.Channel) { } start := time.Now() - _, err := fcmClient.Send(sync) + _, err := fc.Send(sync) if err != nil { // log failures but continue, relayer will sync on its own @@ -62,8 +41,3 @@ func SyncAndroidChannels(channels []*models.Channel) { } } } - -// SetFCMClient sets the FCM client. Used for testing. -func SetFCMClient(client *fcm.Client) { - fcmClient = client -} diff --git a/core/msgio/android_test.go b/core/msgio/android_test.go index b7724b488..5ecbf9286 100644 --- a/core/msgio/android_test.go +++ b/core/msgio/android_test.go @@ -69,7 +69,7 @@ func TestSyncAndroidChannels(t *testing.T) { mockFCM := newMockFCMEndpoint("FCMID3") defer mockFCM.Stop() - msgio.SetFCMClient(mockFCM.Client("FCMKEY123")) + fc := mockFCM.Client("FCMKEY123") // create some Android channels channel1ID := testdata.InsertChannel(t, db, models.Org1, "A", "Android 1", []string{"tel"}, "SR", map[string]interface{}{"FCM_ID": ""}) // no FCM ID @@ -83,7 +83,7 @@ func TestSyncAndroidChannels(t *testing.T) { channel2 := oa.ChannelByID(channel2ID) channel3 := oa.ChannelByID(channel3ID) - msgio.SyncAndroidChannels([]*models.Channel{channel1, channel2, channel3}) + msgio.SyncAndroidChannels(fc, []*models.Channel{channel1, channel2, channel3}) // check that we try to sync the 2 channels with FCM IDs, even tho one fails assert.Equal(t, 2, len(mockFCM.Messages)) diff --git a/core/msgio/send.go b/core/msgio/send.go index 4463c147f..2fa12c16d 100644 --- a/core/msgio/send.go +++ b/core/msgio/send.go @@ -7,11 +7,12 @@ import ( "github.com/nyaruka/mailroom/core/models" "github.com/apex/log" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" ) // SendMessages tries to send the given messages via Courier or Android syncing -func SendMessages(ctx context.Context, db models.Queryer, rp *redis.Pool, msgs []*models.Msg) { +func SendMessages(ctx context.Context, db models.Queryer, rp *redis.Pool, fc *fcm.Client, msgs []*models.Msg) { // messages to be sent by courier, organized by contact courierMsgs := make(map[models.ContactID][]*models.Msg, 100) @@ -65,7 +66,7 @@ func SendMessages(ctx context.Context, db models.Queryer, rp *redis.Pool, msgs [ // if we have any android messages, trigger syncs for the unique channels if len(androidChannels) > 0 { - SyncAndroidChannels(androidChannels) + SyncAndroidChannels(fc, androidChannels) } // any messages that didn't get sent should be moved back to pending (they are queued at creation to save an diff --git a/core/msgio/send_test.go b/core/msgio/send_test.go index 95e69add1..ec60a6d2c 100644 --- a/core/msgio/send_test.go +++ b/core/msgio/send_test.go @@ -62,7 +62,7 @@ func TestSendMessages(t *testing.T) { mockFCM := newMockFCMEndpoint("FCMID3") defer mockFCM.Stop() - msgio.SetFCMClient(mockFCM.Client("FCMKEY123")) + fc := mockFCM.Client("FCMKEY123") // create some Andoid channels androidChannel1ID := testdata.InsertChannel(t, db, models.Org1, "A", "Android 1", []string{"tel"}, "SR", map[string]interface{}{"FCM_ID": "FCMID1"}) @@ -151,7 +151,7 @@ func TestSendMessages(t *testing.T) { rc.Do("FLUSHDB") mockFCM.Messages = nil - msgio.SendMessages(ctx, db, rp, msgs) + msgio.SendMessages(ctx, db, rp, fc, msgs) assertCourierQueueSizes(t, rc, tc.QueueSizes, "courier queue sizes mismatch in '%s'", tc.Description) diff --git a/core/runner/runner.go b/core/runner/runner.go index 20a8e06b4..60363d5db 100644 --- a/core/runner/runner.go +++ b/core/runner/runner.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/nyaruka/goflow/assets" @@ -62,7 +63,7 @@ type StartOptions struct { type TriggerBuilder func(contact *flows.Contact) flows.Trigger // ResumeFlow resumes the passed in session using the passed in session -func ResumeFlow(ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa *models.OrgAssets, session *models.Session, resume flows.Resume, hook models.SessionCommitHook) (*models.Session, error) { +func ResumeFlow(ctx context.Context, db *sqlx.DB, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, session *models.Session, resume flows.Resume, hook models.SessionCommitHook) (*models.Session, error) { start := time.Now() sa := oa.SessionAssets() @@ -103,7 +104,7 @@ func ResumeFlow(ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa *models.Org } // write our updated session and runs - err = session.WriteUpdatedSession(txCTX, tx, rp, oa, fs, sprint, hook) + err = session.WriteUpdatedSession(txCTX, tx, rp, fc, oa, fs, sprint, hook) if err != nil { tx.Rollback() return nil, errors.Wrapf(err, "error updating session for resume") @@ -125,7 +126,7 @@ func ResumeFlow(ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa *models.Org return nil, errors.Wrapf(err, "error starting transaction for post commit hooks") } - err = models.ApplyEventPostCommitHooks(txCTX, tx, rp, oa, []*models.Scene{session.Scene()}) + err = models.ApplyEventPostCommitHooks(txCTX, tx, rp, fc, oa, []*models.Scene{session.Scene()}) if err == nil { err = tx.Commit() } @@ -141,7 +142,7 @@ func ResumeFlow(ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa *models.Org // StartFlowBatch starts the flow for the passed in org, contacts and flow func StartFlowBatch( - ctx context.Context, db *sqlx.DB, rp *redis.Pool, + ctx context.Context, db *sqlx.DB, rp *redis.Pool, fc *fcm.Client, batch *models.FlowStartBatch) ([]*models.Session, error) { start := time.Now() @@ -230,7 +231,7 @@ func StartFlowBatch( options.TriggerBuilder = triggerBuilder options.CommitHook = updateStartID - sessions, err := StartFlow(ctx, db, rp, oa, flow, batch.ContactIDs(), options) + sessions, err := StartFlow(ctx, db, rp, fc, oa, flow, batch.ContactIDs(), options) if err != nil { return nil, errors.Wrapf(err, "error starting flow batch") } @@ -244,7 +245,7 @@ func StartFlowBatch( // FireCampaignEvents starts the flow for the passed in org, contact and flow func FireCampaignEvents( - ctx context.Context, db *sqlx.DB, rp *redis.Pool, + ctx context.Context, db *sqlx.DB, rp *redis.Pool, fc *fcm.Client, orgID models.OrgID, fires []*models.EventFire, flowUUID assets.FlowUUID, campaign *triggers.CampaignReference, eventUUID triggers.CampaignEventUUID) ([]models.ContactID, error) { @@ -370,7 +371,7 @@ func FireCampaignEvents( return nil } - sessions, err := StartFlow(ctx, db, rp, oa, dbFlow, contactIDs, options) + sessions, err := StartFlow(ctx, db, rp, fc, oa, dbFlow, contactIDs, options) if err != nil { logrus.WithField("contact_ids", contactIDs).WithError(err).Errorf("error starting flow for campaign event: %s", eventUUID) } else { @@ -399,7 +400,7 @@ func FireCampaignEvents( // StartFlow runs the passed in flow for the passed in contact func StartFlow( - ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa *models.OrgAssets, + ctx context.Context, db *sqlx.DB, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, flow *models.Flow, contactIDs []models.ContactID, options *StartOptions) ([]*models.Session, error) { if len(contactIDs) == 0 { @@ -500,7 +501,7 @@ func StartFlow( triggers = append(triggers, trigger) } - ss, err := StartFlowForContacts(ctx, db, rp, oa, flow, triggers, options.CommitHook, options.Interrupt) + ss, err := StartFlowForContacts(ctx, db, rp, fc, oa, flow, triggers, options.CommitHook, options.Interrupt) if err != nil { return nil, errors.Wrapf(err, "error starting flow for contacts") } @@ -526,7 +527,7 @@ func StartFlow( // StartFlowForContacts runs the passed in flow for the passed in contact func StartFlowForContacts( - ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa *models.OrgAssets, + ctx context.Context, db *sqlx.DB, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, flow *models.Flow, triggers []flows.Trigger, hook models.SessionCommitHook, interrupt bool) ([]*models.Session, error) { sa := oa.SessionAssets() @@ -588,7 +589,7 @@ func StartFlowForContacts( } // write our session to the db - dbSessions, err := models.WriteSessions(txCTX, tx, rp, oa, sessions, sprints, hook) + dbSessions, err := models.WriteSessions(txCTX, tx, rp, fc, oa, sessions, sprints, hook) if err == nil { // commit it at once commitStart := time.Now() @@ -628,7 +629,7 @@ func StartFlowForContacts( } } - dbSession, err := models.WriteSessions(txCTX, tx, rp, oa, []flows.Session{session}, []flows.Sprint{sprint}, hook) + dbSession, err := models.WriteSessions(txCTX, tx, rp, fc, oa, []flows.Session{session}, []flows.Sprint{sprint}, hook) if err != nil { tx.Rollback() log.WithField("contact_uuid", session.Contact().UUID()).WithError(err).Errorf("error writing session to db") @@ -660,7 +661,7 @@ func StartFlowForContacts( scenes = append(scenes, s.Scene()) } - err = models.ApplyEventPostCommitHooks(txCTX, tx, rp, oa, scenes) + err = models.ApplyEventPostCommitHooks(txCTX, tx, rp, fc, oa, scenes) if err == nil { err = tx.Commit() } @@ -682,7 +683,7 @@ func StartFlowForContacts( continue } - err = models.ApplyEventPostCommitHooks(ctx, tx, rp, oa, []*models.Scene{session.Scene()}) + err = models.ApplyEventPostCommitHooks(ctx, tx, rp, fc, oa, []*models.Scene{session.Scene()}) if err != nil { tx.Rollback() log.WithError(err).Errorf("error applying post commit hook") diff --git a/core/runner/runner_test.go b/core/runner/runner_test.go index 7c2479cb8..301504dd1 100644 --- a/core/runner/runner_test.go +++ b/core/runner/runner_test.go @@ -60,7 +60,7 @@ func TestCampaignStarts(t *testing.T) { Scheduled: now, }, } - sessions, err := FireCampaignEvents(ctx, db, rp, models.Org1, fires, models.CampaignFlowUUID, campaign, "e68f4c70-9db1-44c8-8498-602d6857235e") + sessions, err := FireCampaignEvents(ctx, db, rp, nil, models.Org1, fires, models.CampaignFlowUUID, campaign, "e68f4c70-9db1-44c8-8498-602d6857235e") assert.NoError(t, err) assert.Equal(t, 2, len(sessions)) @@ -142,7 +142,7 @@ func TestBatchStart(t *testing.T) { WithExtra(tc.Extra) batch := start.CreateBatch(contactIDs, true, len(contactIDs)) - sessions, err := StartFlowBatch(ctx, db, rp, batch) + sessions, err := StartFlowBatch(ctx, db, rp, nil, batch) assert.NoError(t, err) assert.Equal(t, tc.Count, len(sessions), "%d: unexpected number of sessions created", i) @@ -191,7 +191,7 @@ func TestContactRuns(t *testing.T) { assert.NoError(t, err) trigger := triggers.NewBuilder(oa.Env(), flow.FlowReference(), contact).Manual().Build() - sessions, err := StartFlowForContacts(ctx, db, rp, oa, flow, []flows.Trigger{trigger}, nil, true) + sessions, err := StartFlowForContacts(ctx, db, rp, nil, oa, flow, []flows.Trigger{trigger}, nil, true) assert.NoError(t, err) assert.NotNil(t, sessions) @@ -232,7 +232,7 @@ func TestContactRuns(t *testing.T) { msg.SetID(10) resume := resumes.NewMsg(oa.Env(), contact, msg) - session, err = ResumeFlow(ctx, db, rp, oa, session, resume, nil) + session, err = ResumeFlow(ctx, db, rp, nil, oa, session, resume, nil) assert.NoError(t, err) assert.NotNil(t, session) diff --git a/core/tasks/broadcasts/worker.go b/core/tasks/broadcasts/worker.go index 7ebcd402b..50b6765ef 100644 --- a/core/tasks/broadcasts/worker.go +++ b/core/tasks/broadcasts/worker.go @@ -5,6 +5,7 @@ import ( "encoding/json" "time" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/nyaruka/gocommon/urns" @@ -145,11 +146,11 @@ func handleSendBroadcastBatch(ctx context.Context, mr *mailroom.Mailroom, task * } // try to send the batch - return SendBroadcastBatch(ctx, mr.DB, mr.RP, broadcast) + return SendBroadcastBatch(ctx, mr.DB, mr.RP, mr.FCMClient, broadcast) } // SendBroadcastBatch sends the passed in broadcast batch -func SendBroadcastBatch(ctx context.Context, db *sqlx.DB, rp *redis.Pool, bcast *models.BroadcastBatch) error { +func SendBroadcastBatch(ctx context.Context, db *sqlx.DB, rp *redis.Pool, fc *fcm.Client, bcast *models.BroadcastBatch) error { // always set our broadcast as sent if it is our last defer func() { if bcast.IsLast() { @@ -171,6 +172,6 @@ func SendBroadcastBatch(ctx context.Context, db *sqlx.DB, rp *redis.Pool, bcast return errors.Wrapf(err, "error creating broadcast messages") } - msgio.SendMessages(ctx, db, rp, msgs) + msgio.SendMessages(ctx, db, rp, fc, msgs) return nil } diff --git a/core/tasks/broadcasts/worker_test.go b/core/tasks/broadcasts/worker_test.go index e54356393..cd36d68bc 100644 --- a/core/tasks/broadcasts/worker_test.go +++ b/core/tasks/broadcasts/worker_test.go @@ -104,7 +104,7 @@ func TestBroadcastEvents(t *testing.T) { err = json.Unmarshal(task.Task, batch) assert.NoError(t, err) - err = SendBroadcastBatch(ctx, db, rp, batch) + err = SendBroadcastBatch(ctx, db, rp, nil, batch) assert.NoError(t, err) } @@ -213,7 +213,7 @@ func TestBroadcastTask(t *testing.T) { err = json.Unmarshal(task.Task, batch) assert.NoError(t, err) - err = SendBroadcastBatch(ctx, db, rp, batch) + err = SendBroadcastBatch(ctx, db, rp, nil, batch) assert.NoError(t, err) } diff --git a/core/tasks/campaigns/fire_campaign_event.go b/core/tasks/campaigns/fire_campaign_event.go index e5536c0ec..b3a49000c 100644 --- a/core/tasks/campaigns/fire_campaign_event.go +++ b/core/tasks/campaigns/fire_campaign_event.go @@ -82,7 +82,7 @@ func (t *FireCampaignEventTask) Perform(ctx context.Context, mr *mailroom.Mailro campaign := triggers.NewCampaignReference(triggers.CampaignUUID(t.CampaignUUID), t.CampaignName) - started, err := runner.FireCampaignEvents(ctx, db, rp, orgID, fires, t.FlowUUID, campaign, triggers.CampaignEventUUID(t.EventUUID)) + started, err := runner.FireCampaignEvents(ctx, db, rp, mr.FCMClient, orgID, fires, t.FlowUUID, campaign, triggers.CampaignEventUUID(t.EventUUID)) // remove all the contacts that were started for _, contactID := range started { diff --git a/core/tasks/handler/cron_test.go b/core/tasks/handler/cron_test.go index 50b712cfa..4114a0704 100644 --- a/core/tasks/handler/cron_test.go +++ b/core/tasks/handler/cron_test.go @@ -49,7 +49,7 @@ func TestRetryMsgs(t *testing.T) { // should have one message requeued task, _ := queue.PopNextTask(rc, queue.HandlerQueue) assert.NotNil(t, task) - err = handleContactEvent(ctx, db, rp, task) + err = handleContactEvent(ctx, db, rp, nil, task) assert.NoError(t, err) // message should be handled now diff --git a/core/tasks/handler/handler_test.go b/core/tasks/handler/handler_test.go index 5eb398d88..3e5e39302 100644 --- a/core/tasks/handler/handler_test.go +++ b/core/tasks/handler/handler_test.go @@ -139,7 +139,7 @@ func TestMsgEvents(t *testing.T) { task, err = queue.PopNextTask(rc, queue.HandlerQueue) assert.NoError(t, err, "%d: error popping next task", i) - err = handleContactEvent(ctx, db, rp, task) + err = handleContactEvent(ctx, db, rp, nil, task) assert.NoError(t, err, "%d: error when handling event", i) // if we are meant to have a response @@ -176,7 +176,7 @@ func TestMsgEvents(t *testing.T) { for i := 0; i < 3; i++ { task, _ = queue.PopNextTask(rc, queue.HandlerQueue) assert.NotNil(t, task) - err := handleContactEvent(ctx, db, rp, task) + err := handleContactEvent(ctx, db, rp, nil, task) assert.NoError(t, err) } @@ -194,7 +194,7 @@ func TestMsgEvents(t *testing.T) { AddHandleTask(rc, models.Org2FredID, task) task, _ = queue.PopNextTask(rc, queue.HandlerQueue) assert.NotNil(t, task) - err = handleContactEvent(ctx, db, rp, task) + err = handleContactEvent(ctx, db, rp, nil, task) assert.NoError(t, err) // should get our catch all trigger @@ -213,7 +213,7 @@ func TestMsgEvents(t *testing.T) { task = makeMsgTask(models.Org2, models.Org2ChannelID, models.Org2FredID, models.Org2FredURN, models.Org2FredURNID, "start") AddHandleTask(rc, models.Org2FredID, task) task, _ = queue.PopNextTask(rc, queue.HandlerQueue) - err = handleContactEvent(ctx, db, rp, task) + err = handleContactEvent(ctx, db, rp, nil, task) assert.NoError(t, err) db.Get(&text, `SELECT text FROM msgs_msg WHERE contact_id = $1 AND direction = 'O' AND created_on > $2 ORDER BY id DESC LIMIT 1`, models.Org2FredID, previous) @@ -287,7 +287,7 @@ func TestChannelEvents(t *testing.T) { task, err = queue.PopNextTask(rc, queue.HandlerQueue) assert.NoError(t, err, "%d: error popping next task", i) - err = handleContactEvent(ctx, db, rp, task) + err = handleContactEvent(ctx, db, rp, nil, task) assert.NoError(t, err, "%d: error when handling event", i) // if we are meant to have a response @@ -336,7 +336,7 @@ func TestStopEvent(t *testing.T) { task, err = queue.PopNextTask(rc, queue.HandlerQueue) assert.NoError(t, err, "error popping next task") - err = handleContactEvent(ctx, db, rp, task) + err = handleContactEvent(ctx, db, rp, nil, task) assert.NoError(t, err, "error when handling event") // check that only george is in our group @@ -467,7 +467,7 @@ func TestTimedEvents(t *testing.T) { task, err = queue.PopNextTask(rc, queue.HandlerQueue) assert.NoError(t, err, "%d: error popping next task", i) - err = handleContactEvent(ctx, db, rp, task) + err = handleContactEvent(ctx, db, rp, nil, task) assert.NoError(t, err, "%d: error when handling event", i) var text string diff --git a/core/tasks/handler/worker.go b/core/tasks/handler/worker.go index a2fdb2764..2385f89dd 100644 --- a/core/tasks/handler/worker.go +++ b/core/tasks/handler/worker.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" "github.com/nyaruka/gocommon/urns" @@ -86,12 +87,12 @@ func addHandleTask(rc redis.Conn, contactID models.ContactID, task *queue.Task, } func handleEvent(ctx context.Context, mr *mailroom.Mailroom, task *queue.Task) error { - return handleContactEvent(ctx, mr.DB, mr.RP, task) + return handleContactEvent(ctx, mr.DB, mr.RP, mr.FCMClient, task) } // handleContactEvent is called when an event comes in for a contact. to make sure we don't get into // a situation of being off by one, this task ingests and handles all the events for a contact, one by one -func handleContactEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, task *queue.Task) error { +func handleContactEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, fc *fcm.Client, task *queue.Task) error { ctx, cancel := context.WithTimeout(ctx, time.Minute*5) defer cancel() @@ -168,7 +169,7 @@ func handleContactEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, task * if err != nil { return errors.Wrapf(err, "error unmarshalling channel event: %s", event) } - _, err = HandleChannelEvent(ctx, db, rp, models.ChannelEventType(contactEvent.Type), evt, nil) + _, err = HandleChannelEvent(ctx, db, rp, fc, models.ChannelEventType(contactEvent.Type), evt, nil) case MsgEventType: msg := &MsgEvent{} @@ -176,7 +177,7 @@ func handleContactEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, task * if err != nil { return errors.Wrapf(err, "error unmarshalling msg event: %s", event) } - err = handleMsgEvent(ctx, db, rp, msg) + err = handleMsgEvent(ctx, db, rp, fc, msg) case TimeoutEventType, ExpirationEventType: evt := &TimedEvent{} @@ -184,7 +185,7 @@ func handleContactEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, task * if err != nil { return errors.Wrapf(err, "error unmarshalling timeout event: %s", event) } - err = handleTimedEvent(ctx, db, rp, contactEvent.Type, evt) + err = handleTimedEvent(ctx, db, rp, fc, contactEvent.Type, evt) default: return errors.Errorf("unknown contact event type: %s", contactEvent.Type) @@ -223,7 +224,7 @@ func handleContactEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, task * } // handleTimedEvent is called for timeout events -func handleTimedEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, eventType string, event *TimedEvent) error { +func handleTimedEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, fc *fcm.Client, eventType string, event *TimedEvent) error { start := time.Now() log := logrus.WithField("event_type", eventType).WithField("contact_id", event.OrgID).WithField("session_id", event.SessionID) oa, err := models.GetOrgAssets(ctx, db, event.OrgID) @@ -304,7 +305,7 @@ func handleTimedEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, eventTyp return errors.Errorf("unknown event type: %s", eventType) } - _, err = runner.ResumeFlow(ctx, db, rp, oa, session, resume, nil) + _, err = runner.ResumeFlow(ctx, db, rp, fc, oa, session, resume, nil) if err != nil { return errors.Wrapf(err, "error resuming flow for timeout") } @@ -314,7 +315,7 @@ func handleTimedEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, eventTyp } // HandleChannelEvent is called for channel events -func HandleChannelEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, eventType models.ChannelEventType, event *models.ChannelEvent, conn *models.ChannelConnection) (*models.Session, error) { +func HandleChannelEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, fc *fcm.Client, eventType models.ChannelEventType, event *models.ChannelEvent, conn *models.ChannelConnection) (*models.Session, error) { oa, err := models.GetOrgAssets(ctx, db, event.OrgID()) if err != nil { return nil, errors.Wrapf(err, "error loading org") @@ -461,7 +462,7 @@ func HandleChannelEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, eventT } } - sessions, err := runner.StartFlowForContacts(ctx, db, rp, oa, flow, []flows.Trigger{flowTrigger}, hook, true) + sessions, err := runner.StartFlowForContacts(ctx, db, rp, fc, oa, flow, []flows.Trigger{flowTrigger}, hook, true) if err != nil { return nil, errors.Wrapf(err, "error starting flow for contact") } @@ -498,7 +499,7 @@ func handleStopEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, event *St } // handleMsgEvent is called when a new message arrives from a contact -func handleMsgEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, event *MsgEvent) error { +func handleMsgEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, fc *fcm.Client, event *MsgEvent) error { oa, err := models.GetOrgAssets(ctx, db, event.OrgID) if err != nil { return errors.Wrapf(err, "error loading org") @@ -649,7 +650,7 @@ func handleMsgEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, event *Msg // otherwise build the trigger and start the flow directly trigger := triggers.NewBuilder(oa.Env(), flow.FlowReference(), contact).Msg(msgIn).WithMatch(trigger.Match()).Build() - _, err = runner.StartFlowForContacts(ctx, db, rp, oa, flow, []flows.Trigger{trigger}, hook, true) + _, err = runner.StartFlowForContacts(ctx, db, rp, fc, oa, flow, []flows.Trigger{trigger}, hook, true) if err != nil { return errors.Wrapf(err, "error starting flow for contact") } @@ -660,7 +661,7 @@ func handleMsgEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, event *Msg // if there is a session, resume it if session != nil && flow != nil { resume := resumes.NewMsg(oa.Env(), contact, msgIn) - _, err = runner.ResumeFlow(ctx, db, rp, oa, session, resume, hook) + _, err = runner.ResumeFlow(ctx, db, rp, fc, oa, session, resume, hook) if err != nil { return errors.Wrapf(err, "error resuming flow for contact") } @@ -668,19 +669,19 @@ func handleMsgEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, event *Msg } // this message didn't trigger and new sessions or resume any existing ones, so handle as inbox - err = handleAsInbox(ctx, db, rp, oa, contact, msgIn, topupID) + err = handleAsInbox(ctx, db, rp, fc, oa, contact, msgIn, topupID) if err != nil { return errors.Wrapf(err, "error handling inbox message") } return nil } -func handleAsInbox(ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa *models.OrgAssets, contact *flows.Contact, msg *flows.MsgIn, topupID models.TopupID) error { +func handleAsInbox(ctx context.Context, db *sqlx.DB, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, contact *flows.Contact, msg *flows.MsgIn, topupID models.TopupID) error { msgEvent := events.NewMsgReceived(msg) contact.SetLastSeenOn(msgEvent.CreatedOn()) contactEvents := map[*flows.Contact][]flows.Event{contact: {msgEvent}} - err := models.HandleAndCommitEvents(ctx, db, rp, oa, contactEvents) + err := models.HandleAndCommitEvents(ctx, db, rp, fc, oa, contactEvents) if err != nil { return errors.Wrap(err, "error handling inbox message events") } diff --git a/core/tasks/starts/worker.go b/core/tasks/starts/worker.go index 45328b65a..9fb5d6f2b 100644 --- a/core/tasks/starts/worker.go +++ b/core/tasks/starts/worker.go @@ -200,7 +200,7 @@ func handleFlowStartBatch(ctx context.Context, mr *mailroom.Mailroom, task *queu } // start these contacts in our flow - _, err = runner.StartFlowBatch(ctx, mr.DB, mr.RP, startBatch) + _, err = runner.StartFlowBatch(ctx, mr.DB, mr.RP, mr.FCMClient, startBatch) if err != nil { return errors.Wrapf(err, "error starting flow batch: %s", string(task.Task)) } diff --git a/core/tasks/starts/worker_test.go b/core/tasks/starts/worker_test.go index 532851f3f..1d009906a 100644 --- a/core/tasks/starts/worker_test.go +++ b/core/tasks/starts/worker_test.go @@ -284,7 +284,7 @@ func TestStarts(t *testing.T) { err = json.Unmarshal(task.Task, batch) assert.NoError(t, err) - _, err = runner.StartFlowBatch(ctx, db, rp, batch) + _, err = runner.StartFlowBatch(ctx, db, rp, nil, batch) assert.NoError(t, err) } diff --git a/mailroom.go b/mailroom.go index 3661e82a9..1ee6faba8 100644 --- a/mailroom.go +++ b/mailroom.go @@ -10,13 +10,14 @@ import ( "time" "github.com/nyaruka/gocommon/storage" + "github.com/nyaruka/librato" "github.com/nyaruka/mailroom/config" "github.com/nyaruka/mailroom/core/queue" "github.com/nyaruka/mailroom/web" + "github.com/edganiukov/fcm" "github.com/gomodule/redigo/redis" "github.com/jmoiron/sqlx" - "github.com/nyaruka/librato" "github.com/olivere/elastic" "github.com/sirupsen/logrus" ) @@ -47,6 +48,7 @@ type Mailroom struct { DB *sqlx.DB RP *redis.Pool ElasticClient *elastic.Client + FCMClient *fcm.Client Storage storage.Storage Quit chan bool @@ -192,6 +194,14 @@ func (mr *Mailroom) Start() error { log.Info("elastic ok") } + // initialize our FCM client + mr.FCMClient, err = fcm.NewClient(mr.Config.FCMKey) + if err != nil { + log.WithError(err).Error("unable to create FCM client, check configuration") + } else { + log.Info("firebase cloud messaging ok") + } + for _, initFunc := range initFunctions { initFunc(mr) } @@ -208,7 +218,7 @@ func (mr *Mailroom) Start() error { mr.handlerForeman.Start() // start our web server - mr.webserver = web.NewServer(mr.CTX, mr.Config, mr.DB, mr.RP, mr.Storage, mr.ElasticClient, mr.WaitGroup) + mr.webserver = web.NewServer(mr.CTX, mr.Config, mr.DB, mr.RP, mr.Storage, mr.ElasticClient, mr.FCMClient, mr.WaitGroup) mr.webserver.Start() logrus.Info("mailroom started") diff --git a/services/tickets/mailgun/web.go b/services/tickets/mailgun/web.go index c56c817cf..db07bbd10 100644 --- a/services/tickets/mailgun/web.go +++ b/services/tickets/mailgun/web.go @@ -123,7 +123,7 @@ func handleReceive(ctx context.Context, s *web.Server, r *http.Request, l *model return errors.Wrapf(err, "error updating ticket: %s", ticket.UUID()), http.StatusInternalServerError, nil } - msg, err := tickets.SendReply(ctx, s.DB, s.RP, s.Storage, ticket, request.StrippedText, files) + msg, err := tickets.SendReply(ctx, s.DB, s.RP, s.FCMClient, s.Storage, ticket, request.StrippedText, files) if err != nil { return err, http.StatusInternalServerError, nil } diff --git a/services/tickets/rocketchat/web.go b/services/tickets/rocketchat/web.go index 1b2e0d11c..83d716b1b 100644 --- a/services/tickets/rocketchat/web.go +++ b/services/tickets/rocketchat/web.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "fmt" + "net/http" + "github.com/go-chi/chi" "github.com/nyaruka/goflow/assets" "github.com/nyaruka/goflow/flows" @@ -12,7 +14,6 @@ import ( "github.com/nyaruka/mailroom/services/tickets" "github.com/nyaruka/mailroom/web" "github.com/pkg/errors" - "net/http" ) func init() { @@ -88,7 +89,7 @@ func handleEventCallback(ctx context.Context, s *web.Server, r *http.Request, l attachments = append(attachments, attachment.URL) } - _, err = tickets.SendReply(ctx, s.DB, s.RP, s.Storage, ticket, data.Text, files) + _, err = tickets.SendReply(ctx, s.DB, s.RP, s.FCMClient, s.Storage, ticket, data.Text, files) case "close-room": err = models.CloseTickets(ctx, s.DB, nil, []*models.Ticket{ticket}, false, l) diff --git a/services/tickets/utils.go b/services/tickets/utils.go index 4aadb9c34..1f1217e10 100644 --- a/services/tickets/utils.go +++ b/services/tickets/utils.go @@ -10,6 +10,7 @@ import ( "path/filepath" "time" + "github.com/edganiukov/fcm" "github.com/nyaruka/gocommon/httpx" "github.com/nyaruka/gocommon/storage" "github.com/nyaruka/gocommon/uuids" @@ -80,7 +81,7 @@ func FromTicketerUUID(ctx context.Context, db *sqlx.DB, uuid assets.TicketerUUID } // SendReply sends a message reply from the ticket system user to the contact -func SendReply(ctx context.Context, db *sqlx.DB, rp *redis.Pool, store storage.Storage, ticket *models.Ticket, text string, files []*File) (*models.Msg, error) { +func SendReply(ctx context.Context, db *sqlx.DB, rp *redis.Pool, fc *fcm.Client, store storage.Storage, ticket *models.Ticket, text string, files []*File) (*models.Msg, error) { // look up our assets oa, err := models.GetOrgAssets(ctx, db, ticket.OrgID()) if err != nil { @@ -110,7 +111,7 @@ func SendReply(ctx context.Context, db *sqlx.DB, rp *redis.Pool, store storage.S return nil, errors.Wrapf(err, "error creating message batch") } - msgio.SendMessages(ctx, db, rp, msgs) + msgio.SendMessages(ctx, db, rp, fc, msgs) return msgs[0], nil } diff --git a/services/tickets/utils_test.go b/services/tickets/utils_test.go index 97feebb4e..6be474c8d 100644 --- a/services/tickets/utils_test.go +++ b/services/tickets/utils_test.go @@ -138,7 +138,7 @@ func TestSendReply(t *testing.T) { ticket, err := models.LookupTicketByUUID(ctx, db, ticketUUID) require.NoError(t, err) - msg, err := tickets.SendReply(ctx, db, rp, testsuite.Storage(), ticket, "I'll get back to you", []*tickets.File{image}) + msg, err := tickets.SendReply(ctx, db, rp, nil, testsuite.Storage(), ticket, "I'll get back to you", []*tickets.File{image}) require.NoError(t, err) assert.Equal(t, "I'll get back to you", msg.Text()) @@ -147,6 +147,6 @@ func TestSendReply(t *testing.T) { assert.FileExists(t, "_test_storage/media/1/1ae9/6956/1ae96956-4b34-433e-8d1a-f05fe6923d6d.jpg") // try with file that can't be read (i.e. same file again which is already closed) - _, err = tickets.SendReply(ctx, db, rp, testsuite.Storage(), ticket, "I'll get back to you", []*tickets.File{image}) + _, err = tickets.SendReply(ctx, db, rp, nil, testsuite.Storage(), ticket, "I'll get back to you", []*tickets.File{image}) assert.EqualError(t, err, "error storing attachment http://coolfiles.com/a.jpg for ticket reply: unable to read attachment content: read ../../core/models/testdata/test.jpg: file already closed") } diff --git a/services/tickets/zendesk/web.go b/services/tickets/zendesk/web.go index 46afdaf36..e6f4f8b99 100644 --- a/services/tickets/zendesk/web.go +++ b/services/tickets/zendesk/web.go @@ -85,7 +85,7 @@ func handleChannelback(ctx context.Context, s *web.Server, r *http.Request) (int } } - msg, err := tickets.SendReply(ctx, s.DB, s.RP, s.Storage, ticket, request.Message, files) + msg, err := tickets.SendReply(ctx, s.DB, s.RP, s.FCMClient, s.Storage, ticket, request.Message, files) if err != nil { return err, http.StatusBadRequest, nil } diff --git a/web/contact/contact.go b/web/contact/contact.go index 9c9c2b397..80284c515 100644 --- a/web/contact/contact.go +++ b/web/contact/contact.go @@ -65,7 +65,7 @@ func handleCreate(ctx context.Context, s *web.Server, r *http.Request) (interfac } modifiersByContact := map[*flows.Contact][]flows.Modifier{contact: c.Mods} - _, err = models.ApplyModifiers(ctx, s.DB, s.RP, oa, modifiersByContact) + _, err = models.ApplyModifiers(ctx, s.DB, s.RP, s.FCMClient, oa, modifiersByContact) if err != nil { return nil, http.StatusInternalServerError, errors.Wrap(err, "error modifying new contact") } @@ -153,7 +153,7 @@ func handleModify(ctx context.Context, s *web.Server, r *http.Request) (interfac modifiersByContact[flowContact] = mods } - eventsByContact, err := models.ApplyModifiers(ctx, s.DB, s.RP, oa, modifiersByContact) + eventsByContact, err := models.ApplyModifiers(ctx, s.DB, s.RP, s.FCMClient, oa, modifiersByContact) if err != nil { return nil, http.StatusBadRequest, err } diff --git a/web/contact/search_test.go b/web/contact/search_test.go index 12979307f..6485281a0 100644 --- a/web/contact/search_test.go +++ b/web/contact/search_test.go @@ -39,7 +39,7 @@ func TestSearch(t *testing.T) { ) assert.NoError(t, err) - server := web.NewServer(ctx, config.Mailroom, db, rp, nil, client, wg) + server := web.NewServer(ctx, config.Mailroom, db, rp, nil, client, nil, wg) server.Start() // give our server time to start diff --git a/web/ivr/ivr.go b/web/ivr/ivr.go index 93eceae77..0643b4aac 100644 --- a/web/ivr/ivr.go +++ b/web/ivr/ivr.go @@ -136,7 +136,7 @@ func handleIncomingCall(ctx context.Context, s *web.Server, r *http.Request, w h } // try to handle this event - session, err := handler.HandleChannelEvent(ctx, s.DB, s.RP, models.MOCallEventType, event, conn) + session, err := handler.HandleChannelEvent(ctx, s.DB, s.RP, s.FCMClient, models.MOCallEventType, event, conn) if err != nil { logrus.WithError(err).WithField("http_request", r).Error("error handling incoming call") @@ -166,7 +166,7 @@ func handleIncomingCall(ctx context.Context, s *web.Server, r *http.Request, w h } // try to handle it, this time looking for a missed call event - session, err = handler.HandleChannelEvent(ctx, s.DB, s.RP, models.MOMissEventType, event, nil) + session, err = handler.HandleChannelEvent(ctx, s.DB, s.RP, s.FCMClient, models.MOMissEventType, event, nil) if err != nil { logrus.WithError(err).WithField("http_request", r).Error("error handling missed call") return channel, conn, client.WriteErrorResponse(w, errors.Wrapf(err, "error handling missed call")) @@ -290,14 +290,14 @@ func handleFlow(ctx context.Context, s *web.Server, r *http.Request, w http.Resp switch request.Action { case actionStart: err = ivr.StartIVRFlow( - ctx, s.DB, s.RP, client, resumeURL, + ctx, s.DB, s.RP, s.FCMClient, client, resumeURL, oa, channel, conn, contacts[0], urn, conn.StartID(), r, w, ) case actionResume: err = ivr.ResumeIVRFlow( - ctx, s.Config, s.DB, s.RP, s.Storage, resumeURL, client, + ctx, s.Config, s.DB, s.RP, s.FCMClient, s.Storage, resumeURL, client, oa, channel, conn, contacts[0], urn, r, w, ) diff --git a/web/ivr/ivr_test.go b/web/ivr/ivr_test.go index be2e73f2b..1274136ca 100644 --- a/web/ivr/ivr_test.go +++ b/web/ivr/ivr_test.go @@ -60,7 +60,7 @@ func TestTwilioIVR(t *testing.T) { twiml.IgnoreSignatures = true wg := &sync.WaitGroup{} - server := web.NewServer(ctx, config.Mailroom, db, rp, testsuite.Storage(), nil, wg) + server := web.NewServer(ctx, config.Mailroom, db, rp, testsuite.Storage(), nil, nil, wg) server.Start() defer server.Stop() @@ -365,7 +365,7 @@ func TestNexmoIVR(t *testing.T) { defer ts.Close() wg := &sync.WaitGroup{} - server := web.NewServer(ctx, config.Mailroom, db, rp, testsuite.Storage(), nil, wg) + server := web.NewServer(ctx, config.Mailroom, db, rp, testsuite.Storage(), nil, nil, wg) server.Start() defer server.Stop() diff --git a/web/org/metrics_test.go b/web/org/metrics_test.go index 68d9caf6a..8080a87f0 100644 --- a/web/org/metrics_test.go +++ b/web/org/metrics_test.go @@ -25,7 +25,7 @@ func TestMetrics(t *testing.T) { db.MustExec(`INSERT INTO api_apitoken(is_active, org_id, created, key, role_id, user_id) VALUES(TRUE, $1, NOW(), $2, 8, 1);`, models.Org1, adminToken) wg := &sync.WaitGroup{} - server := web.NewServer(ctx, config.Mailroom, db, rp, nil, nil, wg) + server := web.NewServer(ctx, config.Mailroom, db, rp, nil, nil, nil, wg) server.Start() // wait for the server to start diff --git a/web/server.go b/web/server.go index 21db50dfd..25f8f56b4 100644 --- a/web/server.go +++ b/web/server.go @@ -13,6 +13,7 @@ import ( "github.com/nyaruka/gocommon/storage" "github.com/nyaruka/mailroom/config" + "github.com/edganiukov/fcm" "github.com/go-chi/chi" "github.com/go-chi/chi/middleware" "github.com/gomodule/redigo/redis" @@ -61,14 +62,16 @@ func RegisterRoute(method string, pattern string, handler Handler) { } // NewServer creates a new web server, it will need to be started after being created -func NewServer(ctx context.Context, config *config.Config, db *sqlx.DB, rp *redis.Pool, store storage.Storage, elasticClient *elastic.Client, wg *sync.WaitGroup) *Server { +func NewServer(ctx context.Context, config *config.Config, db *sqlx.DB, rp *redis.Pool, store storage.Storage, elasticClient *elastic.Client, fc *fcm.Client, wg *sync.WaitGroup) *Server { s := &Server{ CTX: ctx, RP: rp, DB: db, Storage: store, ElasticClient: elasticClient, - Config: config, + FCMClient: fc, + + Config: config, wg: wg, } @@ -216,6 +219,7 @@ type Server struct { Storage storage.Storage Config *config.Config ElasticClient *elastic.Client + FCMClient *fcm.Client wg *sync.WaitGroup diff --git a/web/simulation/simulation_test.go b/web/simulation/simulation_test.go index 8c256febc..7d81a065b 100644 --- a/web/simulation/simulation_test.go +++ b/web/simulation/simulation_test.go @@ -255,7 +255,7 @@ func TestServer(t *testing.T) { rp := testsuite.RP() wg := &sync.WaitGroup{} - server := web.NewServer(ctx, config.Mailroom, db, rp, nil, nil, wg) + server := web.NewServer(ctx, config.Mailroom, db, rp, nil, nil, nil, wg) server.Start() // give our server time to start diff --git a/web/surveyor/surveyor.go b/web/surveyor/surveyor.go index 9f846fa51..6629b9120 100644 --- a/web/surveyor/surveyor.go +++ b/web/surveyor/surveyor.go @@ -133,7 +133,7 @@ func handleSubmit(ctx context.Context, s *web.Server, r *http.Request) (interfac if err != nil { return nil, http.StatusInternalServerError, errors.Wrapf(err, "error starting transaction for session write") } - sessions, err := models.WriteSessions(ctx, tx, s.RP, oa, []flows.Session{fs}, []flows.Sprint{sprint}, nil) + sessions, err := models.WriteSessions(ctx, tx, s.RP, nil, oa, []flows.Session{fs}, []flows.Sprint{sprint}, nil) if err == nil && len(sessions) == 0 { err = errors.Errorf("no sessions written") } @@ -152,7 +152,7 @@ func handleSubmit(ctx context.Context, s *web.Server, r *http.Request) (interfac } // write our post commit hooks - err = models.ApplyEventPostCommitHooks(ctx, tx, s.RP, oa, []*models.Scene{sessions[0].Scene()}) + err = models.ApplyEventPostCommitHooks(ctx, tx, s.RP, nil, oa, []*models.Scene{sessions[0].Scene()}) if err != nil { tx.Rollback() return nil, http.StatusInternalServerError, errors.Wrapf(err, "error applying post commit hooks") diff --git a/web/surveyor/surveyor_test.go b/web/surveyor/surveyor_test.go index 6060beeca..d2815951e 100644 --- a/web/surveyor/surveyor_test.go +++ b/web/surveyor/surveyor_test.go @@ -27,7 +27,7 @@ func TestSurveyor(t *testing.T) { defer rc.Close() wg := &sync.WaitGroup{} - server := web.NewServer(ctx, config.Mailroom, db, rp, nil, nil, wg) + server := web.NewServer(ctx, config.Mailroom, db, rp, nil, nil, nil, wg) server.Start() defer server.Stop() diff --git a/web/testing.go b/web/testing.go index 1073d3635..6eb728bf8 100644 --- a/web/testing.go +++ b/web/testing.go @@ -40,7 +40,7 @@ func RunWebTests(t *testing.T, truthFile string) { defer testsuite.ResetStorage() - server := NewServer(context.Background(), config.Mailroom, db, rp, testsuite.Storage(), nil, wg) + server := NewServer(context.Background(), config.Mailroom, db, rp, testsuite.Storage(), nil, nil, wg) server.Start() defer server.Stop() From 4a36e5a3dcd94608dc71521a29e216c377fef47d Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 14 Jan 2021 11:21:08 -0500 Subject: [PATCH 2/2] Don't pass redis pool or FCM client to ApplyModifiers which shouldn't ever send messages --- core/models/events.go | 4 ++-- core/models/imports.go | 2 +- web/contact/contact.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/models/events.go b/core/models/events.go index 94d780620..c31baddbd 100644 --- a/core/models/events.go +++ b/core/models/events.go @@ -238,7 +238,7 @@ func HandleAndCommitEvents(ctx context.Context, db QueryerWithTx, rp *redis.Pool } // ApplyModifiers modifies contacts by applying modifiers and handling the resultant events -func ApplyModifiers(ctx context.Context, db QueryerWithTx, rp *redis.Pool, fc *fcm.Client, oa *OrgAssets, modifiersByContact map[*flows.Contact][]flows.Modifier) (map[*flows.Contact][]flows.Event, error) { +func ApplyModifiers(ctx context.Context, db QueryerWithTx, oa *OrgAssets, modifiersByContact map[*flows.Contact][]flows.Modifier) (map[*flows.Contact][]flows.Event, error) { // create an environment instance with location support env := flows.NewEnvironment(oa.Env(), oa.SessionAssets().Locations()) @@ -253,7 +253,7 @@ func ApplyModifiers(ctx context.Context, db QueryerWithTx, rp *redis.Pool, fc *f eventsByContact[contact] = events } - err := HandleAndCommitEvents(ctx, db, rp, fc, oa, eventsByContact) + err := HandleAndCommitEvents(ctx, db, nil, nil, oa, eventsByContact) if err != nil { return nil, errors.Wrap(err, "error commiting events") } diff --git a/core/models/imports.go b/core/models/imports.go index 2d979e390..d8cb955cc 100644 --- a/core/models/imports.go +++ b/core/models/imports.go @@ -113,7 +113,7 @@ func (b *ContactImportBatch) tryImport(ctx context.Context, db *sqlx.DB, orgID O } // and apply in bulk - _, err = ApplyModifiers(ctx, db, nil, nil, oa, modifiersByContact) + _, err = ApplyModifiers(ctx, db, oa, modifiersByContact) if err != nil { return errors.Wrap(err, "error applying modifiers") } diff --git a/web/contact/contact.go b/web/contact/contact.go index 80284c515..c604c9563 100644 --- a/web/contact/contact.go +++ b/web/contact/contact.go @@ -65,7 +65,7 @@ func handleCreate(ctx context.Context, s *web.Server, r *http.Request) (interfac } modifiersByContact := map[*flows.Contact][]flows.Modifier{contact: c.Mods} - _, err = models.ApplyModifiers(ctx, s.DB, s.RP, s.FCMClient, oa, modifiersByContact) + _, err = models.ApplyModifiers(ctx, s.DB, oa, modifiersByContact) if err != nil { return nil, http.StatusInternalServerError, errors.Wrap(err, "error modifying new contact") } @@ -153,7 +153,7 @@ func handleModify(ctx context.Context, s *web.Server, r *http.Request) (interfac modifiersByContact[flowContact] = mods } - eventsByContact, err := models.ApplyModifiers(ctx, s.DB, s.RP, s.FCMClient, oa, modifiersByContact) + eventsByContact, err := models.ApplyModifiers(ctx, s.DB, oa, modifiersByContact) if err != nil { return nil, http.StatusBadRequest, err }