Skip to content

Commit

Permalink
Merge pull request nyaruka#390 from nyaruka/remove_loop_detection
Browse files Browse the repository at this point in the history
Remove loop detection
  • Loading branch information
rowanseymour authored Dec 2, 2021
2 parents bef57dd + 30f9a91 commit b6b72d9
Show file tree
Hide file tree
Showing 5 changed files with 0 additions and 112 deletions.
4 changes: 0 additions & 4 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,6 @@ type Backend interface {
// a message is being forced in being resent by a user
ClearMsgSent(context.Context, MsgID) error

// IsMsgLoop returns whether the passed in message is part of a message loop, possibly with another bot. Backends should
// implement their own logic to implement this.
IsMsgLoop(ctx context.Context, msg Msg) (bool, error)

// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
// of errors during sending as it will manage the number of active workers per channel. The optional status parameter can be
// used to determine any sort of deduping of msg sends
Expand Down
60 changes: 0 additions & 60 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ const sentSetName = "msgs_sent_%s"
// our timeout for backend operations
const backendTimeout = time.Second * 20

// number of messages for loop detection
const msgLoopThreshold = 20

func init() {
courier.RegisterBackend("rapidpro", newBackend)
}
Expand Down Expand Up @@ -202,63 +199,6 @@ func (b *backend) ClearMsgSent(ctx context.Context, id courier.MsgID) error {
return err
}

var luaMsgLoop = redis.NewScript(3, `-- KEYS: [key, contact_id, text]
local key = KEYS[1]
local contact_id = KEYS[2]
local text = KEYS[3]
local count = 1
-- try to look up in window
local record = redis.call("hget", key, contact_id)
if record then
local record_count = tonumber(string.sub(record, 1, 2))
local record_text = string.sub(record, 4, -1)
if record_text == text then
count = math.min(record_count + 1, 99)
else
count = 1
end
end
-- create our new record with our updated count
record = string.format("%02d:%s", count, text)
-- write our new record with updated count
redis.call("hset", key, contact_id, record)
-- sets its expiration
redis.call("expire", key, 300)
return count
`)

// IsMsgLoop checks whether the passed in message is part of a loop
func (b *backend) IsMsgLoop(ctx context.Context, msg courier.Msg) (bool, error) {
m := msg.(*DBMsg)

// things that aren't replies can't be loops, neither do we count retries
if m.ResponseToExternalID_ == "" || m.ErrorCount_ > 0 {
return false, nil
}

// otherwise run our script to check whether this is a loop in the past 5 minutes
rc := b.redisPool.Get()
defer rc.Close()

keyTime := time.Now().UTC().Round(time.Minute * 5)
key := fmt.Sprintf(sentSetName, fmt.Sprintf("loop_msgs:%s", keyTime.Format("2006-01-02-15:04")))
count, err := redis.Int(luaMsgLoop.Do(rc, key, m.ContactID_, m.Text_))
if err != nil {
return false, errors.Wrapf(err, "error while checking for msg loop")
}

if count >= msgLoopThreshold {
return true, nil
}
return false, nil
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.Msg, status courier.MsgStatus) {
rc := b.redisPool.Get()
Expand Down
30 changes: 0 additions & 30 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,36 +748,6 @@ func (ts *BackendTestSuite) TestExternalIDDupes() {
ts.True(m2.alreadyWritten)
}

func (ts *BackendTestSuite) TestLoop() {
ctx := context.Background()
dbMsg := readMsgFromDB(ts.b, courier.NewMsgID(10000))

dbMsg.ResponseToExternalID_ = "65474"

loop, err := ts.b.IsMsgLoop(ctx, dbMsg)
ts.NoError(err)
ts.False(loop)

// call it 18 times more, no loop still
for i := 0; i < 18; i++ {
loop, err = ts.b.IsMsgLoop(ctx, dbMsg)
ts.NoError(err)
ts.False(loop)
}

// last one should make us a loop
loop, err = ts.b.IsMsgLoop(ctx, dbMsg)
ts.NoError(err)
ts.True(loop)

// make sure this keeps working even in hundreds of loops
for i := 0; i < 100; i++ {
loop, err = ts.b.IsMsgLoop(ctx, dbMsg)
ts.NoError(err)
ts.True(loop)
}
}

func (ts *BackendTestSuite) TestStatus() {
// our health should just contain the header
ts.True(strings.Contains(ts.b.Status(), "Channel"), ts.b.Status())
Expand Down
13 changes: 0 additions & 13 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,23 +189,10 @@ func (w *Sender) sendMessage(msg Msg) {
log.WithError(err).Error("error looking up msg was sent")
}

// is this msg in a loop?
loop, err := backend.IsMsgLoop(sendCTX, msg)

// failing on loop lookup isn't permanent, but log
if err != nil {
log.WithError(err).Error("error looking up msg loop")
}

if sent {
// if this message was already sent, create a wired status for it
status = backend.NewMsgStatusForID(msg.Channel(), msg.ID(), MsgWired)
log.Warning("duplicate send, marking as wired")
} else if loop {
// if this contact is in a loop, fail the message immediately without sending
status = backend.NewMsgStatusForID(msg.Channel(), msg.ID(), MsgFailed)
status.AddLog(NewChannelLogFromError("Message Loop", msg.Channel(), msg.ID(), 0, fmt.Errorf("message loop detected, failing message without send")))
log.Error("message loop detected, failing message")
} else {
// send our message
status, err = server.SendMsg(sendCTX, msg)
Expand Down
5 changes: 0 additions & 5 deletions test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,6 @@ func (mb *MockBackend) ClearMsgSent(ctx context.Context, id MsgID) error {
return nil
}

// IsMsgLoop returns whether the passed in msg is a loop
func (mb *MockBackend) IsMsgLoop(ctx context.Context, msg Msg) (bool, error) {
return false, nil
}

// MarkOutgoingMsgComplete marks the passed msg as having been dealt with
func (mb *MockBackend) MarkOutgoingMsgComplete(ctx context.Context, msg Msg, s MsgStatus) {
mb.mutex.Lock()
Expand Down

0 comments on commit b6b72d9

Please sign in to comment.