Skip to content

Commit

Permalink
txthrottler: verify config at vttablet startup, consolidate funcs
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed May 18, 2023
1 parent 2df867c commit 6eec92f
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 113 deletions.
3 changes: 1 addition & 2 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,10 @@ Usage of vttablet:
--twopc_abandon_age float time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved.
--twopc_coordinator_address string address of the (VTGate) process(es) that will be used to notify of abandoned transactions.
--twopc_enable if the flag is on, 2pc is enabled. Other 2pc flags must be supplied.
--tx-throttler-config string Synonym to -tx_throttler_config (default "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n")
--tx-throttler-default-priority int Default priority assigned to queries that lack priority information (default 100)
--tx-throttler-healthcheck-cells strings Synonym to -tx_throttler_healthcheck_cells
--tx-throttler-tablet-types strings A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. (default replica)
--tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n")
--tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9")
--tx_throttler_healthcheck_cells strings A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.
--unhealthy_threshold duration replication lag after which a replica is considered unhealthy (default 2h0m0s)
--v Level log level for V logs
Expand Down
61 changes: 47 additions & 14 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ import (
"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// These constants represent values for various config parameters.
Expand Down Expand Up @@ -89,6 +91,24 @@ var (
txLogHandler = "/debug/txlog"
)

type TxThrottlerConfigFlag struct {
*throttlerdatapb.Configuration
}

func NewTxThrottlerConfigFlag() *TxThrottlerConfigFlag {
return &TxThrottlerConfigFlag{&throttlerdatapb.Configuration{}}
}

func (t *TxThrottlerConfigFlag) Get() *throttlerdatapb.Configuration {
return t.Configuration
}

func (t *TxThrottlerConfigFlag) Set(arg string) error {
return prototext.Unmarshal([]byte(arg), t)
}

func (t *TxThrottlerConfigFlag) Type() string { return "string" }

// RegisterTabletEnvFlags is a public API to register tabletenv flags for use by test cases that expect
// some flags to be set with default values
func RegisterTabletEnvFlags(fs *pflag.FlagSet) {
Expand Down Expand Up @@ -158,7 +178,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
SecondsVar(fs, &currentConfig.TwoPCAbandonAge, "twopc_abandon_age", defaultConfig.TwoPCAbandonAge, "time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved.")
// Tx throttler config
flagutil.DualFormatBoolVar(fs, &currentConfig.EnableTxThrottler, "enable_tx_throttler", defaultConfig.EnableTxThrottler, "If true replication-lag-based throttling on transactions will be enabled.")
flagutil.DualFormatStringVar(fs, &currentConfig.TxThrottlerConfig, "tx_throttler_config", defaultConfig.TxThrottlerConfig, "The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message.")
fs.Var(currentConfig.TxThrottlerConfig, "tx_throttler_config", "The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message.")
flagutil.DualFormatStringListVar(fs, &currentConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.")
fs.IntVar(&currentConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information")
fs.Var(currentConfig.TxThrottlerTabletTypes, "tx-throttler-tablet-types", "A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly.")
Expand Down Expand Up @@ -340,7 +360,7 @@ type TabletConfig struct {
TwoPCAbandonAge Seconds `json:"-"`

EnableTxThrottler bool `json:"-"`
TxThrottlerConfig string `json:"-"`
TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultPriority int `json:"-"`
TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"`
Expand Down Expand Up @@ -657,9 +677,6 @@ func (c *TabletConfig) Verify() error {
if v := c.HotRowProtection.MaxConcurrency; v <= 0 {
return fmt.Errorf("--hot_row_protection_concurrent_transactions must be > 0 (specified value: %v)", v)
}
if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 {
return fmt.Errorf("--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v)
}
return nil
}

Expand Down Expand Up @@ -695,6 +712,22 @@ func (c *TabletConfig) verifyTransactionLimitConfig() error {

// verifyTxThrottlerConfig checks the TxThrottler related config for sanity.
func (c *TabletConfig) verifyTxThrottlerConfig() error {
if !c.EnableTxThrottler {
return nil
}

err := throttler.MaxReplicationLagModuleConfig{Configuration: c.TxThrottlerConfig.Get()}.Verify()
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse throttlerdatapb.Configuration config: %v", err)
}

if len(c.TxThrottlerHealthCheckCells) == 0 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "empty healthCheckCells given: %+v", c.TxThrottlerHealthCheckCells)
}
if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v)
}

if c.TxThrottlerTabletTypes == nil || len(*c.TxThrottlerTabletTypes) == 0 {
return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "--tx-throttler-tablet-types must be defined when transaction throttler is enabled")
}
Expand All @@ -706,6 +739,7 @@ func (c *TabletConfig) verifyTxThrottlerConfig() error {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported tablet type %q", tabletType)
}
}

return nil
}

