Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txthrottler: verify config at vttablet startup, consolidate funcs #13115

Merged
3 changes: 1 addition & 2 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,10 @@ Usage of vttablet:
--twopc_abandon_age float time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved.
--twopc_coordinator_address string address of the (VTGate) process(es) that will be used to notify of abandoned transactions.
--twopc_enable if the flag is on, 2pc is enabled. Other 2pc flags must be supplied.
--tx-throttler-config string Synonym to -tx_throttler_config (default "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n")
--tx-throttler-default-priority int Default priority assigned to queries that lack priority information (default 100)
--tx-throttler-healthcheck-cells strings Synonym to -tx_throttler_healthcheck_cells
--tx-throttler-tablet-types strings A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. (default replica)
--tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n")
--tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9")
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
--tx_throttler_healthcheck_cells strings A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.
--unhealthy_threshold duration replication lag after which a replica is considered unhealthy (default 2h0m0s)
--v Level log level for V logs
Expand Down
61 changes: 47 additions & 14 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ import (
"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"

querypb "vitess.io/vitess/go/vt/proto/query"
throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// These constants represent values for various config parameters.
Expand Down Expand Up @@ -89,6 +91,24 @@ var (
txLogHandler = "/debug/txlog"
)

type TxThrottlerConfigFlag struct {
*throttlerdatapb.Configuration
}

func NewTxThrottlerConfigFlag() *TxThrottlerConfigFlag {
return &TxThrottlerConfigFlag{&throttlerdatapb.Configuration{}}
}

func (t *TxThrottlerConfigFlag) Get() *throttlerdatapb.Configuration {
return t.Configuration
}

func (t *TxThrottlerConfigFlag) Set(arg string) error {
return prototext.Unmarshal([]byte(arg), t.Configuration)
}

func (t *TxThrottlerConfigFlag) Type() string { return "string" }

// RegisterTabletEnvFlags is a public API to register tabletenv flags for use by test cases that expect
// some flags to be set with default values
func RegisterTabletEnvFlags(fs *pflag.FlagSet) {
Expand Down Expand Up @@ -158,7 +178,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
SecondsVar(fs, &currentConfig.TwoPCAbandonAge, "twopc_abandon_age", defaultConfig.TwoPCAbandonAge, "time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved.")
// Tx throttler config
flagutil.DualFormatBoolVar(fs, &currentConfig.EnableTxThrottler, "enable_tx_throttler", defaultConfig.EnableTxThrottler, "If true replication-lag-based throttling on transactions will be enabled.")
flagutil.DualFormatStringVar(fs, &currentConfig.TxThrottlerConfig, "tx_throttler_config", defaultConfig.TxThrottlerConfig, "The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message.")
fs.Var(currentConfig.TxThrottlerConfig, "tx_throttler_config", "The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message.")
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
flagutil.DualFormatStringListVar(fs, &currentConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.")
fs.IntVar(&currentConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information")
fs.Var(currentConfig.TxThrottlerTabletTypes, "tx-throttler-tablet-types", "A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly.")
Expand Down Expand Up @@ -340,7 +360,7 @@ type TabletConfig struct {
TwoPCAbandonAge Seconds `json:"-"`

EnableTxThrottler bool `json:"-"`
TxThrottlerConfig string `json:"-"`
TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultPriority int `json:"-"`
TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"`
Expand Down Expand Up @@ -657,9 +677,6 @@ func (c *TabletConfig) Verify() error {
if v := c.HotRowProtection.MaxConcurrency; v <= 0 {
return fmt.Errorf("--hot_row_protection_concurrent_transactions must be > 0 (specified value: %v)", v)
}
if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving this to verifyTxThrottlerConfig()

return fmt.Errorf("--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v)
}
return nil
}

Expand Down Expand Up @@ -695,6 +712,22 @@ func (c *TabletConfig) verifyTransactionLimitConfig() error {

// verifyTxThrottlerConfig checks the TxThrottler related config for sanity.
func (c *TabletConfig) verifyTxThrottlerConfig() error {
if !c.EnableTxThrottler {
return nil
}

err := throttler.MaxReplicationLagModuleConfig{Configuration: c.TxThrottlerConfig.Get()}.Verify()
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse throttlerdatapb.Configuration config: %v", err)
}

if len(c.TxThrottlerHealthCheckCells) == 0 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "empty healthCheckCells given: %+v", c.TxThrottlerHealthCheckCells)
}
if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v)
}

if c.TxThrottlerTabletTypes == nil || len(*c.TxThrottlerTabletTypes) == 0 {
return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "--tx-throttler-tablet-types must be defined when transaction throttler is enabled")
}
Expand All @@ -706,6 +739,7 @@ func (c *TabletConfig) verifyTxThrottlerConfig() error {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unsupported tablet type %q", tabletType)
}
}

return nil
}

Expand Down Expand Up @@ -812,17 +846,16 @@ var defaultConfig = TabletConfig{
EnablePerWorkloadTableMetrics: false,
}

