Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix savepoint support with reserved connections #9030

Merged
merged 7 commits into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions go/test/endtoend/vtgate/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,3 +611,29 @@ func TestSQLSelectLimitWithPlanCache(t *testing.T) {
}
}
}

func TestSavepointInReservedConn(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

utils.Exec(t, conn, "set session sql_mode = ''")
utils.Exec(t, conn, "BEGIN")
utils.Exec(t, conn, "SAVEPOINT sp_1")
utils.Exec(t, conn, "insert into t7_xxhash(uid, msg) values(1, 'a')")
utils.Exec(t, conn, "RELEASE SAVEPOINT sp_1")
utils.Exec(t, conn, "ROLLBACK")

utils.Exec(t, conn, "set session sql_mode = ''")
utils.Exec(t, conn, "BEGIN")
utils.Exec(t, conn, "SAVEPOINT sp_1")
utils.Exec(t, conn, "RELEASE SAVEPOINT sp_1")
utils.Exec(t, conn, "SAVEPOINT sp_2")
utils.Exec(t, conn, "insert into t7_xxhash(uid, msg) values(2, 'a')")
utils.Exec(t, conn, "RELEASE SAVEPOINT sp_2")
utils.Exec(t, conn, "COMMIT")
defer utils.Exec(t, conn, `delete from t7_xxhash`)
utils.AssertMatches(t, conn, "select uid from t7_xxhash", `[[VARCHAR("2")]]`)
}
345 changes: 178 additions & 167 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,9 +538,9 @@ func (itc *internalTabletConn) HandlePanic(err *error) {
}

//ReserveBeginExecute is part of the QueryService interface.
func (itc *internalTabletConn) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
func (itc *internalTabletConn) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
bindVariables = sqltypes.CopyBindVariables(bindVariables)
res, transactionID, reservedID, alias, err := itc.tablet.qsc.QueryService().ReserveBeginExecute(ctx, target, preQueries, sql, bindVariables, options)
res, transactionID, reservedID, alias, err := itc.tablet.qsc.QueryService().ReserveBeginExecute(ctx, target, preQueries, postBeginQueries, sql, bindVariables, options)
return res, transactionID, reservedID, alias, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

Expand Down
64 changes: 64 additions & 0 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2196,6 +2196,70 @@ func TestExecutorSavepointInTx(t *testing.T) {
testQueryLog(t, logChan, "TestExecute", "ROLLBACK", "rollback", 2)
}

func TestExecutorSavepointInTxWithReservedConn(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()
logChan := QueryLogger.Subscribe("TestExecutorSavepoint")
defer QueryLogger.Unsubscribe(logChan)

session := NewSafeSession(&vtgatepb.Session{Autocommit: true, TargetString: "TestExecutor", EnableSystemSettings: true})
sbc1.SetResults([]*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("orig|new", "varchar|varchar"), "a|"),
})
_, err := exec(executor, session, "set sql_mode = ''")
require.NoError(t, err)

_, err = exec(executor, session, "begin")
require.NoError(t, err)
_, err = exec(executor, session, "savepoint a")
require.NoError(t, err)
_, err = exec(executor, session, "select id from user where id = 1")
require.NoError(t, err)
_, err = exec(executor, session, "savepoint b")
require.NoError(t, err)
_, err = exec(executor, session, "release savepoint a")
require.NoError(t, err)
_, err = exec(executor, session, "select id from user where id = 3")
require.NoError(t, err)
_, err = exec(executor, session, "commit")
require.NoError(t, err)
emptyBV := map[string]*querypb.BindVariable{}
sbc1WantQueries := []*querypb.BoundQuery{{
Sql: "select @@sql_mode orig, '' new", BindVariables: emptyBV,
}, {
Sql: "set @@sql_mode = ''", BindVariables: emptyBV,
}, {
Sql: "savepoint a", BindVariables: emptyBV,
}, {
Sql: "select id from `user` where id = 1", BindVariables: emptyBV,
}, {
Sql: "savepoint b", BindVariables: emptyBV,
}, {
Sql: "release savepoint a", BindVariables: emptyBV,
}}

sbc2WantQueries := []*querypb.BoundQuery{{
Sql: "set @@sql_mode = ''", BindVariables: emptyBV,
}, {
Sql: "savepoint a", BindVariables: emptyBV,
}, {
Sql: "savepoint b", BindVariables: emptyBV,
}, {
Sql: "release savepoint a", BindVariables: emptyBV,
}, {
Sql: "select id from `user` where id = 3", BindVariables: emptyBV,
}}
utils.MustMatch(t, sbc1WantQueries, sbc1.Queries, "")
utils.MustMatch(t, sbc2WantQueries, sbc2.Queries, "")
testQueryLog(t, logChan, "TestExecute", "SET", "set session sql_mode = ''", 1)
testQueryLog(t, logChan, "TestExecute", "BEGIN", "begin", 0)
testQueryLog(t, logChan, "TestExecute", "SAVEPOINT", "savepoint a", 0)
testQueryLog(t, logChan, "TestExecute", "SELECT", "select id from user where id = 1", 1)
testQueryLog(t, logChan, "TestExecute", "SAVEPOINT", "savepoint b", 1)
testQueryLog(t, logChan, "TestExecute", "RELEASE", "release savepoint a", 1)
testQueryLog(t, logChan, "TestExecute", "SELECT", "select id from user where id = 3", 1)
testQueryLog(t, logChan, "TestExecute", "COMMIT", "commit", 2)
}

