Skip to content

Commit

Permalink
Add priority support to transaction throttler (vitessio#12662)
Browse files Browse the repository at this point in the history
* Add support for criticality query directive, and have TxThrottler respect that

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Remove unused variable

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Fix CI pipeline

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Address PR comments.

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Make linter happy & add extra test cases.

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Address PR comments.

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Fix circular import

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Make linter happy

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Address PR comments:

* Invalid criticality in query directive fails the query.

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Address PR comments:

* Renamed criticality to priority.
* Change error handling when parsing the priority from string to integer.

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Add missing piece of code that got lost during merge conflict resolution

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Fix vtadmin.js

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Address PR comments

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Fix unit tests (I think)

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Invert polarity of priority values

With this change, queries with PRIORITY=0 never get throttled, whereas those
with PRIORITY=100 always do (provided there is contention).

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Make linter happy

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Fix flag e2e test

Signed-off-by: Eduardo J. Ortega U <[email protected]>

---------

Signed-off-by: Eduardo J. Ortega U <[email protected]>
  • Loading branch information
ejortegau authored and timvaillancourt committed Apr 16, 2024
1 parent 618423a commit 9e79931
Show file tree
Hide file tree
Showing 17 changed files with 1,016 additions and 723 deletions.
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ Usage of vttablet:
--twopc_coordinator_address string address of the (VTGate) process(es) that will be used to notify of abandoned transactions.
--twopc_enable if the flag is on, 2pc is enabled. Other 2pc flags must be supplied.
--tx-throttler-config string Synonym to -tx_throttler_config (default "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n")
--tx-throttler-default-priority int Default priority assigned to queries that lack priority information (default 100)
--tx-throttler-healthcheck-cells strings Synonym to -tx_throttler_healthcheck_cells
--tx_throttler_config string The configuration of the transaction throttler as a text formatted throttlerdata.Configuration protocol buffer message (default "target_replication_lag_sec: 2\nmax_replication_lag_sec: 10\ninitial_rate: 100\nmax_increase: 1\nemergency_decrease: 0.5\nmin_duration_between_increases_sec: 40\nmax_duration_between_increases_sec: 62\nmin_duration_between_decreases_sec: 20\nspread_backlog_across_sec: 20\nage_bad_rate_after_sec: 180\nbad_rate_increase: 0.1\nmax_rate_approach_threshold: 0.9\n")
--tx_throttler_healthcheck_cells strings A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.
Expand Down
1,442 changes: 727 additions & 715 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

45 changes: 45 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"strconv"
"strings"
"unicode"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
)

const (
Expand All @@ -46,8 +49,17 @@ const (
DirectiveVtexplainRunDMLQueries = "EXECUTE_DML_QUERIES"
// DirectiveWorkloadName specifies the name of the client application workload issuing the query.
DirectiveWorkloadName = "WORKLOAD_NAME"
// DirectivePriority specifies the priority of a workload. It should be an integer between 0 and MaxPriorityValue,
// where 0 is the highest priority, and MaxPriorityValue is the lowest one.
DirectivePriority = "PRIORITY"

// MaxPriorityValue specifies the maximum value allowed for the priority query directive. Valid priority values are
// between zero and MaxPriorityValue.
MaxPriorityValue = 100
)

var ErrInvalidPriority = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Invalid priority value specified in query")

func isNonSpace(r rune) bool {
return !unicode.IsSpace(r)
}
Expand Down Expand Up @@ -381,6 +393,29 @@ func AllowScatterDirective(stmt Statement) bool {
return comments != nil && comments.Directives().IsSet(DirectiveAllowScatter)
}

// GetPriorityFromStatement gets the priority from the provided Statement, using DirectivePriority
func GetPriorityFromStatement(statement Statement) (string, error) {
commentedStatement, ok := statement.(Commented)
// This would mean that the statement lacks comments, so we can't obtain the workload from it. Hence default to
// empty priority
if !ok {
return "", nil
}

directives := commentedStatement.GetParsedComments().Directives()
priority, ok := directives.GetString(DirectivePriority, "")
if !ok || priority == "" {
return "", nil
}

intPriority, err := strconv.Atoi(priority)
if err != nil || intPriority < 0 || intPriority > MaxPriorityValue {
return "", ErrInvalidPriority
}

return priority, nil
}

// GetWorkloadNameFromStatement gets the workload name from the provided Statement, using workloadLabel as the name of
// the query directive that specifies it.
func GetWorkloadNameFromStatement(statement Statement) string {
Expand Down
65 changes: 65 additions & 0 deletions go/vt/sqlparser/comments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,3 +468,68 @@ func TestIgnoreMaxMaxMemoryRowsDirective(t *testing.T) {
})
}
}

