From e05a0b89ca1200c02f02928f8a7e2969156d4488 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 17 May 2023 09:19:01 +0200 Subject: [PATCH] txthrottler: further code cleanup (#12902) * txthrottler: further code cleanup Signed-off-by: Tim Vaillancourt * Fix bad merge resolution Signed-off-by: Tim Vaillancourt --------- Signed-off-by: Tim Vaillancourt --- .../tabletserver/txthrottler/tx_throttler.go | 103 +++++++++--------- .../txthrottler/tx_throttler_test.go | 6 +- 2 files changed, 56 insertions(+), 53 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index d0b9013499d..64e7070a228 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -53,10 +53,6 @@ var ( throttlerFactory throttlerFactoryFunc ) -func init() { - resetTxThrottlerFactories() -} - func resetTxThrottlerFactories() { healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) @@ -69,6 +65,10 @@ func resetTxThrottlerFactories() { } } +func init() { + resetTxThrottlerFactories() +} + // ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler // It is only used here to allow mocking out a throttler object. type ThrottlerInterface interface { @@ -132,7 +132,8 @@ type TxThrottler struct { // if the TransactionThrottler is closed. state *txThrottlerState - target *querypb.Target + target *querypb.Target + topoServer *topo.Server // stats throttlerRunning *stats.Gauge @@ -140,6 +141,37 @@ type TxThrottler struct { requestsThrottled *stats.Counter } +// txThrottlerConfig holds the parameters that need to be +// passed when constructing a TxThrottler object. +type txThrottlerConfig struct { + // enabled is true if the transaction throttler is enabled. All methods + // of a disabled transaction throttler do nothing and Throttle() always + // returns false. + enabled bool + + throttlerConfig *throttlerdatapb.Configuration + // healthCheckCells stores the cell names in which running vttablets will be monitored for + // replication lag. + healthCheckCells []string + + // tabletTypes stores the tablet types for throttling + tabletTypes *topoproto.TabletTypeListFlag +} + +// txThrottlerState holds the state of an open TxThrottler object. +type txThrottlerState struct { + config *txThrottlerConfig + + // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). + // That method is required to be called in serial for each threadId. + throttleMu sync.Mutex + throttler ThrottlerInterface + stopHealthCheck context.CancelFunc + + healthCheck discovery.HealthCheck + topologyWatchers []TopologyWatcherInterface +} + // NewTxThrottler tries to construct a TxThrottler from the // relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if // any error occurs. @@ -151,7 +183,7 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler { log.Errorf("Error creating transaction throttler. Transaction throttling will"+ " be disabled. Error: %v", err) // newTxThrottler with disabled config never returns an error - txThrottler, _ = newTxThrottler(env, &txThrottlerConfig{enabled: false}) + txThrottler, _ = newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false}) } else { log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config) } @@ -165,7 +197,7 @@ func (t *TxThrottler) InitDBConfig(target *querypb.Target) { func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*TxThrottler, error) { if !env.Config().EnableTxThrottler { - return newTxThrottler(env, &txThrottlerConfig{enabled: false}) + return newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false}) } var throttlerConfig throttlerdatapb.Configuration @@ -178,48 +210,15 @@ func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*TxThrott healthCheckCells := make([]string, len(env.Config().TxThrottlerHealthCheckCells)) copy(healthCheckCells, env.Config().TxThrottlerHealthCheckCells) - return newTxThrottler(env, &txThrottlerConfig{ + return newTxThrottler(env, topoServer, &txThrottlerConfig{ enabled: true, - topoServer: topoServer, tabletTypes: env.Config().TxThrottlerTabletTypes, throttlerConfig: &throttlerConfig, healthCheckCells: healthCheckCells, }) } -// txThrottlerConfig holds the parameters that need to be -// passed when constructing a TxThrottler object. -type txThrottlerConfig struct { - // enabled is true if the transaction throttler is enabled. All methods - // of a disabled transaction throttler do nothing and Throttle() always - // returns false. - enabled bool - - topoServer *topo.Server - throttlerConfig *throttlerdatapb.Configuration - // healthCheckCells stores the cell names in which running vttablets will be monitored for - // replication lag. - healthCheckCells []string - - // tabletTypes stores the tablet types for throttling - tabletTypes *topoproto.TabletTypeListFlag -} - -// txThrottlerState holds the state of an open TxThrottler object. -type txThrottlerState struct { - config *txThrottlerConfig - - // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). - // That method is required to be called in serial for each threadId. - throttleMu sync.Mutex - throttler ThrottlerInterface - stopHealthCheck context.CancelFunc - - healthCheck discovery.HealthCheck - topologyWatchers []TopologyWatcherInterface -} - -func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler, error) { +func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrottlerConfig) (*TxThrottler, error) { if config.enabled { // Verify config. err := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}.Verify() @@ -232,6 +231,7 @@ func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler, } return &TxThrottler{ config: config, + topoServer: topoServer, throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"), requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"), @@ -248,7 +248,7 @@ func (t *TxThrottler) Open() (err error) { } log.Info("TxThrottler: opening") t.throttlerRunning.Set(1) - t.state, err = newTxThrottlerState(t.config, t.target.Keyspace, t.target.Shard, t.target.Cell) + t.state, err = newTxThrottlerState(t.topoServer, t.config, t.target) return err } @@ -276,6 +276,9 @@ func (t *TxThrottler) Throttle(priority int) (result bool) { if !t.config.enabled { return false } + if t.state == nil { + return false + } // Throttle according to both what the throttle state says, and the priority. Workloads with lower priority value // are less likely to be throttled. @@ -288,7 +291,7 @@ func (t *TxThrottler) Throttle(priority int) (result bool) { return result } -func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string) (*txThrottlerState, error) { +func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) { maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig} t, err := throttlerFactory( @@ -309,7 +312,7 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string config: config, throttler: t, } - createTxThrottlerHealthCheck(config, result, cell) + createTxThrottlerHealthCheck(topoServer, config, result, target.Cell) result.topologyWatchers = make( []TopologyWatcherInterface, 0, len(config.healthCheckCells)) @@ -317,21 +320,21 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string result.topologyWatchers = append( result.topologyWatchers, topologyWatcherFactory( - config.topoServer, + topoServer, result.healthCheck, cell, - keyspace, - shard, + target.Keyspace, + target.Shard, discovery.DefaultTopologyWatcherRefreshInterval, discovery.DefaultTopoReadConcurrency)) } return result, nil } -func createTxThrottlerHealthCheck(config *txThrottlerConfig, result *txThrottlerState, cell string) { +func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) { ctx, cancel := context.WithCancel(context.Background()) result.stopHealthCheck = cancel - result.healthCheck = healthCheckFactory(config.topoServer, cell, config.healthCheckCells) + result.healthCheck = healthCheckFactory(topoServer, cell, config.healthCheckCells) ch := result.healthCheck.Subscribe() go func(ctx context.Context) { for { diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index bb2934a2bea..523b45c6174 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -154,13 +154,13 @@ func TestNewTxThrottler(t *testing.T) { { // disabled config - throttler, err := newTxThrottler(env, &txThrottlerConfig{enabled: false}) + throttler, err := newTxThrottler(env, nil, &txThrottlerConfig{enabled: false}) assert.Nil(t, err) assert.NotNil(t, throttler) } { // enabled with invalid throttler config - throttler, err := newTxThrottler(env, &txThrottlerConfig{ + throttler, err := newTxThrottler(env, nil, &txThrottlerConfig{ enabled: true, throttlerConfig: &throttlerdatapb.Configuration{}, }) @@ -169,7 +169,7 @@ func TestNewTxThrottler(t *testing.T) { } { // enabled - throttler, err := newTxThrottler(env, &txThrottlerConfig{ + throttler, err := newTxThrottler(env, nil, &txThrottlerConfig{ enabled: true, healthCheckCells: []string{"cell1"}, throttlerConfig: throttler.DefaultMaxReplicationLagModuleConfig().Configuration,