diff --git a/core/handlers/contact_flow_changed.go b/core/handlers/contact_flow_changed.go deleted file mode 100644 index 3a5ef9667..000000000 --- a/core/handlers/contact_flow_changed.go +++ /dev/null @@ -1,28 +0,0 @@ -package handlers - -import ( - "context" - - "github.com/jmoiron/sqlx" - "github.com/nyaruka/goflow/flows" - "github.com/nyaruka/mailroom/core/hooks" - "github.com/nyaruka/mailroom/core/models" - "github.com/nyaruka/mailroom/runtime" - "github.com/sirupsen/logrus" -) - -func init() { - models.RegisterEventHandler(models.TypeContactFlowChanged, handleContactFlowChanged) -} - -// handleContactFlowChanged handles contact_flow_changed events which the engine doesn't produce but we append to update a contact's current flow -func handleContactFlowChanged(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa *models.OrgAssets, scene *models.Scene, e flows.Event) error { - event := e.(*models.ContactFlowChangedEvent) - - logrus.WithFields(logrus.Fields{"contact_uuid": scene.ContactUUID(), "session_id": scene.SessionID(), "flow_id": event.FlowID}).Debug("contact flow changed") - - scene.AppendToEventPreCommitHook(hooks.CommitFlowChangesHook, event) - scene.AppendToEventPostCommitHook(hooks.ContactModifiedHook, event) - - return nil -} diff --git a/core/handlers/contact_status_changed_test.go b/core/handlers/contact_status_changed_test.go index 93a77b600..05c7aad89 100644 --- a/core/handlers/contact_status_changed_test.go +++ b/core/handlers/contact_status_changed_test.go @@ -11,12 +11,9 @@ import ( ) func TestContactStatusChanged(t *testing.T) { - ctx, rt, db, _ := testsuite.Get() + ctx, rt, _, _ := testsuite.Get() - defer testsuite.Reset(testsuite.ResetAll) - - // make sure cathyID contact is active - db.Exec(`UPDATE contacts_contact SET status = 'A' WHERE id = $1`, testdata.Cathy.ID) + defer testsuite.Reset(testsuite.ResetData) tcs := []handlers.TestCase{ { diff --git a/core/handlers/contact_urns_changed_test.go b/core/handlers/contact_urns_changed_test.go index 6ccfc4054..3bd8bd021 100644 --- a/core/handlers/contact_urns_changed_test.go +++ b/core/handlers/contact_urns_changed_test.go @@ -2,7 +2,6 @@ package handlers_test import ( "testing" - "time" "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/goflow/flows" @@ -20,8 +19,6 @@ func TestContactURNsChanged(t *testing.T) { // add a URN to george that cathy will steal testdata.InsertContactURN(db, testdata.Org1, testdata.George, urns.URN("tel:+12065551212"), 100) - now := time.Now() - tcs := []handlers.TestCase{ { Actions: handlers.ContactActionMap{ @@ -55,12 +52,6 @@ func TestContactURNsChanged(t *testing.T) { Args: []interface{}{testdata.George.ID}, Count: 1, }, - // two contacts updated, both cathy and evan since their URNs changed - { - SQL: "select count(*) from contacts_contact where modified_on > $1", - Args: []interface{}{now}, - Count: 2, - }, }, }, } diff --git a/core/handlers/flow_entered.go b/core/handlers/flow_entered.go new file mode 100644 index 000000000..aae420089 --- /dev/null +++ b/core/handlers/flow_entered.go @@ -0,0 +1,35 @@ +package handlers + +import ( + "context" + + "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/goflow/flows/events" + "github.com/nyaruka/mailroom/core/hooks" + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/runtime" + + "github.com/jmoiron/sqlx" + "github.com/sirupsen/logrus" +) + +func init() { + models.RegisterEventHandler(events.TypeFlowEntered, handleFlowEntered) +} + +func handleFlowEntered(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa *models.OrgAssets, scene *models.Scene, e flows.Event) error { + event := e.(*events.FlowEnteredEvent) + + logrus.WithFields(logrus.Fields{ + "contact_uuid": scene.ContactUUID(), + "session_id": scene.SessionID(), + "flow_name": event.Flow.Name, + "flow_uuid": event.Flow.UUID, + }).Debug("flow entered") + + // we've potentially changed contact flow history.. only way to be sure would be loading contacts with their + // flow history, but not sure that is worth it given how likely we are to be updating modified_on anyway + scene.AppendToEventPreCommitHook(hooks.ContactModifiedHook, event) + + return nil +} diff --git a/core/handlers/flow_entered_test.go b/core/handlers/flow_entered_test.go new file mode 100644 index 000000000..f30f2fc42 --- /dev/null +++ b/core/handlers/flow_entered_test.go @@ -0,0 +1,42 @@ +package handlers_test + +import ( + "testing" + + "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/goflow/flows/actions" + "github.com/nyaruka/mailroom/core/handlers" + "github.com/nyaruka/mailroom/testsuite" + "github.com/nyaruka/mailroom/testsuite/testdata" + "github.com/stretchr/testify/assert" +) + +func TestFlowEntered(t *testing.T) { + ctx, rt, _, _ := testsuite.Get() + + defer testsuite.Reset(testsuite.ResetAll) + + oa := testdata.Org1.Load(rt) + + flow, err := oa.FlowByID(testdata.PickANumber.ID) + assert.NoError(t, err) + + tcs := []handlers.TestCase{ + { + Actions: handlers.ContactActionMap{ + testdata.Cathy: []flows.Action{ + actions.NewEnterFlow(handlers.NewActionUUID(), flow.Reference(), false), + }, + }, + SQLAssertions: []handlers.SQLAssertion{ + { + SQL: `select count(*) from contacts_contact where current_flow_id = $1`, + Args: []interface{}{flow.ID()}, + Count: 1, + }, + }, + }, + } + + handlers.RunTestCases(t, ctx, rt, tcs) +} diff --git a/core/handlers/noop.go b/core/handlers/noop.go index a2f46c6fd..28a1471a0 100644 --- a/core/handlers/noop.go +++ b/core/handlers/noop.go @@ -16,7 +16,6 @@ func init() { models.RegisterEventHandler(events.TypeEnvironmentRefreshed, NoopHandler) models.RegisterEventHandler(events.TypeError, NoopHandler) models.RegisterEventHandler(events.TypeFailure, NoopHandler) - models.RegisterEventHandler(events.TypeFlowEntered, NoopHandler) models.RegisterEventHandler(events.TypeMsgWait, NoopHandler) models.RegisterEventHandler(events.TypeRunExpired, NoopHandler) models.RegisterEventHandler(events.TypeRunResultChanged, NoopHandler) diff --git a/core/handlers/session_triggered.go b/core/handlers/session_triggered.go index f8fdce670..50dadb317 100644 --- a/core/handlers/session_triggered.go +++ b/core/handlers/session_triggered.go @@ -17,16 +17,15 @@ func init() { models.RegisterEventHandler(events.TypeSessionTriggered, handleSessionTriggered) } -// handleSessionTriggered queues this event for being started after our scene are committed func handleSessionTriggered(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa *models.OrgAssets, scene *models.Scene, e flows.Event) error { event := e.(*events.SessionTriggeredEvent) logrus.WithFields(logrus.Fields{ "contact_uuid": scene.ContactUUID(), "session_id": scene.SessionID(), - "flow": event.Flow.Name, + "flow_name": event.Flow.Name, "flow_uuid": event.Flow.UUID, - }).Debug("scene triggered") + }).Debug("session triggered") scene.AppendToEventPreCommitHook(hooks.InsertStartHook, event) diff --git a/core/handlers/sprint_ended.go b/core/handlers/sprint_ended.go new file mode 100644 index 000000000..34ec40844 --- /dev/null +++ b/core/handlers/sprint_ended.go @@ -0,0 +1,35 @@ +package handlers + +import ( + "context" + + "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/mailroom/core/hooks" + "github.com/nyaruka/mailroom/core/models" + "github.com/nyaruka/mailroom/runtime" + + "github.com/jmoiron/sqlx" +) + +func init() { + models.RegisterEventHandler(models.TypeSprintEnded, handleSprintEnded) +} + +func handleSprintEnded(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa *models.OrgAssets, scene *models.Scene, e flows.Event) error { + event := e.(*models.SprintEndedEvent) + + // if we're in a flow type that can wait then contact current flow has potentially changed + currentFlowChanged := scene.Session().SessionType().Interrupts() && event.Contact.CurrentFlowID() != scene.Session().CurrentFlowID() + + if currentFlowChanged { + scene.AppendToEventPreCommitHook(hooks.CommitFlowChangesHook, scene.Session().CurrentFlowID()) + } + + // if current flow has changed then we need to update modified_on, but also if this is a new session + // then flow history may have changed too in a way that won't be captured by a flow_entered event + if currentFlowChanged || !event.Resumed { + scene.AppendToEventPostCommitHook(hooks.ContactModifiedHook, event) + } + + return nil +} diff --git a/core/hooks/commit_flow_changes.go b/core/hooks/commit_flow_changes.go index 488c11428..a4335e4d2 100644 --- a/core/hooks/commit_flow_changes.go +++ b/core/hooks/commit_flow_changes.go @@ -19,8 +19,8 @@ func (h *commitFlowChangesHook) Apply(ctx context.Context, rt *runtime.Runtime, updates := make([]interface{}, 0, len(scenes)) for s, evts := range scenes { // there is only ever one of these events per scene - event := evts[len(evts)-1].(*models.ContactFlowChangedEvent) - updates = append(updates, ¤tFlowUpdate{s.ContactID(), event.FlowID}) + flowID := evts[len(evts)-1].(models.FlowID) + updates = append(updates, ¤tFlowUpdate{s.ContactID(), flowID}) } // do our update diff --git a/core/models/events.go b/core/models/events.go index e92990d75..4a82bad7f 100644 --- a/core/models/events.go +++ b/core/models/events.go @@ -260,18 +260,21 @@ func ApplyModifiers(ctx context.Context, rt *runtime.Runtime, oa *OrgAssets, mod return eventsByContact, nil } -// TypeContactFlowChanged is the type of our event that the contact flow changed -const TypeContactFlowChanged string = "contact_flow_changed" +// TypeSprintEnded is a pseudo event that lets add hooks for changes to a contacts current flow or flow history +const TypeSprintEnded string = "sprint_ended" -type ContactFlowChangedEvent struct { +type SprintEndedEvent struct { events.BaseEvent - FlowID FlowID + Contact *Contact // model contact so we can access current flow + Resumed bool // whether this was a resume } -func NewContactFlowChangedEvent(flowID FlowID) *ContactFlowChangedEvent { - return &ContactFlowChangedEvent{ - BaseEvent: events.NewBaseEvent(TypeContactFlowChanged), - FlowID: flowID, +// NewSprintEndedEvent creates a new sprint ended event +func NewSprintEndedEvent(c *Contact, resumed bool) *SprintEndedEvent { + return &SprintEndedEvent{ + BaseEvent: events.NewBaseEvent(TypeSprintEnded), + Contact: c, + Resumed: resumed, } } diff --git a/core/models/sessions.go b/core/models/sessions.go index 6a7dce403..d2975f1e6 100644 --- a/core/models/sessions.go +++ b/core/models/sessions.go @@ -446,10 +446,7 @@ func (s *Session) Update(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, eventsToHandle = append(eventsToHandle, sprint.Events()...) } - // if contact's current flow has changed, add pseudo event to handle that - if s.SessionType().Interrupts() && contact.CurrentFlowID() != s.CurrentFlowID() { - eventsToHandle = append(eventsToHandle, NewContactFlowChangedEvent(s.CurrentFlowID())) - } + eventsToHandle = append(eventsToHandle, NewSprintEndedEvent(contact, true)) // apply all our events to generate hooks err = HandleEvents(ctx, rt, tx, oa, s.scene, eventsToHandle) @@ -708,10 +705,7 @@ func InsertSessions(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, oa *O eventsToHandle = append(eventsToHandle, sprints[i].Events()...) } - // if contact's current flow has changed, add pseudo event to handle that - if s.SessionType().Interrupts() && contacts[i].CurrentFlowID() != s.CurrentFlowID() { - eventsToHandle = append(eventsToHandle, NewContactFlowChangedEvent(s.CurrentFlowID())) - } + eventsToHandle = append(eventsToHandle, NewSprintEndedEvent(contacts[i], false)) err = HandleEvents(ctx, rt, tx, oa, s.Scene(), eventsToHandle) if err != nil {