func TestGetPriorityFromStatement(t *testing.T) {
testCases := []struct {
query string
expectedPriority string
expectedError error
}{
{
query: "select * from a_table",
expectedPriority: "",
expectedError: nil,
},
{
query: "select /*vt+ ANOTHER_DIRECTIVE=324 */ * from another_table",
expectedPriority: "",
expectedError: nil,
},
{
query: "select /*vt+ PRIORITY=33 */ * from another_table",
expectedPriority: "33",
expectedError: nil,
},
{
query: "select /*vt+ PRIORITY=200 */ * from another_table",
expectedPriority: "",
expectedError: ErrInvalidPriority,
},
{
query: "select /*vt+ PRIORITY=-1 */ * from another_table",
expectedPriority: "",
expectedError: ErrInvalidPriority,
},
{
query: "select /*vt+ PRIORITY=some_text */ * from another_table",
expectedPriority: "",
expectedError: ErrInvalidPriority,
},
{
query: "select /*vt+ PRIORITY=0 */ * from another_table",
expectedPriority: "0",
expectedError: nil,
},
{
query: "select /*vt+ PRIORITY=100 */ * from another_table",
expectedPriority: "100",
expectedError: nil,
},
}

for _, testCase := range testCases {
theThestCase := testCase
t.Run(theThestCase.query, func(t *testing.T) {
t.Parallel()
stmt, err := Parse(theThestCase.query)
assert.NoError(t, err)
actualPriority, actualError := GetPriorityFromStatement(stmt)
if theThestCase.expectedError != nil {
assert.ErrorIs(t, actualError, theThestCase.expectedError)
} else {
assert.NoError(t, err)
assert.Equal(t, theThestCase.expectedPriority, actualPriority)
}
})
}
}
8 changes: 8 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ func (t *noopVCursor) SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion) {
panic("implement me")
}

func (t *noopVCursor) SetPriority(string) {
panic("implement me")
}

func (t *noopVCursor) SetTarget(string) error {
panic("implement me")
}
Expand Down Expand Up @@ -681,6 +685,10 @@ func (f *loggingVCursor) SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion
panic("implement me")
}

func (f *loggingVCursor) SetPriority(string) {
panic("implement me")
}

