Skip to content

Commit

Permalink
Do not send field query when using reserved connection (#10163)
Browse files Browse the repository at this point in the history
* Do not send field query when using reserved connection

Signed-off-by: Florent Poinsard <[email protected]>

* Fixed the expected output of tabletserver unit tests

Signed-off-by: Florent Poinsard <[email protected]>

* Addition of an end-to-end benchmark for a select in a reserved connectio

Signed-off-by: Florent Poinsard <[email protected]>

* Reuse reserved connection hen sending field query

Signed-off-by: Florent Poinsard <[email protected]>

* Fail queries when setting unsupported sql_mode

Signed-off-by: Florent Poinsard <[email protected]>

* error out for unsupported sql mode only when the mode is changed

Signed-off-by: Florent Poinsard <[email protected]>
  • Loading branch information
frouioui authored May 11, 2022
1 parent 76bf8b8 commit 6c4bc21
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 32 deletions.
4 changes: 2 additions & 2 deletions go/test/endtoend/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

// AssertMatches ensures the given query produces the expected results.
func AssertMatches(t *testing.T, conn *mysql.Conn, query, expected string) {
func AssertMatches(t testing.TB, conn *mysql.Conn, query, expected string) {
t.Helper()
qr := Exec(t, conn, query)
got := fmt.Sprintf("%v", qr.Rows)
Expand Down Expand Up @@ -99,7 +99,7 @@ func AssertResultIsEmpty(t *testing.T, conn *mysql.Conn, pre string) {

// Exec executes the given query using the given connection. The results are returned.
// The test fails if the query produces an error.
func Exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
func Exec(t testing.TB, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
require.NoError(t, err, "for query: "+query)
Expand Down
21 changes: 21 additions & 0 deletions go/test/endtoend/vtgate/reservedconn/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,27 @@ func TestSetSystemVarInTxWithConnError(t *testing.T) {
utils.AssertMatches(t, conn, "select id, @@sql_safe_updates from test where id = 4", "[[INT64(4) INT64(1)]]")
}

func BenchmarkReservedConnFieldQuery(b *testing.B) {
vtParams := mysql.ConnParams{
Host: "localhost",
Port: clusterInstance.VtgateMySQLPort,
}
conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(b, err)
defer conn.Close()

utils.Exec(b, conn, "delete from test")
utils.Exec(b, conn, "insert into test (id, val1) values (1, 'toto'), (4, 'tata')")

// set sql_mode to empty to force the use of reserved connection
utils.Exec(b, conn, "set sql_mode = ''")
utils.AssertMatches(b, conn, "select @@sql_mode", `[[VARCHAR("")]]`)

for i := 0; i < b.N; i++ {
utils.Exec(b, conn, "select id, val1 from test")
}
}

func TestEnableSystemSettings(t *testing.T) {
vtParams := mysql.ConnParams{
Host: "localhost",
Expand Down
25 changes: 20 additions & 5 deletions go/vt/vtgate/engine/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ type (
}
)

var unsupportedSQLModes = []string{"ANSI_QUOTES", "NO_BACKSLASH_ESCAPES", "PIPES_AS_CONCAT", "REAL_AS_FLOAT"}

var _ Primitive = (*Set)(nil)

// RouteType implements the Primitive interface method.
Expand Down Expand Up @@ -350,7 +352,10 @@ func (svs *SysVarReservedConn) checkAndUpdateSysVar(vcursor VCursor, res *evalen

var value sqltypes.Value
if svs.Name == "sql_mode" {
changed, value = sqlModeChangedValue(qr)
changed, value, err = sqlModeChangedValue(qr)
if err != nil {
return false, err
}
if !changed {
return false, nil
}
Expand All @@ -371,12 +376,12 @@ func (svs *SysVarReservedConn) checkAndUpdateSysVar(vcursor VCursor, res *evalen
return false, nil
}

func sqlModeChangedValue(qr *sqltypes.Result) (bool, sqltypes.Value) {
func sqlModeChangedValue(qr *sqltypes.Result) (bool, sqltypes.Value, error) {
if len(qr.Fields) != 2 {
return false, sqltypes.Value{}
return false, sqltypes.Value{}, nil
}
if len(qr.Rows[0]) != 2 {
return false, sqltypes.Value{}
return false, sqltypes.Value{}, nil
}
orig := qr.Rows[0][0].ToString()
newVal := qr.Rows[0][1].ToString()
Expand All @@ -393,8 +398,15 @@ func sqlModeChangedValue(qr *sqltypes.Result) (bool, sqltypes.Value) {

changed := false
newValArr := strings.Split(newVal, ",")
unsupportedMode := ""
for _, nVal := range newValArr {
nVal = strings.ToUpper(nVal)
for _, mode := range unsupportedSQLModes {
if mode == nVal {
unsupportedMode = nVal
break
}
}
notSeen, exists := origMap[nVal]
if !exists {
changed = true
Expand All @@ -409,8 +421,11 @@ func sqlModeChangedValue(qr *sqltypes.Result) (bool, sqltypes.Value) {
if !changed && uniqOrigVal != origValSeen {
changed = true
}
if changed && unsupportedMode != "" {
return false, sqltypes.Value{}, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "setting the %s sql_mode is unsupported", unsupportedMode)
}

return changed, qr.Rows[0][1]
return changed, qr.Rows[0][1], nil
}

var _ SetOp = (*SysVarSetAware)(nil)
Expand Down
20 changes: 20 additions & 0 deletions go/vt/vtgate/engine/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,26 @@ func TestSetTable(t *testing.T) {
"|a",
)},
disableSetVar: true,
}, {
testName: "sql_mode set an unsupported mode",
mysqlVersion: "80000",
setOps: []SetOp{
&SysVarReservedConn{
Name: "sql_mode",
Keyspace: &vindexes.Keyspace{Name: "ks", Sharded: true},
Expr: "'REAL_AS_FLOAT'",
SupportSetVar: true,
},
},
expectedQueryLog: []string{
`ResolveDestinations ks [] Destinations:DestinationKeyspaceID(00)`,
`ExecuteMultiShard ks.-20: select @@sql_mode orig, 'REAL_AS_FLOAT' new {} false false`,
},
expectedError: "setting the REAL_AS_FLOAT sql_mode is unsupported",
qr: []*sqltypes.Result{sqltypes.MakeTestResult(sqltypes.MakeTestFields("orig|new", "varchar|varchar"),
"|REAL_AS_FLOAT",
)},
disableSetVar: true,
}, {
testName: "default_week_format change - empty orig - MySQL80",
mysqlVersion: "80000",
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/fuzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ func FuzzGetPlan(data []byte) int {
qe.SetQueryPlanCacheCap(1024)

// Call target
_, _ = qe.GetPlan(context.Background(), logStats, query2, true, false)
_, _ = qe.GetPlan(context.Background(), logStats, query2, true, 0, nil)
return 1
}
35 changes: 26 additions & 9 deletions go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync/atomic"
"time"

"vitess.io/vitess/go/sqltypes"

"context"

"vitess.io/vitess/go/acl"
Expand All @@ -50,7 +52,7 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
)

//_______________________________________________
// _______________________________________________

// TabletPlan wraps the planbuilder's exec plan to enforce additional rules
// and track stats.
Expand Down Expand Up @@ -98,7 +100,7 @@ func (ep *TabletPlan) buildAuthorized() {
}
}

//_______________________________________________
// _______________________________________________

// QueryEngine implements the core functionality of tabletserver.
// It assumes that no requests will be sent to it before Open is
Expand Down Expand Up @@ -287,7 +289,7 @@ func (qe *QueryEngine) Close() {
}

// GetPlan returns the TabletPlan that for the query. Plans are cached in a cache.LRUCache.
func (qe *QueryEngine) GetPlan(ctx context.Context, logStats *tabletenv.LogStats, sql string, skipQueryPlanCache bool, isReservedConn bool) (*TabletPlan, error) {
func (qe *QueryEngine) GetPlan(ctx context.Context, logStats *tabletenv.LogStats, sql string, skipQueryPlanCache bool, reservedConnID int64, te *TxEngine) (*TabletPlan, error) {
span, ctx := trace.NewSpan(ctx, "QueryEngine.GetPlan")
defer span.Finish()

Expand All @@ -308,7 +310,7 @@ func (qe *QueryEngine) GetPlan(ctx context.Context, logStats *tabletenv.LogStats
if err != nil {
return nil, err
}
splan, err := planbuilder.Build(statement, qe.tables, isReservedConn, qe.env.Config().DB.DBName)
splan, err := planbuilder.Build(statement, qe.tables, reservedConnID != 0, qe.env.Config().DB.DBName)
if err != nil {
return nil, err
}
Expand All @@ -317,15 +319,30 @@ func (qe *QueryEngine) GetPlan(ctx context.Context, logStats *tabletenv.LogStats
plan.buildAuthorized()
if plan.PlanID.IsSelect() {
if !skipQueryPlanCache && qe.enableQueryPlanFieldCaching && plan.FieldQuery != nil {
conn, err := qe.conns.Get(ctx)
if err != nil {
return nil, err
var statefulConn *StatefulConnection
var conn *connpool.DBConn
if reservedConnID != 0 {
statefulConn, err = te.txPool.GetAndLock(reservedConnID, "")
if err != nil {
return nil, err
}
defer statefulConn.Unlock()
} else {
conn, err = qe.conns.Get(ctx)
if err != nil {
return nil, err
}
defer conn.Recycle()
}
defer conn.Recycle()

sql := plan.FieldQuery.Query
start := time.Now()
r, err := conn.Exec(ctx, sql, 1, true)
var r *sqltypes.Result
if reservedConnID != 0 {
r, err = statefulConn.Exec(ctx, sql, 1, true)
} else {
r, err = conn.Exec(ctx, sql, 1, true)
}
logStats.AddRewrittenSQL(sql, start)
if err != nil {
return nil, err
Expand Down
18 changes: 9 additions & 9 deletions go/vt/vttablet/tabletserver/query_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestGetPlanPanicDuetoEmptyQuery(t *testing.T) {

ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")
_, err := qe.GetPlan(ctx, logStats, "", false, false /* inReservedConn */)
_, err := qe.GetPlan(ctx, logStats, "", false, 0, nil)
require.EqualError(t, err, "query was empty")
}

Expand Down Expand Up @@ -192,14 +192,14 @@ func TestQueryPlanCache(t *testing.T) {
} else {
qe.SetQueryPlanCacheCap(1)
}
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, false, false /* inReservedConn */)
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, false, 0, nil)
if err != nil {
t.Fatal(err)
}
if firstPlan == nil {
t.Fatalf("plan should not be nil")
}
secondPlan, err := qe.GetPlan(ctx, logStats, secondQuery, false, false /* inReservedConn */)
secondPlan, err := qe.GetPlan(ctx, logStats, secondQuery, false, 0, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -230,7 +230,7 @@ func TestNoQueryPlanCache(t *testing.T) {
ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")
qe.SetQueryPlanCacheCap(1024)
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, true, false /* inReservedConn */)
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, true, 0, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestNoQueryPlanCacheDirective(t *testing.T) {
ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")
qe.SetQueryPlanCacheCap(1024)
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, false, false /* inReservedConn */)
firstPlan, err := qe.GetPlan(ctx, logStats, firstQuery, false, 0, nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -283,7 +283,7 @@ func TestStatsURL(t *testing.T) {
// warm up cache
ctx := context.Background()
logStats := tabletenv.NewLogStats(ctx, "GetPlanStats")
qe.GetPlan(ctx, logStats, query, false, false /* inReservedConn */)
qe.GetPlan(ctx, logStats, query, false, 0, nil)

request, _ := http.NewRequest("GET", "/debug/tablet_plans", nil)
response := httptest.NewRecorder()
Expand Down Expand Up @@ -388,7 +388,7 @@ func BenchmarkPlanCacheThroughput(b *testing.B) {

for i := 0; i < b.N; i++ {
query := fmt.Sprintf("SELECT (a, b, c) FROM test_table_%d", rand.Intn(500))
_, err := qe.GetPlan(ctx, logStats, query, false, false /* inReservedConn */)
_, err := qe.GetPlan(ctx, logStats, query, false, 0, nil)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -419,7 +419,7 @@ func benchmarkPlanCache(b *testing.B, db *fakesqldb.DB, lfu bool, par int) {

for pb.Next() {
query := fmt.Sprintf("SELECT (a, b, c) FROM test_table_%d", rand.Intn(500))
_, err := qe.GetPlan(ctx, logStats, query, false, false /* inReservedConn */)
_, err := qe.GetPlan(ctx, logStats, query, false, 0, nil)
require.NoErrorf(b, err, "bad query: %s", query)
}
})
Expand Down Expand Up @@ -545,7 +545,7 @@ func TestPlanCachePollution(t *testing.T) {
query := sample()

start := time.Now()
_, err := qe.GetPlan(ctx, logStats, query, false, false /* inReservedConn */)
_, err := qe.GetPlan(ctx, logStats, query, false, 0, nil)
require.NoErrorf(t, err, "bad query: %s", query)
stats.interval += time.Since(start)

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ func newTransaction(tsv *TabletServer, options *querypb.ExecuteOptions) int64 {

func newTestQueryExecutor(ctx context.Context, tsv *TabletServer, sql string, txID int64) *QueryExecutor {
logStats := tabletenv.NewLogStats(ctx, "TestQueryExecutor")
plan, err := tsv.qe.GetPlan(ctx, logStats, sql, false, false /* inReservedConn */)
plan, err := tsv.qe.GetPlan(ctx, logStats, sql, false, 0, nil)
if err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq
bindVariables = make(map[string]*querypb.BindVariable)
}
query, comments := sqlparser.SplitMarginComments(sql)
plan, err := tsv.qe.GetPlan(ctx, logStats, query, skipQueryPlanCache(options), reservedID != 0)
plan, err := tsv.qe.GetPlan(ctx, logStats, query, skipQueryPlanCache(options), reservedID, tsv.te)
if err != nil {
return err
}
Expand Down Expand Up @@ -934,7 +934,7 @@ func (tsv *TabletServer) beginWaitForSameRangeTransactions(ctx context.Context,
func (tsv *TabletServer) computeTxSerializerKey(ctx context.Context, logStats *tabletenv.LogStats, sql string, bindVariables map[string]*querypb.BindVariable) (string, string) {
// Strip trailing comments so we don't pollute the query cache.
sql, _ = sqlparser.SplitMarginComments(sql)
plan, err := tsv.qe.GetPlan(ctx, logStats, sql, false /* skipQueryPlanCache */, false /* isReservedConn */)
plan, err := tsv.qe.GetPlan(ctx, logStats, sql, false, 0, nil)
if err != nil {
logComputeRowSerializerKey.Errorf("failed to get plan for query: %v err: %v", sql, err)
return "", ""
Expand Down
3 changes: 0 additions & 3 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1775,7 +1775,6 @@ func TestReserveBeginExecute(t *testing.T) {
expected := []string{
"select 43",
"begin",
"select 42 from dual where 1 != 1",
"select 42 from dual limit 10001",
}
splitOutput := strings.Split(db.QueryLog(), ";")
Expand All @@ -1798,7 +1797,6 @@ func TestReserveExecute_WithoutTx(t *testing.T) {
assert.NotEqual(t, int64(0), reservedID, "reservedID should not be zero")
expected := []string{
"select 43",
"select 42 from dual where 1 != 1",
"select 42 from dual limit 10001",
}
splitOutput := strings.Split(db.QueryLog(), ";")
Expand Down Expand Up @@ -1826,7 +1824,6 @@ func TestReserveExecute_WithTx(t *testing.T) {
assert.Equal(t, transactionID, reservedID, "reservedID should be equal to transactionID")
expected := []string{
"select 43",
"select 42 from dual where 1 != 1",
"select 42 from dual limit 10001",
}
splitOutput := strings.Split(db.QueryLog(), ";")
Expand Down

0 comments on commit 6c4bc21

Please sign in to comment.