Skip to content

Commit

Permalink
Fix transaction throttler ignoring the initial rate (vitessio#12618)
Browse files Browse the repository at this point in the history
* Fix transaction throttler ignoring the initial rate

This addresses the issue reported in vitessio#12549

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Add missing override of max replication lag in `throttler.newThrottler()`

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Reorder functions to make diff easier to read

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Fix check for maxRate in `newThrottlerFromConfig()`

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Fix some CI pipeline issues

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Address PR comment.

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Fix typo

Signed-off-by: Eduardo J. Ortega U <[email protected]>

---------

Signed-off-by: Eduardo J. Ortega U <[email protected]>
Signed-off-by: Eduardo J. Ortega U. <[email protected]>
  • Loading branch information
ejortegau committed Jun 6, 2023
1 parent b14c77e commit 1630982
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 19 deletions.
26 changes: 19 additions & 7 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,31 @@ func NewThrottler(name, unit string, threadCount int, maxRate, maxReplicationLag
return newThrottler(GlobalManager, name, unit, threadCount, maxRate, maxReplicationLag, time.Now)
}

func NewThrottlerFromConfig(name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (*Throttler, error) {
return newThrottlerFromConfig(GlobalManager, name, unit, threadCount, maxRateModuleMaxRate, maxReplicationLagModuleConfig, nowFunc)
}

func newThrottler(manager *managerImpl, name, unit string, threadCount int, maxRate, maxReplicationLag int64, nowFunc func() time.Time) (*Throttler, error) {
// Verify input parameters.
if maxRate < 0 {
return nil, fmt.Errorf("maxRate must be >= 0: %v", maxRate)
config := NewMaxReplicationLagModuleConfig(maxReplicationLag)
config.MaxReplicationLagSec = maxReplicationLag

return newThrottlerFromConfig(manager, name, unit, threadCount, maxRate, config, nowFunc)

}

func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (*Throttler, error) {
err := maxReplicationLagModuleConfig.Verify()
if err != nil {
return nil, fmt.Errorf("invalid max replication lag config: %w", err)
}
if maxReplicationLag < 0 {
return nil, fmt.Errorf("maxReplicationLag must be >= 0: %v", maxReplicationLag)
if maxRateModuleMaxRate < 0 {
return nil, fmt.Errorf("maxRate must be >= 0: %v", maxRateModuleMaxRate)
}

// Enable the configured modules.
maxRateModule := NewMaxRateModule(maxRate)
maxRateModule := NewMaxRateModule(maxRateModuleMaxRate)
actualRateHistory := newAggregatedIntervalHistory(1024, 1*time.Second, threadCount)
maxReplicationLagModule, err := NewMaxReplicationLagModule(NewMaxReplicationLagModuleConfig(maxReplicationLag), actualRateHistory, nowFunc)
maxReplicationLagModule, err := NewMaxReplicationLagModule(maxReplicationLagModuleConfig, actualRateHistory, nowFunc)
if err != nil {
return nil, err
}
Expand Down
24 changes: 13 additions & 11 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,28 @@ import (
// in tests to generate mocks.
type healthCheckFactoryFunc func() discovery.LegacyHealthCheck
type topologyWatcherFactoryFunc func(topoServer *topo.Server, tr discovery.LegacyTabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error)
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error)

var (
healthCheckFactory healthCheckFactoryFunc
topologyWatcherFactory topologyWatcherFactoryFunc
throttlerFactory throttlerFactoryFunc
)

func init() {
resetTxThrottlerFactories()
}

func resetTxThrottlerFactories() {
healthCheckFactory = discovery.NewLegacyDefaultHealthCheck
topologyWatcherFactory = func(topoServer *topo.Server, tr discovery.LegacyTabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
return discovery.NewLegacyShardReplicationWatcher(context.Background(), topoServer, tr, cell, keyspace, shard, refreshInterval, topoReadConcurrency)
}
throttlerFactory = func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) {
return throttler.NewThrottler(name, unit, threadCount, maxRate, maxReplicationLag)
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now)
}
}

func init() {
resetTxThrottlerFactories()
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
// It is only used here to allow mocking out a throttler object.
type ThrottlerInterface interface {
Expand Down Expand Up @@ -238,7 +238,7 @@ func (t *TxThrottler) Open() (err error) {
}
log.Info("TxThrottler: opening")
t.throttlerRunning.Set(1)
t.state, err = newTxThrottlerState(t.config, t.target.Keyspace, t.target.Shard)
t.state, err = newTxThrottlerState(t.config, t.target.Keyspace, t.target.Shard, t.target.Cell)
return err
}

Expand Down Expand Up @@ -278,14 +278,16 @@ func (t *TxThrottler) Throttle(priority int) (result bool) {
return result
}

func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard string,
) (*txThrottlerState, error) {
func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string) (*txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}

t, err := throttlerFactory(
TxThrottlerName,
"TPS", /* unit */
1, /* threadCount */
throttler.MaxRateModuleDisabled, /* maxRate */
config.throttlerConfig.MaxReplicationLagSec /* maxReplicationLag */)
maxReplicationLagModuleConfig,
)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestEnabledThrottler(t *testing.T) {
}

mockThrottler := NewMockThrottlerInterface(mockCtrl)
throttlerFactory = func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) {
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
assert.Equal(t, 1, threadCount)
return mockThrottler, nil
}
Expand Down

0 comments on commit 1630982

Please sign in to comment.