Expand Down Expand Up @@ -812,17 +846,16 @@ var defaultConfig = TabletConfig{
EnablePerWorkloadTableMetrics: false,
}

// defaultTxThrottlerConfig formats the default throttlerdata.Configuration
// object in text format. It uses the object returned by
// throttler.DefaultMaxReplicationLagModuleConfig().Configuration and overrides some of its
// fields. It panics on error.
func defaultTxThrottlerConfig() string {
// defaultTxThrottlerConfig returns the default TxThrottlerConfigFlag object based on
// a throttler.DefaultMaxReplicationLagModuleConfig().Configuration and overrides some of
// its fields. It panics on error.
func defaultTxThrottlerConfig() *TxThrottlerConfigFlag {
// Take throttler.DefaultMaxReplicationLagModuleConfig and override some fields.
config := throttler.DefaultMaxReplicationLagModuleConfig().Configuration
// TODO(erez): Make DefaultMaxReplicationLagModuleConfig() return a MaxReplicationLagSec of 10
// and remove this line.
config.MaxReplicationLagSec = 10
return prototext.Format(config)
return &TxThrottlerConfigFlag{config}
}

func defaultTransactionLimitConfig() TransactionLimitConfig {
Expand Down
84 changes: 78 additions & 6 deletions go/vt/vttablet/tabletserver/tabletenv/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (

"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/dbconfigs"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/yaml2"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

func TestConfigParse(t *testing.T) {
Expand Down Expand Up @@ -332,31 +334,101 @@ func TestFlags(t *testing.T) {
assert.Equal(t, want, currentConfig)
}

func TestTxThrottlerConfigFlag(t *testing.T) {
f := NewTxThrottlerConfigFlag()
defaultMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration

{
assert.Nil(t, f.Set(defaultMaxReplicationLagModuleConfig.String()))
assert.Equal(t, defaultMaxReplicationLagModuleConfig.String(), f.String())
assert.Equal(t, "string", f.Type())
}
{
defaultMaxReplicationLagModuleConfig.TargetReplicationLagSec = 5
assert.Nil(t, f.Set(defaultMaxReplicationLagModuleConfig.String()))
assert.NotNil(t, f.Get())
assert.Equal(t, int64(5), f.Get().TargetReplicationLagSec)
}
{
assert.NotNil(t, f.Set("should not parse"))
}
}

func TestVerifyTxThrottlerConfig(t *testing.T) {
defaultMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration

{
// default config (replica)
// default (disabled)
assert.Nil(t, currentConfig.verifyTxThrottlerConfig())
}
{
// replica + rdonly (allowed)
// enabled without throttler config
currentConfig.EnableTxThrottler = true
assert.NotNil(t, currentConfig.verifyTxThrottlerConfig())
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_FAILED_PRECONDITION, vterrors.Code(err))
}
{
// enabled without cells defined
currentConfig.EnableTxThrottler = true
currentConfig.TxThrottlerConfig = &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_FAILED_PRECONDITION, vterrors.Code(err))
}
{
// enabled with good config (default/replica tablet type)
currentConfig.EnableTxThrottler = true
currentConfig.TxThrottlerConfig = &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}
currentConfig.TxThrottlerConfig.TargetReplicationLagSec = 5
currentConfig.TxThrottlerHealthCheckCells = []string{"cell1"}
assert.Nil(t, currentConfig.verifyTxThrottlerConfig())
}
{
// enabled + replica and rdonly tablet types
currentConfig.EnableTxThrottler = true
currentConfig.TxThrottlerConfig = &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}
currentConfig.TxThrottlerConfig.TargetReplicationLagSec = 5
currentConfig.TxThrottlerHealthCheckCells = []string{"cell1"}
currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
}
assert.Nil(t, currentConfig.verifyTxThrottlerConfig())
}
{
// no tablet types
// enabled without tablet types
currentConfig.EnableTxThrottler = true
currentConfig.TxThrottlerConfig = &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}
currentConfig.TxThrottlerConfig.TargetReplicationLagSec = 5
currentConfig.TxThrottlerHealthCheckCells = []string{"cell1"}
currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{}
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_FAILED_PRECONDITION, vterrors.Code(err))
}
{
// disallowed tablet type
// enabled + disallowed tablet type
currentConfig.EnableTxThrottler = true
currentConfig.TxThrottlerConfig = &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}
currentConfig.TxThrottlerConfig.TargetReplicationLagSec = 5
currentConfig.TxThrottlerHealthCheckCells = []string{"cell1"}
currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_DRAINED}
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_INVALID_ARGUMENT, vterrors.Code(err))
}
{
// enabled + disallowed priority
currentConfig.EnableTxThrottler = true
currentConfig.TxThrottlerConfig = &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}
currentConfig.TxThrottlerConfig.TargetReplicationLagSec = 5
currentConfig.TxThrottlerHealthCheckCells = []string{"cell1"}
currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA}
currentConfig.TxThrottlerDefaultPriority = 12345
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_INVALID_ARGUMENT, vterrors.Code(err))
}
}
74 changes: 22 additions & 52 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ package txthrottler

