Skip to content

Commit

Permalink
Allow expirations and timeouts to resume sessions for stopped, blocke…
Browse files Browse the repository at this point in the history
…d and archived contacts
  • Loading branch information
rowanseymour committed Mar 15, 2022
1 parent a143513 commit a1e56d8
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 25 deletions.
16 changes: 7 additions & 9 deletions core/tasks/expirations/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func HandleWaitExpirations(ctx context.Context, rt *runtime.Runtime) error {
}

// if it can't be resumed, add to batch to be expired
if !expiredWait.WaitResumes || expiredWait.ContactStatus != models.ContactStatusActive {
if !expiredWait.WaitResumes {
expiredSessions = append(expiredSessions, expiredWait.SessionID)

// batch is full? commit it
Expand Down Expand Up @@ -135,20 +135,18 @@ func HandleWaitExpirations(ctx context.Context, rt *runtime.Runtime) error {
}

const sqlSelectExpiredWaits = `
SELECT s.id as session_id, s.org_id, s.wait_expires_on, s.wait_resume_on_expire , c.id as contact_id, c.status as contact_status
SELECT s.id as session_id, s.org_id, s.wait_expires_on, s.wait_resume_on_expire , s.contact_id
FROM flows_flowsession s
INNER JOIN contacts_contact c ON c.id = s.contact_id
WHERE s.session_type = 'M' AND s.status = 'W' AND s.wait_expires_on <= NOW()
ORDER BY s.wait_expires_on ASC
LIMIT 25000`

type ExpiredWait struct {
SessionID models.SessionID `db:"session_id"`
OrgID models.OrgID `db:"org_id"`
WaitExpiresOn time.Time `db:"wait_expires_on"`
WaitResumes bool `db:"wait_resume_on_expire"`
ContactID models.ContactID `db:"contact_id"`
ContactStatus models.ContactStatus `db:"contact_status"`
SessionID models.SessionID `db:"session_id"`
OrgID models.OrgID `db:"org_id"`
WaitExpiresOn time.Time `db:"wait_expires_on"`
WaitResumes bool `db:"wait_resume_on_expire"`
ContactID models.ContactID `db:"contact_id"`
}

// ExpireVoiceSessions looks for voice sessions that should be expired and ends them
Expand Down
28 changes: 17 additions & 11 deletions core/tasks/expirations/cron_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package expirations_test

import (
"encoding/json"
"testing"
"time"

"github.com/nyaruka/gocommon/dbutil/assertdb"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/goflow/envs"
_ "github.com/nyaruka/mailroom/core/handlers"
"github.com/nyaruka/mailroom/core/models"
Expand Down Expand Up @@ -47,7 +47,7 @@ func TestExpirations(t *testing.T) {

// create a parent/child session for the blocked contact
s5ID := testdata.InsertWaitingSession(db, testdata.Org1, blake, models.FlowTypeMessaging, testdata.Favorites, models.NilConnectionID, time.Now(), time.Now(), true, nil)
r6ID := testdata.InsertFlowRun(db, testdata.Org1, s5ID, blake, testdata.Favorites, models.RunStatusWaiting)
r6ID := testdata.InsertFlowRun(db, testdata.Org1, s5ID, blake, testdata.Favorites, models.RunStatusActive)
r7ID := testdata.InsertFlowRun(db, testdata.Org1, s5ID, blake, testdata.Favorites, models.RunStatusWaiting)

time.Sleep(5 * time.Millisecond)
Expand All @@ -73,23 +73,29 @@ func TestExpirations(t *testing.T) {
assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1;`, s4ID).Columns(map[string]interface{}{"status": "W"})
assertdb.Query(t, db, `SELECT status FROM flows_flowrun WHERE id = $1;`, r5ID).Columns(map[string]interface{}{"status": "W"})

// blocked contact's session and runs should be expired because a blocked contact can't resume
assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1;`, s5ID).Columns(map[string]interface{}{"status": "X"})
assertdb.Query(t, db, `SELECT status FROM flows_flowrun WHERE id = $1;`, r6ID).Columns(map[string]interface{}{"status": "X"})
assertdb.Query(t, db, `SELECT status FROM flows_flowrun WHERE id = $1;`, r7ID).Columns(map[string]interface{}{"status": "X"})
// blocked contact's session and runs sshould be unchanged because it's been queued for resumption.. like any other contact
assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1;`, s5ID).Columns(map[string]interface{}{"status": "W"})
assertdb.Query(t, db, `SELECT status FROM flows_flowrun WHERE id = $1;`, r6ID).Columns(map[string]interface{}{"status": "A"})
assertdb.Query(t, db, `SELECT status FROM flows_flowrun WHERE id = $1;`, r7ID).Columns(map[string]interface{}{"status": "W"})

// should have created one task
// should have created two expiration tasks
task, err := queue.PopNextTask(rc, queue.HandlerQueue)
assert.NoError(t, err)
assert.NotNil(t, task)

// decode the task
// check the first task
eventTask := &handler.HandleEventTask{}
err = json.Unmarshal(task.Task, eventTask)
jsonx.MustUnmarshal(task.Task, eventTask)
assert.Equal(t, testdata.George.ID, eventTask.ContactID)

task, err = queue.PopNextTask(rc, queue.HandlerQueue)
assert.NoError(t, err)
assert.NotNil(t, task)

// assert its the right contact
assert.Equal(t, testdata.George.ID, eventTask.ContactID)
// check the second task
eventTask = &handler.HandleEventTask{}
jsonx.MustUnmarshal(task.Task, eventTask)
assert.Equal(t, blake.ID, eventTask.ContactID)

// no other tasks
task, err = queue.PopNextTask(rc, queue.HandlerQueue)
Expand Down
5 changes: 0 additions & 5 deletions core/tasks/handler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,6 @@ func handleTimedEvent(ctx context.Context, rt *runtime.Runtime, eventType string
return errors.Wrapf(err, "error loading contact")
}

// contact has been deleted or is blocked/stopped/archived, ignore this event
if len(contacts) == 0 || contacts[0].Status() != models.ContactStatusActive {
return nil
}

modelContact := contacts[0]

// build our flow contact
Expand Down

0 comments on commit a1e56d8

Please sign in to comment.