From 8b00258c0191ef16026a7c90b0309216e3f98243 Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Sat, 18 Apr 2020 20:17:08 -0600 Subject: [PATCH 1/5] messager: add 33% jitter to postpone backoff Signed-off-by: Derek Perkins --- .../tabletserver/messager/message_manager.go | 57 ++++++++++++++++--- .../messager/message_manager_test.go | 4 +- 2 files changed, 50 insertions(+), 11 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 5719c2fd4c2..ffafd768ea9 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -17,6 +17,7 @@ limitations under the License. package messager import ( + "bytes" "fmt" "io" "sync" @@ -255,17 +256,55 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos mm.purgeQuery = sqlparser.BuildParsedQuery( "delete from %v where time_acked < %a limit 500", mm.name, ":time_acked") - // if a maxBackoff is set, incorporate it into the update statement - if mm.maxBackoff > 0 { - mm.postponeQuery = sqlparser.BuildParsedQuery( - "update %v set time_next = %a+if(%a< %a, %a, %a< %%a, %%a + %%a, %s)", baseTimeNext, baseTimeNext)) + // baseTimeNext - :time_now > :max_backoff + args = append(args, ":time_now", ":max_backoff", ":time_now", ":time_now", ":max_backoff") + // if it is greater, then use :time_now + :max_backoff + args = append(args, ":time_now", ":max_backoff") + // otherwise just use the raw jittered time_next + args = append(args, ":time_now", ":min_backoff", ":time_now") } - return mm + + // close the if statement + buf.WriteString(")") + + // now that we've identified time_next, finish the statement + buf.WriteString(", epoch = ifnull(epoch, 0)+1 where id in %a and time_acked is null") + args = append(args, "::ids") + + return sqlparser.BuildParsedQuery(buf.String(), args...) } // buildSelectColumnList is a convenience function that diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 0f6bbd44def..2a914eed829 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -753,7 +753,7 @@ func TestMMGenerate(t *testing.T) { utils.MustMatch(t, wantids, gotids, "did not match") query, bv = mm.GeneratePostponeQuery([]string{"1", "2"}) - wantQuery = "update foo set time_next = :time_now+(:min_backoff< Date: Sun, 19 Apr 2020 16:01:38 -0600 Subject: [PATCH 2/5] fix parentheses Signed-off-by: Derek Perkins --- go/vt/vttablet/tabletserver/messager/message_manager.go | 2 +- go/vt/vttablet/tabletserver/messager/message_manager_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index ffafd768ea9..19033a3ebbb 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -269,7 +269,7 @@ func buildPostponeQuery(name sqlparser.TableIdent, minBackoff, maxBackoff time.D // have backoff be +/- 33%, seeded with :time_now to be consistent in multiple usages // whenever this is injected, append (:time_now, :min_backoff, :time_now) - baseTimeNext := "%a+FLOOR((%a< Date: Mon, 20 Apr 2020 07:47:14 -0600 Subject: [PATCH 3/5] simplify logic Signed-off-by: Derek Perkins --- .../tabletserver/messager/message_manager.go | 25 +++++++++---------- .../messager/message_manager_test.go | 4 +-- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 19033a3ebbb..b5f7e13988d 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -268,33 +268,32 @@ func buildPostponeQuery(name sqlparser.TableIdent, minBackoff, maxBackoff time.D args = append(args, name) // have backoff be +/- 33%, seeded with :time_now to be consistent in multiple usages - // whenever this is injected, append (:time_now, :min_backoff, :time_now) - baseTimeNext := "(%a+FLOOR((%a< %%a, %%a + %%a, %s)", baseTimeNext, baseTimeNext)) - // baseTimeNext - :time_now > :max_backoff - args = append(args, ":time_now", ":max_backoff", ":time_now", ":time_now", ":max_backoff") + buf.WriteString(fmt.Sprintf("IF(%s > %%a, %%a + %%a, %s)", jitteredBackoff, jitteredBackoff)) + // jitteredBackoff > :max_backoff + args = append(args, ":min_backoff", ":time_now", ":max_backoff") // if it is greater, then use :time_now + :max_backoff args = append(args, ":time_now", ":max_backoff") // otherwise just use the raw jittered time_next - args = append(args, ":time_now", ":min_backoff", ":time_now") + args = append(args, ":min_backoff", ":time_now") } // close the if statement diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 6469a877ce3..2a285cf9cbb 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -753,7 +753,7 @@ func TestMMGenerate(t *testing.T) { utils.MustMatch(t, wantids, gotids, "did not match") query, bv = mm.GeneratePostponeQuery([]string{"1", "2"}) - wantQuery = "update foo set time_next = IF((:time_now+FLOOR((:min_backoff< Date: Mon, 20 Apr 2020 08:09:03 -0600 Subject: [PATCH 4/5] fix calculation Signed-off-by: Derek Perkins --- .../tabletserver/messager/message_manager.go | 12 ++++++------ .../tabletserver/messager/message_manager_test.go | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index b5f7e13988d..6a6cf600477 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -281,19 +281,19 @@ func buildPostponeQuery(name sqlparser.TableIdent, minBackoff, maxBackoff time.D args = append(args, ":time_now", ":min_backoff") // now we are setting the false case on the above IF statement - // if there is no max_backoff, just use the raw jittered backoff if maxBackoff == 0 { - buf.WriteString(jitteredBackoff) - args = append(args, ":min_backoff", ":time_now") + // if there is no max_backoff, just use :time_now + jitteredBackoff + buf.WriteString(fmt.Sprintf("%%a + %s",jitteredBackoff)) + args = append(args, ":time_now", ":min_backoff", ":time_now") } else { // make sure that it doesn't exceed max_backoff - buf.WriteString(fmt.Sprintf("IF(%s > %%a, %%a + %%a, %s)", jitteredBackoff, jitteredBackoff)) + buf.WriteString(fmt.Sprintf("IF(%s > %%a, %%a + %%a, %%a + %s)", jitteredBackoff, jitteredBackoff)) // jitteredBackoff > :max_backoff args = append(args, ":min_backoff", ":time_now", ":max_backoff") // if it is greater, then use :time_now + :max_backoff args = append(args, ":time_now", ":max_backoff") - // otherwise just use the raw jittered time_next - args = append(args, ":min_backoff", ":time_now") + // otherwise just use :time_now + jitteredBackoff + args = append(args, ":time_now", ":min_backoff", ":time_now") } // close the if statement diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 2a285cf9cbb..7156c26cdb2 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -753,7 +753,7 @@ func TestMMGenerate(t *testing.T) { utils.MustMatch(t, wantids, gotids, "did not match") query, bv = mm.GeneratePostponeQuery([]string{"1", "2"}) - wantQuery = "update foo set time_next = IF(FLOOR((:min_backoff< Date: Mon, 20 Apr 2020 08:39:27 -0600 Subject: [PATCH 5/5] go fmt Signed-off-by: Derek Perkins --- go/vt/vttablet/tabletserver/messager/message_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 6a6cf600477..5a5d889da90 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -282,8 +282,8 @@ func buildPostponeQuery(name sqlparser.TableIdent, minBackoff, maxBackoff time.D // now we are setting the false case on the above IF statement if maxBackoff == 0 { - // if there is no max_backoff, just use :time_now + jitteredBackoff - buf.WriteString(fmt.Sprintf("%%a + %s",jitteredBackoff)) + // if there is no max_backoff, just use :time_now + jitteredBackoff + buf.WriteString(fmt.Sprintf("%%a + %s", jitteredBackoff)) args = append(args, ":time_now", ":min_backoff", ":time_now") } else { // make sure that it doesn't exceed max_backoff