import (
"context"
"fmt"
"math/rand"
"strings"
"sync"
"time"

"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/stats"
Expand Down Expand Up @@ -186,64 +184,36 @@ type txThrottlerState struct {
// This function calls tryCreateTxThrottler that does the actual creation work
// and returns an error if one occurred.
func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
txThrottler, err := tryCreateTxThrottler(env, topoServer)
if err != nil {
log.Errorf("Error creating transaction throttler. Transaction throttling will"+
" be disabled. Error: %v", err)
// newTxThrottler with disabled config never returns an error
txThrottler, _ = newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
} else {
log.Infof("Initialized transaction throttler with config: %+v", txThrottler.config)
}
return txThrottler
}

// InitDBConfig initializes the target parameters for the throttler.
func (t *txThrottler) InitDBConfig(target *querypb.Target) {
t.target = proto.Clone(target).(*querypb.Target)
}

func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*txThrottler, error) {
if !env.Config().EnableTxThrottler {
return newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
}
throttlerConfig := &txThrottlerConfig{enabled: false}

if env.Config().EnableTxThrottler {
// Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells
// is immutable.
healthCheckCells := make([]string, len(env.Config().TxThrottlerHealthCheckCells))
copy(healthCheckCells, env.Config().TxThrottlerHealthCheckCells)

throttlerConfig = &txThrottlerConfig{
enabled: true,
tabletTypes: env.Config().TxThrottlerTabletTypes,
throttlerConfig: env.Config().TxThrottlerConfig.Get(),
healthCheckCells: healthCheckCells,
}

var throttlerConfig throttlerdatapb.Configuration
if err := prototext.Unmarshal([]byte(env.Config().TxThrottlerConfig), &throttlerConfig); err != nil {
return nil, err
defer log.Infof("Initialized transaction throttler with config: %+v", throttlerConfig)
}

// Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells
// is immutable.
healthCheckCells := make([]string, len(env.Config().TxThrottlerHealthCheckCells))
copy(healthCheckCells, env.Config().TxThrottlerHealthCheckCells)

return newTxThrottler(env, topoServer, &txThrottlerConfig{
enabled: true,
tabletTypes: env.Config().TxThrottlerTabletTypes,
throttlerConfig: &throttlerConfig,
healthCheckCells: healthCheckCells,
})
}

func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrottlerConfig) (*txThrottler, error) {
if config.enabled {
// Verify config.
err := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}.Verify()
if err != nil {
return nil, err
}
if len(config.healthCheckCells) == 0 {
return nil, fmt.Errorf("empty healthCheckCells given. %+v", config)
}
}
return &txThrottler{
config: config,
config: throttlerConfig,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
}, nil
}
}

// InitDBConfig initializes the target parameters for the throttler.
func (t *txThrottler) InitDBConfig(target *querypb.Target) {
t.target = proto.Clone(target).(*querypb.Target)
}

// Open opens the transaction throttler. It must be called prior to 'Throttle'.
Expand Down
Loading

0 comments on commit 6eec92f

Please sign in to comment.