func (f *loggingVCursor) FindRoutedTable(tbl sqlparser.TableName) (*vindexes.Table, error) {
f.log = append(f.log, fmt.Sprintf("FindTable(%s)", sqlparser.String(tbl)))
return f.tableRoutes.tbl, nil
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ type (
SetWorkload(querypb.ExecuteOptions_Workload)
SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion)
SetWorkloadName(string)
SetPriority(string)
SetFoundRows(uint64)

SetDDLStrategy(string)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,11 @@ func (e *Executor) getPlan(ctx context.Context, vcursor *vcursorImpl, sql string
ignoreMaxMemoryRows := sqlparser.IgnoreMaxMaxMemoryRowsDirective(stmt)
vcursor.SetIgnoreMaxMemoryRows(ignoreMaxMemoryRows)
vcursor.SetWorkloadName(sqlparser.GetWorkloadNameFromStatement(stmt))
priority, err := sqlparser.GetPriorityFromStatement(stmt)
if err != nil {
return nil, err
}
vcursor.SetPriority(priority)

setVarComment, err := prepareSetVarComment(vcursor, stmt)
if err != nil {
Expand Down
42 changes: 42 additions & 0 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1826,6 +1826,48 @@ func TestGetPlanNormalized(t *testing.T) {
assertCacheContains(t, r, want)
}

func TestGetPlanPriority(t *testing.T) {

testCases := []struct {
name string
sql string
expectedPriority string
expectedError error
}{
{name: "Invalid priority", sql: "select /*vt+ PRIORITY=something */ * from music_user_map", expectedPriority: "", expectedError: sqlparser.ErrInvalidPriority},
{name: "Valid priority", sql: "select /*vt+ PRIORITY=33 */ * from music_user_map", expectedPriority: "33", expectedError: nil},
{name: "empty priority", sql: "select * from music_user_map", expectedPriority: "", expectedError: nil},
}

session := NewSafeSession(&vtgatepb.Session{TargetString: "@unknown", Options: &querypb.ExecuteOptions{}})

for _, aTestCase := range testCases {
testCase := aTestCase

t.Run(testCase.name, func(t *testing.T) {
r, _, _, _ := createExecutorEnv()
r.normalize = true
logStats := logstats.NewLogStats(ctx, "Test", "", "", nil)
vCursor, err := newVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
assert.NoError(t, err)

stmt, err := sqlparser.Parse(testCase.sql)
assert.NoError(t, err)
crticalityFromStatement, _ := sqlparser.GetPriorityFromStatement(stmt)

_, err = r.getPlan(context.Background(), vCursor, testCase.sql, stmt, makeComments("/* some comment */"), map[string]*querypb.BindVariable{}, nil, true, logStats)

Check failure on line 1858 in go/vt/vtgate/executor_test.go

View workflow job for this annotation

GitHub Actions / Static Code Checks Etc

too many arguments in call to r.getPlan
if testCase.expectedError != nil {
assert.ErrorIs(t, err, testCase.expectedError)
} else {
assert.NoError(t, err)
assert.Equal(t, testCase.expectedPriority, crticalityFromStatement)
assert.Equal(t, testCase.expectedPriority, vCursor.safeSession.Options.Priority)
}
})
}

}

func TestPassthroughDDL(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()
primarySession.TargetString = "TestExecutor"
Expand Down
9 changes: 9 additions & 0 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,15 @@ func (vc *vcursorImpl) SetPlannerVersion(v plancontext.PlannerVersion) {
vc.safeSession.GetOrCreateOptions().PlannerVersion = v
}

func (vc *vcursorImpl) SetPriority(priority string) {
if priority != "" {
vc.safeSession.GetOrCreateOptions().Priority = priority
} else if vc.safeSession.Options != nil && vc.safeSession.Options.Priority != "" {
vc.safeSession.Options.Priority = ""
}

}

func (vc *vcursorImpl) SetWorkloadName(workloadName string) {
if workloadName != "" {
vc.safeSession.GetOrCreateOptions().WorkloadName = workloadName
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/throttler"
)

Expand Down Expand Up @@ -136,9 +137,11 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
fs.BoolVar(&currentConfig.TwoPCEnable, "twopc_enable", defaultConfig.TwoPCEnable, "if the flag is on, 2pc is enabled. Other 2pc flags must be supplied.")
fs.StringVar(&currentConfig.TwoPCCoordinatorAddress, "twopc_coordinator_address", defaultConfig.TwoPCCoordinatorAddress, "address of the (VTGate) process(es) that will be used to notify of abandoned transactions.")
SecondsVar(fs, &currentConfig.TwoPCAbandonAge, "twopc_abandon_age", defaultConfig.TwoPCAbandonAge, "time in seconds. Any unresolved transaction older than this time will be sent to the coordinator to be resolved.")
// Tx throttler config
flagutil.DualFormatBoolVar(fs, &currentConfig.EnableTxThrottler, "enable_tx_throttler", defaultConfig.EnableTxThrottler, "If true replication-lag-based throttling on transactions will be enabled.")
flagutil.DualFormatStringVar(fs, &currentConfig.TxThrottlerConfig, "tx_throttler_config", defaultConfig.TxThrottlerConfig, "The configuration of the transaction throttler as a text formatted throttlerdata.Configuration protocol buffer message")
flagutil.DualFormatStringListVar(fs, &currentConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.")
fs.IntVar(&currentConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information")

fs.BoolVar(&enableHotRowProtection, "enable_hot_row_protection", false, "If true, incoming transactions for the same row (range) will be queued and cannot consume all txpool slots.")
fs.BoolVar(&enableHotRowProtectionDryRun, "enable_hot_row_protection_dry_run", false, "If true, hot row protection is not enforced but logs if transactions would have been queued.")
Expand Down Expand Up @@ -310,6 +313,7 @@ type TabletConfig struct {
EnableTxThrottler bool `json:"-"`
TxThrottlerConfig string `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultPriority int `json:"-"`

EnableLagThrottler bool `json:"-"`

Expand Down Expand Up @@ -457,6 +461,9 @@ func (c *TabletConfig) Verify() error {
if v := c.HotRowProtection.MaxConcurrency; v <= 0 {
return fmt.Errorf("-hot_row_protection_concurrent_transactions must be > 0 (specified value: %v)", v)
}
if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 {
return fmt.Errorf("--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v)
}
return nil
}

Expand Down Expand Up @@ -556,6 +563,7 @@ var defaultConfig = TabletConfig{
EnableTxThrottler: false,
TxThrottlerConfig: defaultTxThrottlerConfig(),
TxThrottlerHealthCheckCells: []string{},
TxThrottlerDefaultPriority: sqlparser.MaxPriorityValue, // This leads to all queries being candidates to throttle

EnableLagThrottler: false, // Feature flag; to switch to 'true' at some stage in the future

Expand Down
16 changes: 15 additions & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,21 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
startTime := time.Now()
if tsv.txThrottler.Throttle() {
priority := tsv.config.TxThrottlerDefaultPriority
if options != nil && options.Priority != "" {
optionsPriority, err := strconv.Atoi(options.Priority)
// This should never error out, as the value for Priority has been validated in the vtgate already.
// Still, handle it just to make sure.
if err != nil {
log.Errorf(
"The value of the %s query directive could not be converted to integer, using the "+
"default value. Error was: %s",
sqlparser.DirectivePriority, priority, err)
} else {
priority = optionsPriority
}
}
if tsv.txThrottler.Throttle(priority) {
return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
}
var connSetting *pools.Setting
Expand Down
Loading

0 comments on commit 9e79931

Please sign in to comment.