From c5dbc6b9174912fea67df9422aad009901846a04 Mon Sep 17 00:00:00 2001 From: Jacques Grove Date: Fri, 26 Mar 2021 21:55:13 -0700 Subject: [PATCH 1/3] First cut at allowing modification of tablet unhealthy_threshold via debugEnv. Signed-off-by: Jacques Grove --- go.mod | 1 + go/vt/vttablet/tabletserver/debugenv.go | 21 ++++++++++++++++++ .../vttablet/tabletserver/health_streamer.go | 22 ++++++++++++++++--- go/vt/vttablet/tabletserver/state_manager.go | 12 +++++++--- 4 files changed, 50 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 6840b146fca..879c8f54d41 100644 --- a/go.mod +++ b/go.mod @@ -93,6 +93,7 @@ require ( github.com/uber/jaeger-client-go v2.16.0+incompatible github.com/uber/jaeger-lib v2.0.0+incompatible // indirect github.com/z-division/go-zookeeper v0.0.0-20190128072838-6d7457066b9b + go.uber.org/atomic v1.4.0 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 golang.org/x/lint v0.0.0-20190930215403-16217165b5de golang.org/x/net v0.0.0-20201021035429-f5854403a974 diff --git a/go/vt/vttablet/tabletserver/debugenv.go b/go/vt/vttablet/tabletserver/debugenv.go index 50a200ec3ad..87d2a81a424 100644 --- a/go/vt/vttablet/tabletserver/debugenv.go +++ b/go/vt/vttablet/tabletserver/debugenv.go @@ -23,6 +23,7 @@ import ( "net/http" "strconv" "text/template" + "time" "vitess.io/vitess/go/acl" "vitess.io/vitess/go/vt/log" @@ -72,6 +73,15 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request) f(ival) msg = fmt.Sprintf("Setting %v to: %v", varname, value) } + setDurationVal := func(f func(time.Duration)) { + durationVal, err := time.ParseDuration(value) + if err != nil { + msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err) + return + } + f(durationVal) + msg = fmt.Sprintf("Setting %v to: %v", varname, value) + } switch varname { case "PoolSize": setIntVal(tsv.SetPoolSize) @@ -85,6 +95,10 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request) setIntVal(tsv.SetMaxResultSize) case "WarnResultSize": setIntVal(tsv.SetWarnResultSize) + case "UnhealthyThreshold": + setDurationVal(tsv.Config().Healthcheck.UnhealthyThresholdSeconds.Set) + setDurationVal(tsv.hs.SetUnhealthyThreshold) + setDurationVal(tsv.sm.SetUnhealthyThreshold) case "Consolidator": tsv.SetConsolidatorMode(value) msg = fmt.Sprintf("Setting %v to: %v", varname, value) @@ -98,12 +112,19 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request) Value: fmt.Sprintf("%v", f()), }) } + addDurationVar := func(varname string, f func() time.Duration) { + vars = append(vars, envValue{ + VarName: varname, + Value: fmt.Sprintf("%v", f()), + }) + } addIntVar("PoolSize", tsv.PoolSize) addIntVar("StreamPoolSize", tsv.StreamPoolSize) addIntVar("TxPoolSize", tsv.TxPoolSize) addIntVar("QueryCacheCapacity", tsv.QueryPlanCacheCap) addIntVar("MaxResultSize", tsv.MaxResultSize) addIntVar("WarnResultSize", tsv.WarnResultSize) + addDurationVar("UnhealthyThreshold", tsv.Config().Healthcheck.UnhealthyThresholdSeconds.Get) vars = append(vars, envValue{ VarName: "Consolidator", Value: tsv.ConsolidatorMode(), diff --git a/go/vt/vttablet/tabletserver/health_streamer.go b/go/vt/vttablet/tabletserver/health_streamer.go index b42fcb4f1d6..0c06a0d4d16 100644 --- a/go/vt/vttablet/tabletserver/health_streamer.go +++ b/go/vt/vttablet/tabletserver/health_streamer.go @@ -27,6 +27,8 @@ import ( "github.com/golang/protobuf/proto" + "go.uber.org/atomic" + "vitess.io/vitess/go/history" "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" @@ -51,7 +53,7 @@ var ( type healthStreamer struct { stats *tabletenv.Stats degradedThreshold time.Duration - unhealthyThreshold time.Duration + unhealthyThreshold *atomic.Duration mu sync.Mutex ctx context.Context @@ -66,7 +68,7 @@ func newHealthStreamer(env tabletenv.Env, alias topodatapb.TabletAlias) *healthS return &healthStreamer{ stats: env.Stats(), degradedThreshold: env.Config().Healthcheck.DegradedThresholdSeconds.Get(), - unhealthyThreshold: env.Config().Healthcheck.UnhealthyThresholdSeconds.Get(), + unhealthyThreshold: atomic.NewDuration(env.Config().Healthcheck.UnhealthyThresholdSeconds.Get()), clients: make(map[chan *querypb.StreamHealthResponse]struct{}), state: &querypb.StreamHealthResponse{ @@ -220,7 +222,7 @@ func (hs *healthStreamer) AppendDetails(details []*kv) []*kv { sbm := time.Duration(hs.state.RealtimeStats.SecondsBehindMaster) * time.Second class := healthyClass switch { - case sbm > hs.unhealthyThreshold: + case sbm > hs.unhealthyThreshold.Load(): class = unhealthyClass case sbm > hs.degradedThreshold: class = unhappyClass @@ -240,3 +242,17 @@ func (hs *healthStreamer) AppendDetails(details []*kv) []*kv { return details } + +func (hs *healthStreamer) SetUnhealthyThreshold(v time.Duration) { + hs.unhealthyThreshold.Store(v) + shr := proto.Clone(hs.state).(*querypb.StreamHealthResponse) + for ch := range hs.clients { + select { + case ch <- shr: + default: + log.Info("Resetting health streamer clients due to unhealthy threshold change") + close(ch) + delete(hs.clients, ch) + } + } +} diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index c1498e43f44..8f1810f59b7 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "go.uber.org/atomic" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/log" @@ -122,7 +124,7 @@ type stateManager struct { checkMySQLThrottler *sync2.Semaphore timebombDuration time.Duration - unhealthyThreshold time.Duration + unhealthyThreshold *atomic.Duration shutdownGracePeriod time.Duration transitionGracePeriod time.Duration } @@ -187,7 +189,7 @@ func (sm *stateManager) Init(env tabletenv.Env, target querypb.Target) { sm.checkMySQLThrottler = sync2.NewSemaphore(1, 0) sm.timebombDuration = env.Config().OltpReadPool.TimeoutSeconds.Get() * 10 sm.hcticks = timer.NewTimer(env.Config().Healthcheck.IntervalSeconds.Get()) - sm.unhealthyThreshold = env.Config().Healthcheck.UnhealthyThresholdSeconds.Get() + sm.unhealthyThreshold = atomic.NewDuration(env.Config().Healthcheck.UnhealthyThresholdSeconds.Get()) sm.shutdownGracePeriod = env.Config().GracePeriods.ShutdownSeconds.Get() sm.transitionGracePeriod = env.Config().GracePeriods.TransitionSeconds.Get() } @@ -640,7 +642,7 @@ func (sm *stateManager) refreshReplHealthLocked() (time.Duration, error) { } sm.replHealthy = false } else { - if lag > sm.unhealthyThreshold { + if lag > sm.unhealthyThreshold.Load() { if sm.replHealthy { log.Infof("Going unhealthy due to high replication lag: %v", lag) } @@ -768,3 +770,7 @@ func (sm *stateManager) IsServingString() string { } return "NOT_SERVING" } + +func (sm *stateManager) SetUnhealthyThreshold(v time.Duration) { + sm.unhealthyThreshold.Store(v) +} From 55a1826d071b01ba568a52aee094f050cac6fcf0 Mon Sep 17 00:00:00 2001 From: Jacques Grove Date: Tue, 6 Apr 2021 15:10:45 -0700 Subject: [PATCH 2/3] Swap out uber/atomic with sync2 Signed-off-by: Jacques Grove --- go/vt/vttablet/tabletserver/health_streamer.go | 11 +++++------ go/vt/vttablet/tabletserver/state_manager.go | 10 ++++------ 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/go/vt/vttablet/tabletserver/health_streamer.go b/go/vt/vttablet/tabletserver/health_streamer.go index 0c06a0d4d16..36e856d3e8e 100644 --- a/go/vt/vttablet/tabletserver/health_streamer.go +++ b/go/vt/vttablet/tabletserver/health_streamer.go @@ -27,9 +27,8 @@ import ( "github.com/golang/protobuf/proto" - "go.uber.org/atomic" - "vitess.io/vitess/go/history" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -53,7 +52,7 @@ var ( type healthStreamer struct { stats *tabletenv.Stats degradedThreshold time.Duration - unhealthyThreshold *atomic.Duration + unhealthyThreshold sync2.AtomicDuration mu sync.Mutex ctx context.Context @@ -68,7 +67,7 @@ func newHealthStreamer(env tabletenv.Env, alias topodatapb.TabletAlias) *healthS return &healthStreamer{ stats: env.Stats(), degradedThreshold: env.Config().Healthcheck.DegradedThresholdSeconds.Get(), - unhealthyThreshold: atomic.NewDuration(env.Config().Healthcheck.UnhealthyThresholdSeconds.Get()), + unhealthyThreshold: sync2.NewAtomicDuration(env.Config().Healthcheck.UnhealthyThresholdSeconds.Get()), clients: make(map[chan *querypb.StreamHealthResponse]struct{}), state: &querypb.StreamHealthResponse{ @@ -222,7 +221,7 @@ func (hs *healthStreamer) AppendDetails(details []*kv) []*kv { sbm := time.Duration(hs.state.RealtimeStats.SecondsBehindMaster) * time.Second class := healthyClass switch { - case sbm > hs.unhealthyThreshold.Load(): + case sbm > hs.unhealthyThreshold.Get(): class = unhealthyClass case sbm > hs.degradedThreshold: class = unhappyClass @@ -244,7 +243,7 @@ func (hs *healthStreamer) AppendDetails(details []*kv) []*kv { } func (hs *healthStreamer) SetUnhealthyThreshold(v time.Duration) { - hs.unhealthyThreshold.Store(v) + hs.unhealthyThreshold.Set(v) shr := proto.Clone(hs.state).(*querypb.StreamHealthResponse) for ch := range hs.clients { select { diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index 8f1810f59b7..a18a5ec091d 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -22,8 +22,6 @@ import ( "sync" "time" - "go.uber.org/atomic" - "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/log" @@ -124,7 +122,7 @@ type stateManager struct { checkMySQLThrottler *sync2.Semaphore timebombDuration time.Duration - unhealthyThreshold *atomic.Duration + unhealthyThreshold sync2.AtomicDuration shutdownGracePeriod time.Duration transitionGracePeriod time.Duration } @@ -189,7 +187,7 @@ func (sm *stateManager) Init(env tabletenv.Env, target querypb.Target) { sm.checkMySQLThrottler = sync2.NewSemaphore(1, 0) sm.timebombDuration = env.Config().OltpReadPool.TimeoutSeconds.Get() * 10 sm.hcticks = timer.NewTimer(env.Config().Healthcheck.IntervalSeconds.Get()) - sm.unhealthyThreshold = atomic.NewDuration(env.Config().Healthcheck.UnhealthyThresholdSeconds.Get()) + sm.unhealthyThreshold = sync2.NewAtomicDuration(env.Config().Healthcheck.UnhealthyThresholdSeconds.Get()) sm.shutdownGracePeriod = env.Config().GracePeriods.ShutdownSeconds.Get() sm.transitionGracePeriod = env.Config().GracePeriods.TransitionSeconds.Get() } @@ -642,7 +640,7 @@ func (sm *stateManager) refreshReplHealthLocked() (time.Duration, error) { } sm.replHealthy = false } else { - if lag > sm.unhealthyThreshold.Load() { + if lag > sm.unhealthyThreshold.Get() { if sm.replHealthy { log.Infof("Going unhealthy due to high replication lag: %v", lag) } @@ -772,5 +770,5 @@ func (sm *stateManager) IsServingString() string { } func (sm *stateManager) SetUnhealthyThreshold(v time.Duration) { - sm.unhealthyThreshold.Store(v) + sm.unhealthyThreshold.Set(v) } From 816bc45df82ca6a1006930b2f5823f217fa5e0ac Mon Sep 17 00:00:00 2001 From: Jacques Grove Date: Tue, 6 Apr 2021 15:13:31 -0700 Subject: [PATCH 3/3] Actually remove the uber/atomic dep. Signed-off-by: Jacques Grove --- go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/go.mod b/go.mod index d4da44ac2fd..6fb29510b29 100644 --- a/go.mod +++ b/go.mod @@ -93,7 +93,6 @@ require ( github.com/uber/jaeger-client-go v2.16.0+incompatible github.com/uber/jaeger-lib v2.0.0+incompatible // indirect github.com/z-division/go-zookeeper v0.0.0-20190128072838-6d7457066b9b - go.uber.org/atomic v1.4.0 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 golang.org/x/lint v0.0.0-20190930215403-16217165b5de golang.org/x/net v0.0.0-20201021035429-f5854403a974