Skip to content

Commit

Permalink
Merge pull request #397 from nyaruka/fcm_client_refactor
Browse files Browse the repository at this point in the history
FCM client refactor
  • Loading branch information
rowanseymour authored Jan 14, 2021
2 parents 4b23bc0 + 4a36e5a commit bfc9340
Show file tree
Hide file tree
Showing 57 changed files with 187 additions and 164 deletions.
6 changes: 3 additions & 3 deletions core/handlers/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func RunTestCases(t *testing.T, tcs []TestCase) {
return triggers.NewBuilder(oa.Env(), testFlow.Reference(), contact).Msg(msg).Build()
}

_, err = runner.StartFlow(ctx, db, rp, oa, flow.(*models.Flow), []models.ContactID{models.CathyID, models.BobID, models.GeorgeID, models.AlexandriaID}, options)
_, err = runner.StartFlow(ctx, db, rp, nil, oa, flow.(*models.Flow), []models.ContactID{models.CathyID, models.BobID, models.GeorgeID, models.AlexandriaID}, options)
assert.NoError(t, err)

results := make(map[models.ContactID]modifyResult)
Expand Down Expand Up @@ -240,7 +240,7 @@ func RunTestCases(t *testing.T, tcs []TestCase) {
assert.NoError(t, err)
}

err = models.ApplyEventPreCommitHooks(ctx, tx, rp, oa, scenes)
err = models.ApplyEventPreCommitHooks(ctx, tx, rp, nil, oa, scenes)
assert.NoError(t, err)

err = tx.Commit()
Expand All @@ -249,7 +249,7 @@ func RunTestCases(t *testing.T, tcs []TestCase) {
tx, err = db.BeginTxx(ctx, nil)
assert.NoError(t, err)

err = models.ApplyEventPostCommitHooks(ctx, tx, rp, oa, scenes)
err = models.ApplyEventPostCommitHooks(ctx, tx, rp, nil, oa, scenes)
assert.NoError(t, err)

err = tx.Commit()
Expand Down
3 changes: 2 additions & 1 deletion core/hooks/commit_added_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
)
Expand All @@ -16,7 +17,7 @@ var CommitAddedLabelsHook models.EventCommitHook = &commitAddedLabelsHook{}
type commitAddedLabelsHook struct{}

