Skip to content

Commit

Permalink
Cleanup panics in txthrottler, reorder for readability (vitessio#12901
Browse files Browse the repository at this point in the history
)

* Cleanup tx_throttler.go

Signed-off-by: Tim Vaillancourt <[email protected]>

* Cleanup tx_throttler.go #2

Signed-off-by: Tim Vaillancourt <[email protected]>

* Fix throttlerFactoryFunc

Signed-off-by: Tim Vaillancourt <[email protected]>

* Undo if-cond consolidation

Signed-off-by: Tim Vaillancourt <[email protected]>

* Undo struct shuffling

Signed-off-by: Tim Vaillancourt <[email protected]>

* prove that disabled config returns nil error

Signed-off-by: Tim Vaillancourt <[email protected]>

* Improve test

Signed-off-by: Tim Vaillancourt <[email protected]>

---------

Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed May 12, 2023
1 parent 3d306c8 commit 255716f
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 71 deletions.
134 changes: 63 additions & 71 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,19 @@ limitations under the License.
package txthrottler

import (
"context"
"fmt"
"math/rand"
"sync"
"time"

"vitess.io/vitess/go/vt/sqlparser"

"google.golang.org/protobuf/proto"

"google.golang.org/protobuf/encoding/prototext"

"context"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
Expand All @@ -42,6 +39,59 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// These vars store the functions used to create the topo server, healthcheck,
// topology watchers and go/vt/throttler. These are provided here so that they can be overridden
// in tests to generate mocks.
type healthCheckFactoryFunc func() discovery.LegacyHealthCheck
type topologyWatcherFactoryFunc func(topoServer *topo.Server, tr discovery.LegacyTabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error)

var (
healthCheckFactory healthCheckFactoryFunc
topologyWatcherFactory topologyWatcherFactoryFunc
throttlerFactory throttlerFactoryFunc
)

func init() {
resetTxThrottlerFactories()
}

func resetTxThrottlerFactories() {
healthCheckFactory = discovery.NewLegacyDefaultHealthCheck
topologyWatcherFactory = func(topoServer *topo.Server, tr discovery.LegacyTabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
return discovery.NewLegacyShardReplicationWatcher(context.Background(), topoServer, tr, cell, keyspace, shard, refreshInterval, topoReadConcurrency)
}
throttlerFactory = func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) {
return throttler.NewThrottler(name, unit, threadCount, maxRate, maxReplicationLag)
}
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
// It is only used here to allow mocking out a throttler object.
type ThrottlerInterface interface {
Throttle(threadID int) time.Duration
ThreadFinished(threadID int)
Close()
MaxRate() int64
SetMaxRate(rate int64)
RecordReplicationLag(time time.Time, ts *discovery.LegacyTabletStats)
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
}

// TopologyWatcherInterface defines the public interface that is implemented by
// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out
// go/vt/discovery.LegacyTopologyWatcher.
type TopologyWatcherInterface interface {
WaitForInitialTopology() error
Stop()
}

// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with
// go/vt/throttler.GlobalManager.
const TxThrottlerName = "TransactionThrottler"

// TxThrottler throttles transactions based on replication lag.
// It's a thin wrapper around the throttler found in vitess/go/vt/throttler.
// It uses a discovery.LegacyHealthCheck to send replication-lag updates to the wrapped throttler.
Expand Down Expand Up @@ -87,10 +137,6 @@ type TxThrottler struct {
requestsThrottled *stats.Counter
}

// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with
// go/vt/throttler.GlobalManager.
const TxThrottlerName = "TransactionThrottler"

// NewTxThrottler tries to construct a TxThrottler from the
// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if
// any error occurs.
Expand All @@ -101,10 +147,8 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler {
if err != nil {
log.Errorf("Error creating transaction throttler. Transaction throttling will"+
" be disabled. Error: %v", err)
txThrottler, err = newTxThrottler(env, &txThrottlerConfig{enabled: false})
if err != nil {
panic("BUG: Can't create a disabled transaction throttler")
}
// newTxThrottler with disabled config never returns an error
txThrottler, _ = newTxThrottler(env, &txThrottlerConfig{enabled: false})
} else {
log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config)
}
Expand Down Expand Up @@ -154,28 +198,6 @@ type txThrottlerConfig struct {
healthCheckCells []string
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
// It is only used here to allow mocking out a throttler object.
type ThrottlerInterface interface {
Throttle(threadID int) time.Duration
ThreadFinished(threadID int)
Close()
MaxRate() int64
SetMaxRate(rate int64)
RecordReplicationLag(time time.Time, ts *discovery.LegacyTabletStats)
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
}

// TopologyWatcherInterface defines the public interface that is implemented by
// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out
// go/vt/discovery.LegacyTopologyWatcher.
type TopologyWatcherInterface interface {
WaitForInitialTopology() error
Stop()
}

// txThrottlerState holds the state of an open TxThrottler object.
type txThrottlerState struct {
// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
Expand All @@ -187,33 +209,6 @@ type txThrottlerState struct {
topologyWatchers []TopologyWatcherInterface
}

// These vars store the functions used to create the topo server, healthcheck,
// topology watchers and go/vt/throttler. These are provided here so that they can be overridden
// in tests to generate mocks.
type healthCheckFactoryFunc func() discovery.LegacyHealthCheck
type topologyWatcherFactoryFunc func(topoServer *topo.Server, tr discovery.LegacyTabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error)

var (
healthCheckFactory healthCheckFactoryFunc
topologyWatcherFactory topologyWatcherFactoryFunc
throttlerFactory throttlerFactoryFunc
)

func init() {
resetTxThrottlerFactories()
}

func resetTxThrottlerFactories() {
healthCheckFactory = discovery.NewLegacyDefaultHealthCheck
topologyWatcherFactory = func(topoServer *topo.Server, tr discovery.LegacyTabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
return discovery.NewLegacyShardReplicationWatcher(context.Background(), topoServer, tr, cell, keyspace, shard, refreshInterval, topoReadConcurrency)
}
throttlerFactory = func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) {
return throttler.NewThrottler(name, unit, threadCount, maxRate, maxReplicationLag)
}
}

func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler, error) {
if config.enabled {
// Verify config.
Expand All @@ -234,7 +229,7 @@ func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler,
}

// Open opens the transaction throttler. It must be called prior to 'Throttle'.
func (t *TxThrottler) Open() error {
func (t *TxThrottler) Open() (err error) {
if !t.config.enabled {
return nil
}
Expand All @@ -243,8 +238,7 @@ func (t *TxThrottler) Open() error {
}
log.Info("TxThrottler: opening")
t.throttlerRunning.Set(1)
var err error
t.state, err = newTxThrottlerState(t.config, t.target.Keyspace, t.target.Shard)
t.state, err = newTxThrottlerState(t.config, t.target.Keyspace, t.target.Shard, t.target.Cell)
return err
}

Expand Down Expand Up @@ -272,9 +266,6 @@ func (t *TxThrottler) Throttle(priority int) (result bool) {
if !t.config.enabled {
return false
}
if t.state == nil {
panic("BUG: Throttle() called on a closed TxThrottler")
}

// Throttle according to both what the throttle state says, and the priority. Workloads with higher priority
// are less likely to be throttled.
Expand All @@ -287,7 +278,7 @@ func (t *TxThrottler) Throttle(priority int) (result bool) {
return result
}

func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard string,
func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string,
) (*txThrottlerState, error) {
t, err := throttlerFactory(
TxThrottlerName,
Expand Down Expand Up @@ -326,7 +317,8 @@ func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard string,

func (ts *txThrottlerState) throttle() bool {
if ts.throttler == nil {
panic("BUG: throttle called after deallocateResources was called.")
log.Error("throttle called after deallocateResources was called")
return false
}
// Serialize calls to ts.throttle.Throttle()
ts.throttleMu.Lock()
Expand Down
33 changes: 33 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ import (
"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -145,3 +147,34 @@ func TestEnabledThrottler(t *testing.T) {
throttler.Close()
assert.Zero(t, throttler.throttlerRunning.Get())
}

func TestNewTxThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, t.Name())

{
// disabled config
throttler, err := newTxThrottler(env, &txThrottlerConfig{enabled: false})
assert.Nil(t, err)
assert.NotNil(t, throttler)
}
{
// enabled with invalid throttler config
throttler, err := newTxThrottler(env, &txThrottlerConfig{
enabled: true,
throttlerConfig: &throttlerdatapb.Configuration{},
})
assert.NotNil(t, err)
assert.Nil(t, throttler)
}
{
// enabled
throttler, err := newTxThrottler(env, &txThrottlerConfig{
enabled: true,
healthCheckCells: []string{"cell1"},
throttlerConfig: throttler.DefaultMaxReplicationLagModuleConfig().Configuration,
})
assert.Nil(t, err)
assert.NotNil(t, throttler)
}
}

0 comments on commit 255716f

Please sign in to comment.