Skip to content

Commit

Permalink
partial backport (only workflow_type) from VReplication: use strict s…
Browse files Browse the repository at this point in the history
…ql_mode for Online DDL vitessio#9963

Signed-off-by: Vilius Okockis <[email protected]>
  • Loading branch information
DeathBorn committed Apr 23, 2024
1 parent 7625dd6 commit 2ecbf0b
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 12 deletions.
37 changes: 29 additions & 8 deletions go/vt/vttablet/tabletmanager/vreplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,22 @@ import (
"vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"

querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

var (
testSettingsResponse = &sqltypes.Result{
Fields: nil,
Fields: []*querypb.Field{
{Name: "pos", Type: sqltypes.VarBinary},
{Name: "stop_pos", Type: sqltypes.VarBinary},
{Name: "max_tps", Type: sqltypes.Int64},
{Name: "max_replication_lag", Type: sqltypes.Int64},
{Name: "state", Type: sqltypes.VarBinary},
{Name: "workflow_type", Type: sqltypes.Int64},
{Name: "workflow", Type: sqltypes.VarChar},
},
InsertID: 0,
Rows: [][]sqltypes.Value{
{
Expand All @@ -45,6 +54,8 @@ var (
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
sqltypes.NewInt64(1), // workflow_type
sqltypes.NewVarChar("wf"), // workflow
},
},
}
Expand All @@ -68,7 +79,7 @@ func TestControllerKeyRange(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t)
dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -105,7 +116,7 @@ func TestControllerTables(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t)
dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -199,7 +210,7 @@ func TestControllerOverrides(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t)
dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -268,11 +279,11 @@ func TestControllerRetry(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t)
dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", nil, errors.New("(expected error)"))
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow from _vt.vreplication where id=1", nil, errors.New("(expected error)"))
dbClient.ExpectRequest("update _vt.vreplication set state='Error', message='error (expected error) in selecting vreplication settings select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1' where id=1", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -305,7 +316,15 @@ func TestControllerStopPosition(t *testing.T) {
dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
withStop := &sqltypes.Result{
Fields: nil,
Fields: []*querypb.Field{
{Name: "pos", Type: sqltypes.VarBinary},
{Name: "stop_pos", Type: sqltypes.VarBinary},
{Name: "max_tps", Type: sqltypes.Int64},
{Name: "max_replication_lag", Type: sqltypes.Int64},
{Name: "state", Type: sqltypes.VarBinary},
{Name: "workflow_type", Type: sqltypes.Int64},
{Name: "workflow", Type: sqltypes.VarChar},
},
InsertID: 0,
Rows: [][]sqltypes.Value{
{
Expand All @@ -314,10 +333,12 @@ func TestControllerStopPosition(t *testing.T) {
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
sqltypes.NewInt64(1), // workflow_type
sqltypes.NewVarChar("wf"), // workflow
},
},
}
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", withStop, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow from _vt.vreplication where id=1", withStop, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down
9 changes: 5 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestEngineOpen(t *testing.T) {
), nil)
dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestEngineExec(t *testing.T) {
), nil)
dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestEngineExec(t *testing.T) {
), nil)
dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -499,6 +499,7 @@ func TestCreateDBAndTable(t *testing.T) {
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication MODIFY source.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication ADD KEY.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication ADD COLUMN rows_copied.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication ADD COLUMN workflow_type int NOT NULL DEFAULT 0", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("create table if not exists _vt.resharding_journal.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("create table if not exists _vt.copy_state.*", &sqltypes.Result{}, nil)
}
Expand Down Expand Up @@ -526,7 +527,7 @@ func TestCreateDBAndTable(t *testing.T) {
), nil)
dbClient.ExpectRequestRE("update _vt.vreplication set message='Picked source tablet.*", testDMLResponse, nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down

0 comments on commit 2ecbf0b

Please sign in to comment.