// Apply applies our input labels added, committing them in a single batch
func (h *commitAddedLabelsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *commitAddedLabelsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
// build our list of msg label adds, we dedupe these so we never double add in the same transaction
seen := make(map[string]bool)
adds := make([]*models.MsgLabelAdd, 0, len(scenes))
Expand Down
8 changes: 5 additions & 3 deletions core/hooks/commit_field_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"encoding/json"

"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand All @@ -20,7 +22,7 @@ var CommitFieldChangesHook models.EventCommitHook = &commitFieldChangesHook{}
type commitFieldChangesHook struct{}

// Apply squashes and writes all the field updates for the contacts
func (h *commitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *commitFieldChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
// our list of updates
fieldUpdates := make([]interface{}, 0, len(scenes))
fieldDeletes := make(map[assets.FieldUUID][]interface{})
Expand Down
3 changes: 2 additions & 1 deletion core/hooks/commit_group_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -16,7 +17,7 @@ var CommitGroupChangesHook models.EventCommitHook = &commitGroupChangesHook{}
type commitGroupChangesHook struct{}

// Apply squashes and adds or removes all our contact groups
func (h *commitGroupChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *commitGroupChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
// build up our list of all adds and removes
adds := make([]*models.GroupAdd, 0, len(scenes))
removes := make([]*models.GroupRemove, 0, len(scenes))
Expand Down
3 changes: 2 additions & 1 deletion core/hooks/commit_ivr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -16,7 +17,7 @@ var CommitIVRHook models.EventCommitHook = &commitIVRHook{}
type commitIVRHook struct{}

// Apply takes care of inserting all the messages in the passed in scene assigning topups to them as needed.
func (h *commitIVRHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *commitIVRHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
msgs := make([]*models.Msg, 0, len(scenes))
for _, s := range scenes {
for _, m := range s {
Expand Down
8 changes: 5 additions & 3 deletions core/hooks/commit_language_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package hooks
import (
"context"

"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/null"
)

Expand All @@ -16,7 +18,7 @@ var CommitLanguageChangesHook models.EventCommitHook = &commitLanguageChangesHoo
type commitLanguageChangesHook struct{}

// Apply applies our contact language change before our commit
func (h *commitLanguageChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *commitLanguageChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
// build up our list of pairs of contact id and language name
updates := make([]interface{}, 0, len(scenes))
for s, e := range scenes {
Expand Down
3 changes: 2 additions & 1 deletion core/hooks/commit_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -16,7 +17,7 @@ var CommitMessagesHook models.EventCommitHook = &commitMessagesHook{}
type commitMessagesHook struct{}

// Apply takes care of inserting all the messages in the passed in scene assigning topups to them as needed.
func (h *commitMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *commitMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
msgs := make([]*models.Msg, 0, len(scenes))
for _, s := range scenes {
for _, m := range s {
Expand Down
8 changes: 5 additions & 3 deletions core/hooks/commit_name_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"fmt"

"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/null"
)

Expand All @@ -17,7 +19,7 @@ var CommitNameChangesHook models.EventCommitHook = &commitNameChangesHook{}
type commitNameChangesHook struct{}

// Apply commits our contact name changes as a bulk update for the passed in map of scene
func (h *commitNameChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *commitNameChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
// build up our list of pairs of contact id and contact name
updates := make([]interface{}, 0, len(scenes))
for s, e := range scenes {
Expand Down
3 changes: 2 additions & 1 deletion core/hooks/commit_status_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -17,7 +18,7 @@ var CommitStatusChangesHook models.EventCommitHook = &commitStatusChangesHook{}
type commitStatusChangesHook struct{}

// Apply commits our contact status change
func (h *commitStatusChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *commitStatusChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {

statusChanges := make([]*models.ContactStatusChange, 0, len(scenes))
for scene, es := range scenes {
Expand Down
3 changes: 2 additions & 1 deletion core/hooks/commit_urn_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -16,7 +17,7 @@ var CommitURNChangesHook models.EventCommitHook = &commitURNChangesHook{}
type commitURNChangesHook struct{}

// Apply adds all our URNS in a batch
func (h *commitURNChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *commitURNChangesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
// gather all our urn changes, we only care about the last change for each scene
changes := make([]*models.ContactURNsChanged, 0, len(scenes))
for _, sessionChanges := range scenes {
Expand Down
3 changes: 2 additions & 1 deletion core/hooks/contact_last_seen.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -17,7 +18,7 @@ var ContactLastSeenHook models.EventCommitHook = &contactLastSeenHook{}
type contactLastSeenHook struct{}

// Apply squashes and updates modified_on on all the contacts passed in
func (h *contactLastSeenHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *contactLastSeenHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {

for scene, evts := range scenes {
lastEvent := evts[len(evts)-1].(flows.Event)
Expand Down
3 changes: 2 additions & 1 deletion core/hooks/contact_modified.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -16,7 +17,7 @@ var ContactModifiedHook models.EventCommitHook = &contactModifiedHook{}
type contactModifiedHook struct{}

// Apply squashes and updates modified_on on all the contacts passed in
func (h *contactModifiedHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *contactModifiedHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
// our lists of contact ids
contactIDs := make([]models.ContactID, 0, len(scenes))

Expand Down
3 changes: 2 additions & 1 deletion core/hooks/insert_airtime_transfers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -16,7 +17,7 @@ var InsertAirtimeTransfersHook models.EventCommitHook = &insertAirtimeTransfersH
type insertAirtimeTransfersHook struct{}

// Apply inserts all the airtime transfers that were created
func (h *insertAirtimeTransfersHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *insertAirtimeTransfersHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
// gather all our transfers
transfers := make([]*models.AirtimeTransfer, 0, len(scenes))

Expand Down
3 changes: 2 additions & 1 deletion core/hooks/insert_http_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -16,7 +17,7 @@ var InsertHTTPLogsHook models.EventCommitHook = &insertHTTPLogsHook{}
type insertHTTPLogsHook struct{}

// Apply inserts all the classifier logs that were created
func (h *insertHTTPLogsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *insertHTTPLogsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
// gather all our logs
logs := make([]*models.HTTPLog, 0, len(scenes))
for _, ls := range scenes {
Expand Down
3 changes: 2 additions & 1 deletion core/hooks/insert_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -18,7 +19,7 @@ var InsertStartHook models.EventCommitHook = &insertStartHook{}
type insertStartHook struct{}

// Apply inserts our starts
func (h *insertStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *insertStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
rc := rp.Get()
defer rc.Close()

Expand Down
3 changes: 2 additions & 1 deletion core/hooks/insert_tickets.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -16,7 +17,7 @@ var InsertTicketsHook models.EventCommitHook = &insertTicketsHook{}
type insertTicketsHook struct{}

// Apply inserts all the airtime transfers that were created
func (h *insertTicketsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *insertTicketsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
// gather all our tickets
tickets := make([]*models.Ticket, 0, len(scenes))

Expand Down
3 changes: 2 additions & 1 deletion core/hooks/insert_webhook_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -16,7 +17,7 @@ var InsertWebhookEventHook models.EventCommitHook = &insertWebhookEventHook{}
type insertWebhookEventHook struct{}

// Apply inserts all the webook events that were created
func (h *insertWebhookEventHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *insertWebhookEventHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
events := make([]*models.WebhookEvent, 0, len(scenes))
for _, rs := range scenes {
for _, r := range rs {
Expand Down
3 changes: 2 additions & 1 deletion core/hooks/insert_webhook_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/nyaruka/mailroom/core/models"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -16,7 +17,7 @@ var InsertWebhookResultHook models.EventCommitHook = &insertWebhookResultHook{}
type insertWebhookResultHook struct{}

// Apply inserts all the webook results that were created
func (h *insertWebhookResultHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *insertWebhookResultHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
// gather all our results
results := make([]*models.WebhookResult, 0, len(scenes))
for _, rs := range scenes {
Expand Down
5 changes: 3 additions & 2 deletions core/hooks/send_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/msgio"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
)
Expand All @@ -16,7 +17,7 @@ var SendMessagesHook models.EventCommitHook = &sendMessagesHook{}
type sendMessagesHook struct{}

// Apply sends all non-android messages to courier
func (h *sendMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *sendMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
msgs := make([]*models.Msg, 0, 1)

// for each scene gather all our messages
Expand All @@ -35,6 +36,6 @@ func (h *sendMessagesHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Poo
msgs = append(msgs, sceneMsgs...)
}

msgio.SendMessages(ctx, tx, rp, msgs)
msgio.SendMessages(ctx, tx, rp, fc, msgs)
return nil
}
3 changes: 2 additions & 1 deletion core/hooks/start_broadcasts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -18,7 +19,7 @@ var StartBroadcastsHook models.EventCommitHook = &startBroadcastsHook{}
type startBroadcastsHook struct{}

// Apply queues up our broadcasts for sending
func (h *startBroadcastsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *startBroadcastsHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
rc := rp.Get()
defer rc.Close()

Expand Down
3 changes: 2 additions & 1 deletion core/hooks/start_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"

"github.com/edganiukov/fcm"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
Expand All @@ -17,7 +18,7 @@ var StartStartHook models.EventCommitHook = &startStartHook{}
type startStartHook struct{}

// Apply queues up our flow starts
func (h *startStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
func (h *startStartHook) Apply(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, fc *fcm.Client, oa *models.OrgAssets, scenes map[*models.Scene][]interface{}) error {
rc := rp.Get()
defer rc.Close()

Expand Down
Loading

0 comments on commit bfc9340

Please sign in to comment.