func TestExecutorSavepointWithoutTx(t *testing.T) {
executor, sbc1, sbc2, _ := createLegacyExecutorEnv()
logChan := QueryLogger.Subscribe("TestExecutorSavepoint")
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,18 +245,18 @@ func (stc *ScatterConn) ExecuteMultiShard(
})
}
case begin:
innerqr, transactionID, alias, err = qs.BeginExecute(ctx, rs.Target, session.Savepoints, queries[i].Sql, queries[i].BindVariables, reservedID, opts)
innerqr, transactionID, alias, err = qs.BeginExecute(ctx, rs.Target, session.GetSavepoints(), queries[i].Sql, queries[i].BindVariables, reservedID, opts)
if err != nil {
retryRequest(func() {
// we seem to have lost our connection. it was a reserved connection, let's try to recreate it
info.actionNeeded = reserveBegin
innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts)
innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), session.GetSavepoints(), queries[i].Sql, queries[i].BindVariables, opts)
})
}
case reserve:
innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, transactionID, opts)
case reserveBegin:
innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts)
innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), session.GetSavepoints(), queries[i].Sql, queries[i].BindVariables, opts)
default:
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unexpected actionNeeded on query execution: %v", info.actionNeeded)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/endtoend/framework/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (client *QueryClient) ReserveBeginExecute(query string, preQueries []string
if client.transactionID != 0 {
return nil, errors.New("already in transaction")
}
qr, transactionID, reservedID, _, err := client.server.ReserveBeginExecute(client.ctx, client.target, preQueries, query, bindvars, &querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL})
qr, transactionID, reservedID, _, err := client.server.ReserveBeginExecute(client.ctx, client.target, preQueries, nil, query, bindvars, &querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL})
client.transactionID = transactionID
client.reservedID = reservedID
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/grpcqueryservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (q *query) ReserveBeginExecute(ctx context.Context, request *querypb.Reserv
request.EffectiveCallerId,
request.ImmediateCallerId,
)
result, transactionID, reservedID, alias, err := q.server.ReserveBeginExecute(ctx, request.Target, request.PreQueries, request.Query.Sql, request.Query.BindVariables, request.Options)
result, transactionID, reservedID, alias, err := q.server.ReserveBeginExecute(ctx, request.Target, request.PreQueries, request.PostBeginQueries, request.Query.Sql, request.Query.BindVariables, request.Options)
if err != nil {
// if we have a valid reservedID, return the error in-band
if reservedID != 0 {
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/grpctabletconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func (conn *gRPCQueryClient) HandlePanic(err *error) {
}

//ReserveBeginExecute implements the queryservice interface
func (conn *gRPCQueryClient) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
func (conn *gRPCQueryClient) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
conn.mu.RLock()
defer conn.mu.RUnlock()
if conn.cc == nil {
Expand All @@ -749,6 +749,7 @@ func (conn *gRPCQueryClient) ReserveBeginExecute(ctx context.Context, target *qu
ImmediateCallerId: callerid.ImmediateCallerIDFromContext(ctx),
Options: options,
PreQueries: preQueries,
PostBeginQueries: postBeginQueries,
Query: &querypb.BoundQuery{
Sql: sql,
BindVariables: bindVariables,
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/grpctabletconn/conn_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (b *BenchmarkService) HandlePanic(err *error) {
}
}

func (b *BenchmarkService) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
func (b *BenchmarkService) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
panic("should not be called")
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/queryservice/queryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type QueryService interface {
// HandlePanic will be called if any of the functions panic.
HandlePanic(err *error)

ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error)
ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error)

ReserveExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, *topodatapb.TabletAlias, error)

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/queryservice/wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,13 @@ func (ws *wrappedService) HandlePanic(err *error) {
// No-op. Wrappers must call HandlePanic.
}

func (ws *wrappedService) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
func (ws *wrappedService) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
var res *sqltypes.Result
var transactionID, reservedID int64
var alias *topodatapb.TabletAlias
err := ws.wrapper(ctx, target, ws.impl, "ReserveBeginExecute", false, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
var err error
res, transactionID, reservedID, alias, err = conn.ReserveBeginExecute(ctx, target, preQueries, sql, bindVariables, options)
res, transactionID, reservedID, alias, err = conn.ReserveBeginExecute(ctx, target, preQueries, postBeginQueries, sql, bindVariables, options)
return canRetry(ctx, err), err
})

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,9 +513,9 @@ func (sbc *SandboxConn) HandlePanic(err *error) {
}

//ReserveBeginExecute implements the QueryService interface
func (sbc *SandboxConn) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
func (sbc *SandboxConn) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
reservedID := sbc.reserve(ctx, target, preQueries, bindVariables, 0, options)
result, transactionID, alias, err := sbc.BeginExecute(ctx, target, nil, sql, bindVariables, reservedID, options)
result, transactionID, alias, err := sbc.BeginExecute(ctx, target, postBeginQueries, sql, bindVariables, reservedID, options)
if transactionID != 0 {
sbc.setTxReservedID(transactionID, reservedID)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletconntest/fakequeryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ func (f *FakeQueryService) QueryServiceByAlias(_ *topodatapb.TabletAlias, _ *que
}

// ReserveBeginExecute satisfies the Gateway interface
func (f *FakeQueryService) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
func (f *FakeQueryService) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
panic("implement me")
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@ func (tsv *TabletServer) VStreamResults(ctx context.Context, target *querypb.Tar
}

//ReserveBeginExecute implements the QueryService interface
func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {

var connID int64
var err error
Expand All @@ -1157,7 +1157,7 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("RESERVE", time.Now())
connID, err = tsv.te.ReserveBegin(ctx, options, preQueries)
connID, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries)
if err != nil {
return err
}
Expand Down
Loading