Skip to content

Commit

Permalink
Merge pull request #9030 from planetscale/savepoint-rc
Browse files Browse the repository at this point in the history
Fix savepoint support with reserved connections
  • Loading branch information
harshit-gangal authored Oct 20, 2021
2 parents 89e7288 + 41253c3 commit 27367ea
Show file tree
Hide file tree
Showing 25 changed files with 529 additions and 24,457 deletions.
26 changes: 26 additions & 0 deletions go/test/endtoend/vtgate/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,3 +611,29 @@ func TestSQLSelectLimitWithPlanCache(t *testing.T) {
}
}
}

func TestSavepointInReservedConn(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

utils.Exec(t, conn, "set session sql_mode = ''")
utils.Exec(t, conn, "BEGIN")
utils.Exec(t, conn, "SAVEPOINT sp_1")
utils.Exec(t, conn, "insert into t7_xxhash(uid, msg) values(1, 'a')")
utils.Exec(t, conn, "RELEASE SAVEPOINT sp_1")
utils.Exec(t, conn, "ROLLBACK")

utils.Exec(t, conn, "set session sql_mode = ''")
utils.Exec(t, conn, "BEGIN")
utils.Exec(t, conn, "SAVEPOINT sp_1")
utils.Exec(t, conn, "RELEASE SAVEPOINT sp_1")
utils.Exec(t, conn, "SAVEPOINT sp_2")
utils.Exec(t, conn, "insert into t7_xxhash(uid, msg) values(2, 'a')")
utils.Exec(t, conn, "RELEASE SAVEPOINT sp_2")
utils.Exec(t, conn, "COMMIT")
defer utils.Exec(t, conn, `delete from t7_xxhash`)
utils.AssertMatches(t, conn, "select uid from t7_xxhash", `[[VARCHAR("2")]]`)
}
345 changes: 178 additions & 167 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,9 +538,9 @@ func (itc *internalTabletConn) HandlePanic(err *error) {
}

