Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove loop detection #390

Merged
merged 1 commit into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,6 @@ type Backend interface {
// a message is being forced in being resent by a user
ClearMsgSent(context.Context, MsgID) error

// IsMsgLoop returns whether the passed in message is part of a message loop, possibly with another bot. Backends should
// implement their own logic to implement this.
IsMsgLoop(ctx context.Context, msg Msg) (bool, error)

// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
// of errors during sending as it will manage the number of active workers per channel. The optional status parameter can be
// used to determine any sort of deduping of msg sends
Expand Down
60 changes: 0 additions & 60 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ const sentSetName = "msgs_sent_%s"
// our timeout for backend operations
const backendTimeout = time.Second * 20

// number of messages for loop detection
const msgLoopThreshold = 20

func init() {
courier.RegisterBackend("rapidpro", newBackend)
}
Expand Down Expand Up @@ -202,63 +199,6 @@ func (b *backend) ClearMsgSent(ctx context.Context, id courier.MsgID) error {
return err
}

var luaMsgLoop = redis.NewScript(3, `-- KEYS: [key, contact_id, text]
local key = KEYS[1]
local contact_id = KEYS[2]
local text = KEYS[3]
local count = 1

-- try to look up in window
local record = redis.call("hget", key, contact_id)
if record then
local record_count = tonumber(string.sub(record, 1, 2))
local record_text = string.sub(record, 4, -1)

if record_text == text then
count = math.min(record_count + 1, 99)
else
count = 1
end
end

-- create our new record with our updated count
record = string.format("%02d:%s", count, text)

-- write our new record with updated count
redis.call("hset", key, contact_id, record)

-- sets its expiration
redis.call("expire", key, 300)

return count
`)

// IsMsgLoop checks whether the passed in message is part of a loop
func (b *backend) IsMsgLoop(ctx context.Context, msg courier.Msg) (bool, error) {
m := msg.(*DBMsg)

// things that aren't replies can't be loops, neither do we count retries
if m.ResponseToExternalID_ == "" || m.ErrorCount_ > 0 {
return false, nil
}

// otherwise run our script to check whether this is a loop in the past 5 minutes
rc := b.redisPool.Get()
defer rc.Close()

keyTime := time.Now().UTC().Round(time.Minute * 5)
key := fmt.Sprintf(sentSetName, fmt.Sprintf("loop_msgs:%s", keyTime.Format("2006-01-02-15:04")))
count, err := redis.Int(luaMsgLoop.Do(rc, key, m.ContactID_, m.Text_))
if err != nil {
return false, errors.Wrapf(err, "error while checking for msg loop")
}

if count >= msgLoopThreshold {
return true, nil
}
return false, nil
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.Msg, status courier.MsgStatus) {
rc := b.redisPool.Get()
Expand Down
30 changes: 0 additions & 30 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,36 +748,6 @@ func (ts *BackendTestSuite) TestExternalIDDupes() {
ts.True(m2.alreadyWritten)
}

func (ts *BackendTestSuite) TestLoop() {
ctx := context.Background()
dbMsg := readMsgFromDB(ts.b, courier.NewMsgID(10000))

dbMsg.ResponseToExternalID_ = "65474"

loop, err := ts.b.IsMsgLoop(ctx, dbMsg)
ts.NoError(err)
ts.False(loop)

// call it 18 times more, no loop still
for i := 0; i < 18; i++ {
loop, err = ts.b.IsMsgLoop(ctx, dbMsg)
ts.NoError(err)
ts.False(loop)
}

// last one should make us a loop
loop, err = ts.b.IsMsgLoop(ctx, dbMsg)
ts.NoError(err)
ts.True(loop)

// make sure this keeps working even in hundreds of loops
for i := 0; i < 100; i++ {
loop, err = ts.b.IsMsgLoop(ctx, dbMsg)
ts.NoError(err)
ts.True(loop)
}
}

func (ts *BackendTestSuite) TestStatus() {
// our health should just contain the header
ts.True(strings.Contains(ts.b.Status(), "Channel"), ts.b.Status())
Expand Down
13 changes: 0 additions & 13 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,23 +189,10 @@ func (w *Sender) sendMessage(msg Msg) {
log.WithError(err).Error("error looking up msg was sent")
}

// is this msg in a loop?
loop, err := backend.IsMsgLoop(sendCTX, msg)

// failing on loop lookup isn't permanent, but log
if err != nil {
log.WithError(err).Error("error looking up msg loop")
}

if sent {
// if this message was already sent, create a wired status for it
status = backend.NewMsgStatusForID(msg.Channel(), msg.ID(), MsgWired)
log.Warning("duplicate send, marking as wired")
} else if loop {
// if this contact is in a loop, fail the message immediately without sending
status = backend.NewMsgStatusForID(msg.Channel(), msg.ID(), MsgFailed)
status.AddLog(NewChannelLogFromError("Message Loop", msg.Channel(), msg.ID(), 0, fmt.Errorf("message loop detected, failing message without send")))
log.Error("message loop detected, failing message")
} else {
// send our message
status, err = server.SendMsg(sendCTX, msg)
Expand Down
5 changes: 0 additions & 5 deletions test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,6 @@ func (mb *MockBackend) ClearMsgSent(ctx context.Context, id MsgID) error {
return nil
}

// IsMsgLoop returns whether the passed in msg is a loop
func (mb *MockBackend) IsMsgLoop(ctx context.Context, msg Msg) (bool, error) {
return false, nil
}

// MarkOutgoingMsgComplete marks the passed msg as having been dealt with
func (mb *MockBackend) MarkOutgoingMsgComplete(ctx context.Context, msg Msg, s MsgStatus) {
mb.mutex.Lock()
Expand Down