From 30f9a91d87b4b50cf6fc950cc167d97489209192 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Wed, 1 Dec 2021 13:41:49 -0500 Subject: [PATCH] Remove loop detection now that mailroom does this --- backend.go | 4 --- backends/rapidpro/backend.go | 60 ------------------------------- backends/rapidpro/backend_test.go | 30 ---------------- sender.go | 13 ------- test.go | 5 --- 5 files changed, 112 deletions(-) diff --git a/backend.go b/backend.go index 196d02d4f..9516a0cdc 100644 --- a/backend.go +++ b/backend.go @@ -74,10 +74,6 @@ type Backend interface { // a message is being forced in being resent by a user ClearMsgSent(context.Context, MsgID) error - // IsMsgLoop returns whether the passed in message is part of a message loop, possibly with another bot. Backends should - // implement their own logic to implement this. - IsMsgLoop(ctx context.Context, msg Msg) (bool, error) - // MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case // of errors during sending as it will manage the number of active workers per channel. The optional status parameter can be // used to determine any sort of deduping of msg sends diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go index 0d362e471..364d0d3fe 100644 --- a/backends/rapidpro/backend.go +++ b/backends/rapidpro/backend.go @@ -35,9 +35,6 @@ const sentSetName = "msgs_sent_%s" // our timeout for backend operations const backendTimeout = time.Second * 20 -// number of messages for loop detection -const msgLoopThreshold = 20 - func init() { courier.RegisterBackend("rapidpro", newBackend) } @@ -202,63 +199,6 @@ func (b *backend) ClearMsgSent(ctx context.Context, id courier.MsgID) error { return err } -var luaMsgLoop = redis.NewScript(3, `-- KEYS: [key, contact_id, text] - local key = KEYS[1] - local contact_id = KEYS[2] - local text = KEYS[3] - local count = 1 - - -- try to look up in window - local record = redis.call("hget", key, contact_id) - if record then - local record_count = tonumber(string.sub(record, 1, 2)) - local record_text = string.sub(record, 4, -1) - - if record_text == text then - count = math.min(record_count + 1, 99) - else - count = 1 - end - end - - -- create our new record with our updated count - record = string.format("%02d:%s", count, text) - - -- write our new record with updated count - redis.call("hset", key, contact_id, record) - - -- sets its expiration - redis.call("expire", key, 300) - - return count -`) - -// IsMsgLoop checks whether the passed in message is part of a loop -func (b *backend) IsMsgLoop(ctx context.Context, msg courier.Msg) (bool, error) { - m := msg.(*DBMsg) - - // things that aren't replies can't be loops, neither do we count retries - if m.ResponseToExternalID_ == "" || m.ErrorCount_ > 0 { - return false, nil - } - - // otherwise run our script to check whether this is a loop in the past 5 minutes - rc := b.redisPool.Get() - defer rc.Close() - - keyTime := time.Now().UTC().Round(time.Minute * 5) - key := fmt.Sprintf(sentSetName, fmt.Sprintf("loop_msgs:%s", keyTime.Format("2006-01-02-15:04"))) - count, err := redis.Int(luaMsgLoop.Do(rc, key, m.ContactID_, m.Text_)) - if err != nil { - return false, errors.Wrapf(err, "error while checking for msg loop") - } - - if count >= msgLoopThreshold { - return true, nil - } - return false, nil -} - // MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.Msg, status courier.MsgStatus) { rc := b.redisPool.Get() diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index b803f8e7b..f58bb3186 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -748,36 +748,6 @@ func (ts *BackendTestSuite) TestExternalIDDupes() { ts.True(m2.alreadyWritten) } -func (ts *BackendTestSuite) TestLoop() { - ctx := context.Background() - dbMsg := readMsgFromDB(ts.b, courier.NewMsgID(10000)) - - dbMsg.ResponseToExternalID_ = "65474" - - loop, err := ts.b.IsMsgLoop(ctx, dbMsg) - ts.NoError(err) - ts.False(loop) - - // call it 18 times more, no loop still - for i := 0; i < 18; i++ { - loop, err = ts.b.IsMsgLoop(ctx, dbMsg) - ts.NoError(err) - ts.False(loop) - } - - // last one should make us a loop - loop, err = ts.b.IsMsgLoop(ctx, dbMsg) - ts.NoError(err) - ts.True(loop) - - // make sure this keeps working even in hundreds of loops - for i := 0; i < 100; i++ { - loop, err = ts.b.IsMsgLoop(ctx, dbMsg) - ts.NoError(err) - ts.True(loop) - } -} - func (ts *BackendTestSuite) TestStatus() { // our health should just contain the header ts.True(strings.Contains(ts.b.Status(), "Channel"), ts.b.Status()) diff --git a/sender.go b/sender.go index 63d966822..dbed21aab 100644 --- a/sender.go +++ b/sender.go @@ -189,23 +189,10 @@ func (w *Sender) sendMessage(msg Msg) { log.WithError(err).Error("error looking up msg was sent") } - // is this msg in a loop? - loop, err := backend.IsMsgLoop(sendCTX, msg) - - // failing on loop lookup isn't permanent, but log - if err != nil { - log.WithError(err).Error("error looking up msg loop") - } - if sent { // if this message was already sent, create a wired status for it status = backend.NewMsgStatusForID(msg.Channel(), msg.ID(), MsgWired) log.Warning("duplicate send, marking as wired") - } else if loop { - // if this contact is in a loop, fail the message immediately without sending - status = backend.NewMsgStatusForID(msg.Channel(), msg.ID(), MsgFailed) - status.AddLog(NewChannelLogFromError("Message Loop", msg.Channel(), msg.ID(), 0, fmt.Errorf("message loop detected, failing message without send"))) - log.Error("message loop detected, failing message") } else { // send our message status, err = server.SendMsg(sendCTX, msg) diff --git a/test.go b/test.go index 09762d438..012c67a1b 100644 --- a/test.go +++ b/test.go @@ -161,11 +161,6 @@ func (mb *MockBackend) ClearMsgSent(ctx context.Context, id MsgID) error { return nil } -// IsMsgLoop returns whether the passed in msg is a loop -func (mb *MockBackend) IsMsgLoop(ctx context.Context, msg Msg) (bool, error) { - return false, nil -} - // MarkOutgoingMsgComplete marks the passed msg as having been dealt with func (mb *MockBackend) MarkOutgoingMsgComplete(ctx context.Context, msg Msg, s MsgStatus) { mb.mutex.Lock()