Skip to content

Commit

Permalink
Merge pull request #617 from nyaruka/flow_history_changes
Browse files Browse the repository at this point in the history
Update modified_on whenever flow history changes
  • Loading branch information
rowanseymour authored Apr 11, 2022
2 parents 57d3991 + 962e373 commit 2a22e0e
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 64 deletions.
28 changes: 0 additions & 28 deletions core/handlers/contact_flow_changed.go

This file was deleted.

7 changes: 2 additions & 5 deletions core/handlers/contact_status_changed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@ import (
)

func TestContactStatusChanged(t *testing.T) {
ctx, rt, db, _ := testsuite.Get()
ctx, rt, _, _ := testsuite.Get()

defer testsuite.Reset(testsuite.ResetAll)

// make sure cathyID contact is active
db.Exec(`UPDATE contacts_contact SET status = 'A' WHERE id = $1`, testdata.Cathy.ID)
defer testsuite.Reset(testsuite.ResetData)

tcs := []handlers.TestCase{
{
Expand Down
9 changes: 0 additions & 9 deletions core/handlers/contact_urns_changed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package handlers_test

import (
"testing"
"time"

"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/flows"
Expand All @@ -20,8 +19,6 @@ func TestContactURNsChanged(t *testing.T) {
// add a URN to george that cathy will steal
testdata.InsertContactURN(db, testdata.Org1, testdata.George, urns.URN("tel:+12065551212"), 100)

now := time.Now()

tcs := []handlers.TestCase{
{
Actions: handlers.ContactActionMap{
Expand Down Expand Up @@ -55,12 +52,6 @@ func TestContactURNsChanged(t *testing.T) {
Args: []interface{}{testdata.George.ID},
Count: 1,
},
// two contacts updated, both cathy and evan since their URNs changed
{
SQL: "select count(*) from contacts_contact where modified_on > $1",
Args: []interface{}{now},
Count: 2,
},
},
},
}
Expand Down
35 changes: 35 additions & 0 deletions core/handlers/flow_entered.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package handlers

import (
"context"

"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/mailroom/core/hooks"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/runtime"

"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
)

func init() {
models.RegisterEventHandler(events.TypeFlowEntered, handleFlowEntered)
}

func handleFlowEntered(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa *models.OrgAssets, scene *models.Scene, e flows.Event) error {
event := e.(*events.FlowEnteredEvent)

logrus.WithFields(logrus.Fields{
"contact_uuid": scene.ContactUUID(),
"session_id": scene.SessionID(),
"flow_name": event.Flow.Name,
"flow_uuid": event.Flow.UUID,
}).Debug("flow entered")

// we've potentially changed contact flow history.. only way to be sure would be loading contacts with their
// flow history, but not sure that is worth it given how likely we are to be updating modified_on anyway
scene.AppendToEventPreCommitHook(hooks.ContactModifiedHook, event)

return nil
}
42 changes: 42 additions & 0 deletions core/handlers/flow_entered_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package handlers_test

import (
"testing"

"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/actions"
"github.com/nyaruka/mailroom/core/handlers"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
"github.com/stretchr/testify/assert"
)

func TestFlowEntered(t *testing.T) {
ctx, rt, _, _ := testsuite.Get()

defer testsuite.Reset(testsuite.ResetAll)

oa := testdata.Org1.Load(rt)

flow, err := oa.FlowByID(testdata.PickANumber.ID)
assert.NoError(t, err)

tcs := []handlers.TestCase{
{
Actions: handlers.ContactActionMap{
testdata.Cathy: []flows.Action{
actions.NewEnterFlow(handlers.NewActionUUID(), flow.Reference(), false),
},
},
SQLAssertions: []handlers.SQLAssertion{
{
SQL: `select count(*) from contacts_contact where current_flow_id = $1`,
Args: []interface{}{flow.ID()},
Count: 1,
},
},
},
}

handlers.RunTestCases(t, ctx, rt, tcs)
}
1 change: 0 additions & 1 deletion core/handlers/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ func init() {
models.RegisterEventHandler(events.TypeEnvironmentRefreshed, NoopHandler)
models.RegisterEventHandler(events.TypeError, NoopHandler)
models.RegisterEventHandler(events.TypeFailure, NoopHandler)
models.RegisterEventHandler(events.TypeFlowEntered, NoopHandler)
models.RegisterEventHandler(events.TypeMsgWait, NoopHandler)
models.RegisterEventHandler(events.TypeRunExpired, NoopHandler)
models.RegisterEventHandler(events.TypeRunResultChanged, NoopHandler)
Expand Down
5 changes: 2 additions & 3 deletions core/handlers/session_triggered.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ func init() {
models.RegisterEventHandler(events.TypeSessionTriggered, handleSessionTriggered)
}

// handleSessionTriggered queues this event for being started after our scene are committed
func handleSessionTriggered(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa *models.OrgAssets, scene *models.Scene, e flows.Event) error {
event := e.(*events.SessionTriggeredEvent)

logrus.WithFields(logrus.Fields{
"contact_uuid": scene.ContactUUID(),
"session_id": scene.SessionID(),
"flow": event.Flow.Name,
"flow_name": event.Flow.Name,
"flow_uuid": event.Flow.UUID,
}).Debug("scene triggered")
}).Debug("session triggered")

scene.AppendToEventPreCommitHook(hooks.InsertStartHook, event)

Expand Down
35 changes: 35 additions & 0 deletions core/handlers/sprint_ended.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package handlers

import (
"context"

"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/mailroom/core/hooks"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/runtime"

"github.com/jmoiron/sqlx"
)

func init() {
models.RegisterEventHandler(models.TypeSprintEnded, handleSprintEnded)
}

func handleSprintEnded(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa *models.OrgAssets, scene *models.Scene, e flows.Event) error {
event := e.(*models.SprintEndedEvent)

// if we're in a flow type that can wait then contact current flow has potentially changed
currentFlowChanged := scene.Session().SessionType().Interrupts() && event.Contact.CurrentFlowID() != scene.Session().CurrentFlowID()

if currentFlowChanged {
scene.AppendToEventPreCommitHook(hooks.CommitFlowChangesHook, scene.Session().CurrentFlowID())
}

// if current flow has changed then we need to update modified_on, but also if this is a new session
// then flow history may have changed too in a way that won't be captured by a flow_entered event
if currentFlowChanged || !event.Resumed {
scene.AppendToEventPostCommitHook(hooks.ContactModifiedHook, event)
}

return nil
}
4 changes: 2 additions & 2 deletions core/hooks/commit_flow_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ func (h *commitFlowChangesHook) Apply(ctx context.Context, rt *runtime.Runtime,
updates := make([]interface{}, 0, len(scenes))
for s, evts := range scenes {
// there is only ever one of these events per scene
event := evts[len(evts)-1].(*models.ContactFlowChangedEvent)
updates = append(updates, &currentFlowUpdate{s.ContactID(), event.FlowID})
flowID := evts[len(evts)-1].(models.FlowID)
updates = append(updates, &currentFlowUpdate{s.ContactID(), flowID})
}

// do our update
Expand Down
19 changes: 11 additions & 8 deletions core/models/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,18 +260,21 @@ func ApplyModifiers(ctx context.Context, rt *runtime.Runtime, oa *OrgAssets, mod
return eventsByContact, nil
}

// TypeContactFlowChanged is the type of our event that the contact flow changed
const TypeContactFlowChanged string = "contact_flow_changed"
// TypeSprintEnded is a pseudo event that lets add hooks for changes to a contacts current flow or flow history
const TypeSprintEnded string = "sprint_ended"

type ContactFlowChangedEvent struct {
type SprintEndedEvent struct {
events.BaseEvent

FlowID FlowID
Contact *Contact // model contact so we can access current flow
Resumed bool // whether this was a resume
}

func NewContactFlowChangedEvent(flowID FlowID) *ContactFlowChangedEvent {
return &ContactFlowChangedEvent{
BaseEvent: events.NewBaseEvent(TypeContactFlowChanged),
FlowID: flowID,
// NewSprintEndedEvent creates a new sprint ended event
func NewSprintEndedEvent(c *Contact, resumed bool) *SprintEndedEvent {
return &SprintEndedEvent{
BaseEvent: events.NewBaseEvent(TypeSprintEnded),
Contact: c,
Resumed: resumed,
}
}
10 changes: 2 additions & 8 deletions core/models/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,10 +446,7 @@ func (s *Session) Update(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx,
eventsToHandle = append(eventsToHandle, sprint.Events()...)
}

// if contact's current flow has changed, add pseudo event to handle that
if s.SessionType().Interrupts() && contact.CurrentFlowID() != s.CurrentFlowID() {
eventsToHandle = append(eventsToHandle, NewContactFlowChangedEvent(s.CurrentFlowID()))
}
eventsToHandle = append(eventsToHandle, NewSprintEndedEvent(contact, true))

// apply all our events to generate hooks
err = HandleEvents(ctx, rt, tx, oa, s.scene, eventsToHandle)
Expand Down Expand Up @@ -708,10 +705,7 @@ func InsertSessions(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa *O
eventsToHandle = append(eventsToHandle, sprints[i].Events()...)
}

// if contact's current flow has changed, add pseudo event to handle that
if s.SessionType().Interrupts() && contacts[i].CurrentFlowID() != s.CurrentFlowID() {
eventsToHandle = append(eventsToHandle, NewContactFlowChangedEvent(s.CurrentFlowID()))
}
eventsToHandle = append(eventsToHandle, NewSprintEndedEvent(contacts[i], false))

err = HandleEvents(ctx, rt, tx, oa, s.Scene(), eventsToHandle)
if err != nil {
Expand Down

0 comments on commit 2a22e0e

Please sign in to comment.