//ReserveBeginExecute is part of the QueryService interface.
func (itc *internalTabletConn) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
func (itc *internalTabletConn) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, int64, *topodatapb.TabletAlias, error) {
bindVariables = sqltypes.CopyBindVariables(bindVariables)
res, transactionID, reservedID, alias, err := itc.tablet.qsc.QueryService().ReserveBeginExecute(ctx, target, preQueries, sql, bindVariables, options)
res, transactionID, reservedID, alias, err := itc.tablet.qsc.QueryService().ReserveBeginExecute(ctx, target, preQueries, postBeginQueries, sql, bindVariables, options)
return res, transactionID, reservedID, alias, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

Expand Down
64 changes: 64 additions & 0 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2196,6 +2196,70 @@ func TestExecutorSavepointInTx(t *testing.T) {
testQueryLog(t, logChan, "TestExecute", "ROLLBACK", "rollback", 2)
}

func TestExecutorSavepointInTxWithReservedConn(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()
logChan := QueryLogger.Subscribe("TestExecutorSavepoint")
defer QueryLogger.Unsubscribe(logChan)

session := NewSafeSession(&vtgatepb.Session{Autocommit: true, TargetString: "TestExecutor", EnableSystemSettings: true})
sbc1.SetResults([]*sqltypes.Result{
sqltypes.MakeTestResult(sqltypes.MakeTestFields("orig|new", "varchar|varchar"), "a|"),
})
_, err := exec(executor, session, "set sql_mode = ''")
require.NoError(t, err)

_, err = exec(executor, session, "begin")
require.NoError(t, err)
_, err = exec(executor, session, "savepoint a")
require.NoError(t, err)
_, err = exec(executor, session, "select id from user where id = 1")
require.NoError(t, err)
_, err = exec(executor, session, "savepoint b")
require.NoError(t, err)
_, err = exec(executor, session, "release savepoint a")
require.NoError(t, err)
_, err = exec(executor, session, "select id from user where id = 3")
require.NoError(t, err)
_, err = exec(executor, session, "commit")
require.NoError(t, err)
emptyBV := map[string]*querypb.BindVariable{}
sbc1WantQueries := []*querypb.BoundQuery{{
Sql: "select @@sql_mode orig, '' new", BindVariables: emptyBV,
}, {
Sql: "set @@sql_mode = ''", BindVariables: emptyBV,
}, {
Sql: "savepoint a", BindVariables: emptyBV,
}, {
Sql: "select id from `user` where id = 1", BindVariables: emptyBV,
}, {
Sql: "savepoint b", BindVariables: emptyBV,
}, {
Sql: "release savepoint a", BindVariables: emptyBV,
}}

sbc2WantQueries := []*querypb.BoundQuery{{
Sql: "set @@sql_mode = ''", BindVariables: emptyBV,
}, {
Sql: "savepoint a", BindVariables: emptyBV,
}, {
Sql: "savepoint b", BindVariables: emptyBV,
}, {
Sql: "release savepoint a", BindVariables: emptyBV,
}, {
Sql: "select id from `user` where id = 3", BindVariables: emptyBV,
}}
utils.MustMatch(t, sbc1WantQueries, sbc1.Queries, "")
utils.MustMatch(t, sbc2WantQueries, sbc2.Queries, "")
testQueryLog(t, logChan, "TestExecute", "SET", "set session sql_mode = ''", 1)
testQueryLog(t, logChan, "TestExecute", "BEGIN", "begin", 0)
testQueryLog(t, logChan, "TestExecute", "SAVEPOINT", "savepoint a", 0)
testQueryLog(t, logChan, "TestExecute", "SELECT", "select id from user where id = 1", 1)
testQueryLog(t, logChan, "TestExecute", "SAVEPOINT", "savepoint b", 1)
testQueryLog(t, logChan, "TestExecute", "RELEASE", "release savepoint a", 1)
testQueryLog(t, logChan, "TestExecute", "SELECT", "select id from user where id = 3", 1)
testQueryLog(t, logChan, "TestExecute", "COMMIT", "commit", 2)
}

func TestExecutorSavepointWithoutTx(t *testing.T) {
executor, sbc1, sbc2, _ := createLegacyExecutorEnv()
logChan := QueryLogger.Subscribe("TestExecutorSavepoint")
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,18 +245,18 @@ func (stc *ScatterConn) ExecuteMultiShard(
})
}
case begin:
innerqr, transactionID, alias, err = qs.BeginExecute(ctx, rs.Target, session.Savepoints, queries[i].Sql, queries[i].BindVariables, reservedID, opts)
innerqr, transactionID, alias, err = qs.BeginExecute(ctx, rs.Target, session.GetSavepoints(), queries[i].Sql, queries[i].BindVariables, reservedID, opts)
if err != nil {
retryRequest(func() {
// we seem to have lost our connection. it was a reserved connection, let's try to recreate it
info.actionNeeded = reserveBegin
innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts)
innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), session.GetSavepoints(), queries[i].Sql, queries[i].BindVariables, opts)
})
}
case reserve:
innerqr, reservedID, alias, err = qs.ReserveExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, transactionID, opts)
case reserveBegin:
innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), queries[i].Sql, queries[i].BindVariables, opts)
innerqr, transactionID, reservedID, alias, err = qs.ReserveBeginExecute(ctx, rs.Target, session.SetPreQueries(), session.GetSavepoints(), queries[i].Sql, queries[i].BindVariables, opts)
default:
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] unexpected actionNeeded on query execution: %v", info.actionNeeded)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/endtoend/call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestCallProcedureInsideTx(t *testing.T) {

func TestCallProcedureInsideReservedConn(t *testing.T) {
client := framework.NewClient()
_, err := client.ReserveBeginExecute(`call proc_dml()`, nil, nil)
_, err := client.ReserveBeginExecute(`call proc_dml()`, nil, nil, nil)
require.EqualError(t, err, "Transaction state change inside the stored procedure is not allowed (CallerID: dev)")
client.Release()

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/endtoend/connkilling/connkiller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestTxKillerKillsTransactionsInReservedConnections(t *testing.T) {
client := framework.NewClient()
defer client.Release()

_, err := client.ReserveBeginExecute("select 42", nil, nil)
_, err := client.ReserveBeginExecute("select 42", nil, nil, nil)
require.NoError(t, err)

assertIsKilledWithin6Seconds(t, client)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/endtoend/framework/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,14 +313,14 @@ func (client *QueryClient) ReserveExecute(query string, preQueries []string, bin
}

// ReserveBeginExecute performs a ReserveBeginExecute.
func (client *QueryClient) ReserveBeginExecute(query string, preQueries []string, bindvars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
func (client *QueryClient) ReserveBeginExecute(query string, preQueries []string, postBeginQueries []string, bindvars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
if client.reservedID != 0 {
return nil, errors.New("already reserved a connection")
}
if client.transactionID != 0 {
return nil, errors.New("already in transaction")
}
qr, transactionID, reservedID, _, err := client.server.ReserveBeginExecute(client.ctx, client.target, preQueries, query, bindvars, &querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL})
qr, transactionID, reservedID, _, err := client.server.ReserveBeginExecute(client.ctx, client.target, preQueries, postBeginQueries, query, bindvars, &querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL})
client.transactionID = transactionID
client.reservedID = reservedID
if err != nil {
Expand Down
Loading

0 comments on commit 27367ea

Please sign in to comment.