diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 5a5d889da90..88c949672e5 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "io" + "math/rand" "sync" "time" @@ -264,36 +265,37 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos func buildPostponeQuery(name sqlparser.TableIdent, minBackoff, maxBackoff time.Duration) *sqlparser.ParsedQuery { var args []interface{} - buf := bytes.NewBufferString("update %v set time_next = ") - args = append(args, name) + // since messages are immediately postponed upon sending, we need to add exponential backoff on top + // of the ackWaitTime, otherwise messages will be resent too quickly. + buf := bytes.NewBufferString("update %v set time_next = %a + %a + ") + args = append(args, name, ":time_now", ":wait_time") - // have backoff be +/- 33%, seeded with :time_now to be consistent in multiple usages - // whenever this is injected, append (:min_backoff, :time_now) - jitteredBackoff := "FLOOR((%a< %%a, %%a + %%a, %%a + %s)", jitteredBackoff, jitteredBackoff)) + buf.WriteString(fmt.Sprintf("IF(%s > %%a, %%a, %s)", jitteredBackoff, jitteredBackoff)) // jitteredBackoff > :max_backoff - args = append(args, ":min_backoff", ":time_now", ":max_backoff") - // if it is greater, then use :time_now + :max_backoff - args = append(args, ":time_now", ":max_backoff") - // otherwise just use :time_now + jitteredBackoff - args = append(args, ":time_now", ":min_backoff", ":time_now") + args = append(args, ":min_backoff", ":jitter", ":max_backoff") + // if it is greater, then use :max_backoff + args = append(args, ":max_backoff") + // otherwise just use jitteredBackoff + args = append(args, ":min_backoff", ":jitter") } // close the if statement @@ -847,7 +849,9 @@ func (mm *messageManager) GeneratePostponeQuery(ids []string) (string, map[strin bvs := map[string]*querypb.BindVariable{ "time_now": sqltypes.Int64BindVariable(time.Now().UnixNano()), + "wait_time": sqltypes.Int64BindVariable(int64(mm.ackWaitTime)), "min_backoff": sqltypes.Int64BindVariable(int64(mm.minBackoff)), + "jitter": sqltypes.Float64BindVariable(.666666 + rand.Float64()*.666666), "ids": idbvs, } diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 6d9ea391427..b3aec44e4ef 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -753,7 +753,7 @@ func TestMMGenerate(t *testing.T) { utils.MustMatch(t, wantids, gotids, "did not match") query, bv = mm.GeneratePostponeQuery([]string{"1", "2"}) - wantQuery = "update foo set time_next = IF(FLOOR((:min_backoff<