// defaultTxThrottlerConfig formats the default throttlerdata.Configuration
// object in text format. It uses the object returned by
// throttler.DefaultMaxReplicationLagModuleConfig().Configuration and overrides some of its
// fields. It panics on error.
func defaultTxThrottlerConfig() string {
// defaultTxThrottlerConfig returns the default TxThrottlerConfigFlag object based on
// a throttler.DefaultMaxReplicationLagModuleConfig().Configuration and overrides some of
// its fields. It panics on error.
func defaultTxThrottlerConfig() *TxThrottlerConfigFlag {
// Take throttler.DefaultMaxReplicationLagModuleConfig and override some fields.
config := throttler.DefaultMaxReplicationLagModuleConfig().Configuration
// TODO(erez): Make DefaultMaxReplicationLagModuleConfig() return a MaxReplicationLagSec of 10
// and remove this line.
config.MaxReplicationLagSec = 10
return prototext.Format(config)
return &TxThrottlerConfigFlag{config}
}

func defaultTransactionLimitConfig() TransactionLimitConfig {
Expand Down
148 changes: 126 additions & 22 deletions go/vt/vttablet/tabletserver/tabletenv/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (

"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/dbconfigs"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/yaml2"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

func TestConfigParse(t *testing.T) {
Expand Down Expand Up @@ -332,31 +334,133 @@ func TestFlags(t *testing.T) {
assert.Equal(t, want, currentConfig)
}

func TestVerifyTxThrottlerConfig(t *testing.T) {
func TestTxThrottlerConfigFlag(t *testing.T) {
f := NewTxThrottlerConfigFlag()
defaultMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration

{
// default config (replica)
assert.Nil(t, currentConfig.verifyTxThrottlerConfig())
assert.Nil(t, f.Set(defaultMaxReplicationLagModuleConfig.String()))
assert.Equal(t, defaultMaxReplicationLagModuleConfig.String(), f.String())
assert.Equal(t, "string", f.Type())
}
{
// replica + rdonly (allowed)
currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
}
assert.Nil(t, currentConfig.verifyTxThrottlerConfig())
defaultMaxReplicationLagModuleConfig.TargetReplicationLagSec = 5
assert.Nil(t, f.Set(defaultMaxReplicationLagModuleConfig.String()))
assert.NotNil(t, f.Get())
assert.Equal(t, int64(5), f.Get().TargetReplicationLagSec)
}
{
// no tablet types
currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{}
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_FAILED_PRECONDITION, vterrors.Code(err))
assert.NotNil(t, f.Set("should not parse"))
}
{
// disallowed tablet type
currentConfig.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_DRAINED}
err := currentConfig.verifyTxThrottlerConfig()
assert.NotNil(t, err)
assert.Equal(t, vtrpcpb.Code_INVALID_ARGUMENT, vterrors.Code(err))
}

func TestVerifyTxThrottlerConfig(t *testing.T) {
defaultMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration
invalidMaxReplicationLagModuleConfig := throttler.DefaultMaxReplicationLagModuleConfig().Configuration
invalidMaxReplicationLagModuleConfig.TargetReplicationLagSec = -1

type testConfig struct {
Name string
ExpectedErrorCode vtrpcpb.Code
//
EnableTxThrottler bool
TxThrottlerConfig *TxThrottlerConfigFlag
TxThrottlerHealthCheckCells []string
TxThrottlerTabletTypes *topoproto.TabletTypeListFlag
TxThrottlerDefaultPriority int
}

tests := []testConfig{
{
// default (disabled)
Name: "default",
EnableTxThrottler: false,
},
{
// enabled with invalid throttler config
Name: "enabled invalid config",
ExpectedErrorCode: vtrpcpb.Code_INVALID_ARGUMENT,
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{invalidMaxReplicationLagModuleConfig},
},
{
// enabled without cells defined
Name: "enabled without cells",
ExpectedErrorCode: vtrpcpb.Code_FAILED_PRECONDITION,
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig},
},
{
// enabled with good config (default/replica tablet type)
Name: "enabled",
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig},
TxThrottlerHealthCheckCells: []string{"cell1"},
},
{
// enabled + replica and rdonly tablet types
Name: "enabled plus rdonly",
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig},
TxThrottlerHealthCheckCells: []string{"cell1"},
TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
},
},
{
// enabled without tablet types
Name: "enabled without tablet types",
ExpectedErrorCode: vtrpcpb.Code_FAILED_PRECONDITION,
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig},
TxThrottlerHealthCheckCells: []string{"cell1"},
TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{},
},
{
// enabled + disallowed tablet type
Name: "enabled disallowed tablet type",
ExpectedErrorCode: vtrpcpb.Code_INVALID_ARGUMENT,
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig},
TxThrottlerHealthCheckCells: []string{"cell1"},
TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{topodatapb.TabletType_DRAINED},
},
{
// enabled + disallowed priority
Name: "enabled disallowed priority",
ExpectedErrorCode: vtrpcpb.Code_INVALID_ARGUMENT,
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig},
TxThrottlerDefaultPriority: 12345,
TxThrottlerHealthCheckCells: []string{"cell1"},
},
}

for _, test := range tests {
test := test
t.Run(test.Name, func(t *testing.T) {
t.Parallel()

config := defaultConfig
config.EnableTxThrottler = test.EnableTxThrottler
if test.TxThrottlerConfig == nil {
test.TxThrottlerConfig = NewTxThrottlerConfigFlag()
}
config.TxThrottlerConfig = test.TxThrottlerConfig
config.TxThrottlerHealthCheckCells = test.TxThrottlerHealthCheckCells
config.TxThrottlerDefaultPriority = test.TxThrottlerDefaultPriority
if test.TxThrottlerTabletTypes != nil {
config.TxThrottlerTabletTypes = test.TxThrottlerTabletTypes
}

err := config.verifyTxThrottlerConfig()
if test.ExpectedErrorCode == vtrpcpb.Code_OK {
assert.Nil(t, err)
} else {
assert.NotNil(t, err)
assert.Equal(t, test.ExpectedErrorCode, vterrors.Code(err))
}
})
}
}
Loading