diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 93f268715b2..5def9c262c3 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -17,22 +17,19 @@ limitations under the License. package txthrottler import ( + "context" "fmt" "math/rand" "sync" "time" - "vitess.io/vitess/go/vt/sqlparser" - - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/encoding/prototext" - - "context" + "google.golang.org/protobuf/proto" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/throttler" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -42,6 +39,59 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) +// These vars store the functions used to create the topo server, healthcheck, +// topology watchers and go/vt/throttler. These are provided here so that they can be overridden +// in tests to generate mocks. +type healthCheckFactoryFunc func() discovery.LegacyHealthCheck +type topologyWatcherFactoryFunc func(topoServer *topo.Server, tr discovery.LegacyTabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface +type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) + +var ( + healthCheckFactory healthCheckFactoryFunc + topologyWatcherFactory topologyWatcherFactoryFunc + throttlerFactory throttlerFactoryFunc +) + +func init() { + resetTxThrottlerFactories() +} + +func resetTxThrottlerFactories() { + healthCheckFactory = discovery.NewLegacyDefaultHealthCheck + topologyWatcherFactory = func(topoServer *topo.Server, tr discovery.LegacyTabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { + return discovery.NewLegacyShardReplicationWatcher(context.Background(), topoServer, tr, cell, keyspace, shard, refreshInterval, topoReadConcurrency) + } + throttlerFactory = func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) { + return throttler.NewThrottler(name, unit, threadCount, maxRate, maxReplicationLag) + } +} + +// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler +// It is only used here to allow mocking out a throttler object. +type ThrottlerInterface interface { + Throttle(threadID int) time.Duration + ThreadFinished(threadID int) + Close() + MaxRate() int64 + SetMaxRate(rate int64) + RecordReplicationLag(time time.Time, ts *discovery.LegacyTabletStats) + GetConfiguration() *throttlerdatapb.Configuration + UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error + ResetConfiguration() +} + +// TopologyWatcherInterface defines the public interface that is implemented by +// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out +// go/vt/discovery.LegacyTopologyWatcher. +type TopologyWatcherInterface interface { + WaitForInitialTopology() error + Stop() +} + +// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with +// go/vt/throttler.GlobalManager. +const TxThrottlerName = "TransactionThrottler" + // TxThrottler throttles transactions based on replication lag. // It's a thin wrapper around the throttler found in vitess/go/vt/throttler. // It uses a discovery.LegacyHealthCheck to send replication-lag updates to the wrapped throttler. @@ -87,10 +137,6 @@ type TxThrottler struct { requestsThrottled *stats.Counter } -// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with -// go/vt/throttler.GlobalManager. -const TxThrottlerName = "TransactionThrottler" - // NewTxThrottler tries to construct a TxThrottler from the // relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if // any error occurs. @@ -101,10 +147,8 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler { if err != nil { log.Errorf("Error creating transaction throttler. Transaction throttling will"+ " be disabled. Error: %v", err) - txThrottler, err = newTxThrottler(env, &txThrottlerConfig{enabled: false}) - if err != nil { - panic("BUG: Can't create a disabled transaction throttler") - } + // newTxThrottler with disabled config never returns an error + txThrottler, _ = newTxThrottler(env, &txThrottlerConfig{enabled: false}) } else { log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config) } @@ -154,28 +198,6 @@ type txThrottlerConfig struct { healthCheckCells []string } -// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler -// It is only used here to allow mocking out a throttler object. -type ThrottlerInterface interface { - Throttle(threadID int) time.Duration - ThreadFinished(threadID int) - Close() - MaxRate() int64 - SetMaxRate(rate int64) - RecordReplicationLag(time time.Time, ts *discovery.LegacyTabletStats) - GetConfiguration() *throttlerdatapb.Configuration - UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error - ResetConfiguration() -} - -// TopologyWatcherInterface defines the public interface that is implemented by -// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out -// go/vt/discovery.LegacyTopologyWatcher. -type TopologyWatcherInterface interface { - WaitForInitialTopology() error - Stop() -} - // txThrottlerState holds the state of an open TxThrottler object. type txThrottlerState struct { // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). @@ -187,33 +209,6 @@ type txThrottlerState struct { topologyWatchers []TopologyWatcherInterface } -// These vars store the functions used to create the topo server, healthcheck, -// topology watchers and go/vt/throttler. These are provided here so that they can be overridden -// in tests to generate mocks. -type healthCheckFactoryFunc func() discovery.LegacyHealthCheck -type topologyWatcherFactoryFunc func(topoServer *topo.Server, tr discovery.LegacyTabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface -type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) - -var ( - healthCheckFactory healthCheckFactoryFunc - topologyWatcherFactory topologyWatcherFactoryFunc - throttlerFactory throttlerFactoryFunc -) - -func init() { - resetTxThrottlerFactories() -} - -func resetTxThrottlerFactories() { - healthCheckFactory = discovery.NewLegacyDefaultHealthCheck - topologyWatcherFactory = func(topoServer *topo.Server, tr discovery.LegacyTabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { - return discovery.NewLegacyShardReplicationWatcher(context.Background(), topoServer, tr, cell, keyspace, shard, refreshInterval, topoReadConcurrency) - } - throttlerFactory = func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) { - return throttler.NewThrottler(name, unit, threadCount, maxRate, maxReplicationLag) - } -} - func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler, error) { if config.enabled { // Verify config. @@ -234,7 +229,7 @@ func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler, } // Open opens the transaction throttler. It must be called prior to 'Throttle'. -func (t *TxThrottler) Open() error { +func (t *TxThrottler) Open() (err error) { if !t.config.enabled { return nil } @@ -243,8 +238,7 @@ func (t *TxThrottler) Open() error { } log.Info("TxThrottler: opening") t.throttlerRunning.Set(1) - var err error - t.state, err = newTxThrottlerState(t.config, t.target.Keyspace, t.target.Shard) + t.state, err = newTxThrottlerState(t.config, t.target.Keyspace, t.target.Shard, t.target.Cell) return err } @@ -272,9 +266,6 @@ func (t *TxThrottler) Throttle(priority int) (result bool) { if !t.config.enabled { return false } - if t.state == nil { - panic("BUG: Throttle() called on a closed TxThrottler") - } // Throttle according to both what the throttle state says, and the priority. Workloads with higher priority // are less likely to be throttled. @@ -287,7 +278,7 @@ func (t *TxThrottler) Throttle(priority int) (result bool) { return result } -func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard string, +func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string, ) (*txThrottlerState, error) { t, err := throttlerFactory( TxThrottlerName, @@ -326,7 +317,8 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard string, func (ts *txThrottlerState) throttle() bool { if ts.throttler == nil { - panic("BUG: throttle called after deallocateResources was called.") + log.Error("throttle called after deallocateResources was called") + return false } // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index e8ebc4a0425..c2dbfe9fe1d 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -29,11 +29,13 @@ import ( "github.com/stretchr/testify/assert" "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/throttler" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" querypb "vitess.io/vitess/go/vt/proto/query" + throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -145,3 +147,34 @@ func TestEnabledThrottler(t *testing.T) { throttler.Close() assert.Zero(t, throttler.throttlerRunning.Get()) } + +func TestNewTxThrottler(t *testing.T) { + config := tabletenv.NewDefaultConfig() + env := tabletenv.NewEnv(config, t.Name()) + + { + // disabled config + throttler, err := newTxThrottler(env, &txThrottlerConfig{enabled: false}) + assert.Nil(t, err) + assert.NotNil(t, throttler) + } + { + // enabled with invalid throttler config + throttler, err := newTxThrottler(env, &txThrottlerConfig{ + enabled: true, + throttlerConfig: &throttlerdatapb.Configuration{}, + }) + assert.NotNil(t, err) + assert.Nil(t, throttler) + } + { + // enabled + throttler, err := newTxThrottler(env, &txThrottlerConfig{ + enabled: true, + healthCheckCells: []string{"cell1"}, + throttlerConfig: throttler.DefaultMaxReplicationLagModuleConfig().Configuration, + }) + assert.Nil(t, err) + assert.NotNil(t, throttler) + } +}