Skip to content

Commit

Permalink
Merge pull request #4736 from planetscale/ss-vrepl
Browse files Browse the repository at this point in the history
vreplication: table copying phase 1: create list of tables to copy
  • Loading branch information
sougou authored Apr 7, 2019
2 parents 256edd7 + eec48b9 commit 1dc74aa
Show file tree
Hide file tree
Showing 14 changed files with 522 additions and 191 deletions.
55 changes: 37 additions & 18 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ var (
// BlplTransaction is the key for the stats map.
BlplTransaction = "Transaction"

// VReplicationInit is for the Init state.
VReplicationInit = "Init"
// VReplicationCopying is for the Copying state.
VReplicationCopying = "Copying"
// BlpRunning is for the Running state.
BlpRunning = "Running"
// BlpStopped is for the Stopped state.
Expand Down Expand Up @@ -191,18 +195,18 @@ func (blp *BinlogPlayer) ApplyBinlogEvents(ctx context.Context) error {
// applyEvents returns a recordable status message on termination or an error otherwise.
func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
// Read starting values for vreplication.
pos, stopPos, maxTPS, maxReplicationLag, err := ReadVRSettings(blp.dbClient, blp.uid)
settings, err := ReadVRSettings(blp.dbClient, blp.uid)
if err != nil {
log.Error(err)
return err
}
blp.position, err = mysql.DecodePosition(pos)
blp.position, err = mysql.DecodePosition(settings.StartPos)
if err != nil {
log.Error(err)
return err
}
if stopPos != "" {
blp.stopPosition, err = mysql.DecodePosition(stopPos)
if settings.StopPos != "" {
blp.stopPosition, err = mysql.DecodePosition(settings.StopPos)
if err != nil {
log.Error(err)
return err
Expand All @@ -212,8 +216,8 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
fmt.Sprintf("BinlogPlayer/%d", blp.uid),
"transactions",
1, /* threadCount */
maxTPS,
maxReplicationLag,
settings.MaxTPS,
settings.MaxReplicationLag,
)
if err != nil {
err := fmt.Errorf("failed to instantiate throttler: %v", err)
Expand Down Expand Up @@ -512,29 +516,44 @@ func SetVReplicationState(dbClient DBClient, uid uint32, state, message string)
return nil
}

// VRSettings contains the settings of a vreplication table.
type VRSettings struct {
StartPos string
StopPos string
MaxTPS int64
MaxReplicationLag int64
State string
}

// ReadVRSettings retrieves the throttler settings for
// vreplication from the checkpoint table.
func ReadVRSettings(dbClient DBClient, uid uint32) (pos, stopPos string, maxTPS, maxReplicationLag int64, err error) {
query := fmt.Sprintf("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=%v", uid)
func ReadVRSettings(dbClient DBClient, uid uint32) (VRSettings, error) {
query := fmt.Sprintf("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=%v", uid)
qr, err := dbClient.ExecuteFetch(query, 1)
if err != nil {
return "", "", throttler.InvalidMaxRate, throttler.InvalidMaxReplicationLag, fmt.Errorf("error %v in selecting vreplication settings %v", err, query)
return VRSettings{}, fmt.Errorf("error %v in selecting vreplication settings %v", err, query)
}

if qr.RowsAffected != 1 {
return "", "", throttler.InvalidMaxRate, throttler.InvalidMaxReplicationLag, fmt.Errorf("checkpoint information not available in db for %v", uid)
return VRSettings{}, fmt.Errorf("checkpoint information not available in db for %v", uid)
}

maxTPS, err = sqltypes.ToInt64(qr.Rows[0][2])
maxTPS, err := sqltypes.ToInt64(qr.Rows[0][2])
if err != nil {
return "", "", throttler.InvalidMaxRate, throttler.InvalidMaxReplicationLag, fmt.Errorf("failed to parse max_tps column: %v", err)
return VRSettings{}, fmt.Errorf("failed to parse max_tps column: %v", err)
}
maxReplicationLag, err = sqltypes.ToInt64(qr.Rows[0][3])
maxReplicationLag, err := sqltypes.ToInt64(qr.Rows[0][3])
if err != nil {
return "", "", throttler.InvalidMaxRate, throttler.InvalidMaxReplicationLag, fmt.Errorf("failed to parse max_replication_lag column: %v", err)
return VRSettings{}, fmt.Errorf("failed to parse max_replication_lag column: %v", err)
}

return qr.Rows[0][0].ToString(), qr.Rows[0][1].ToString(), maxTPS, maxReplicationLag, nil
return VRSettings{
StartPos: qr.Rows[0][0].ToString(),
StopPos: qr.Rows[0][1].ToString(),
MaxTPS: maxTPS,
MaxReplicationLag: maxReplicationLag,
State: qr.Rows[0][4].ToString(),
}, nil
}

// CreateVReplication returns a statement to populate the first value into
Expand All @@ -546,12 +565,12 @@ func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, posi
encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag, timeUpdated, BlpRunning)
}

// CreateVReplicationStopped returns a statement to create a stopped vreplication.
func CreateVReplicationStopped(workflow string, source *binlogdatapb.BinlogSource, position string) string {
// CreateVReplicationState returns a statement to create a stopped vreplication.
func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position, state string) string {
return fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v')",
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), BlpStopped)
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state)
}

// GenerateUpdatePos returns a statement to update a value in the
Expand Down
21 changes: 13 additions & 8 deletions go/vt/binlog/binlogplayer/binlog_player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
sqltypes.NULL, // stop_pos
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
},
},
}
Expand All @@ -51,7 +52,7 @@ var (
func TestNewBinlogPlayerKeyRange(t *testing.T) {
dbClient := NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -82,7 +83,7 @@ func TestNewBinlogPlayerKeyRange(t *testing.T) {
func TestNewBinlogPlayerTables(t *testing.T) {
dbClient := NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -114,7 +115,7 @@ func TestNewBinlogPlayerTables(t *testing.T) {
func TestApplyEventsFail(t *testing.T) {
dbClient := NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, errors.New("err"))
dbClient.ExpectRequest("update _vt.vreplication set state='Error', message='error in processing binlog event failed query BEGIN, err: err' where id=1", testDMLResponse, nil)

Expand Down Expand Up @@ -145,10 +146,11 @@ func TestStopPosEqual(t *testing.T) {
sqltypes.NewVarBinary("MariaDB/0-1-1083"), // stop_pos
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
},
},
}
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest(`update _vt.vreplication set state='Stopped', message='not starting BinlogPlayer, we\'re already at the desired position 0-1-1083' where id=1`, testDMLResponse, nil)

_ = newFakeBinlogClient()
Expand Down Expand Up @@ -177,10 +179,11 @@ func TestStopPosLess(t *testing.T) {
sqltypes.NewVarBinary("MariaDB/0-1-1082"), // stop_pos
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
},
},
}
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest(`update _vt.vreplication set state='Stopped', message='starting point 0-1-1083 greater than stopping point 0-1-1082' where id=1`, testDMLResponse, nil)

_ = newFakeBinlogClient()
Expand Down Expand Up @@ -209,10 +212,11 @@ func TestStopPosGreater(t *testing.T) {
sqltypes.NewVarBinary("MariaDB/0-1-1085"), // stop_pos
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
},
},
}
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -245,10 +249,11 @@ func TestContextCancel(t *testing.T) {
sqltypes.NewVarBinary("MariaDB/0-1-1085"), // stop_pos
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
},
},
}
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", posEqual, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand All @@ -275,7 +280,7 @@ func TestContextCancel(t *testing.T) {
func TestRetryOnDeadlock(t *testing.T) {
dbClient := NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
deadlocked := &mysql.SQLError{Num: 1213, Message: "deadlocked"}
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", nil, deadlocked)
Expand Down
16 changes: 9 additions & 7 deletions go/vt/vttablet/tabletmanager/vreplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (
sqltypes.NULL, // stop_pos
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
},
},
}
Expand All @@ -64,7 +65,7 @@ func TestControllerKeyRange(t *testing.T) {

dbClient := binlogplayer.NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestControllerTables(t *testing.T) {

dbClient := binlogplayer.NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -191,7 +192,7 @@ func TestControllerOverrides(t *testing.T) {

dbClient := binlogplayer.NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -255,10 +256,10 @@ func TestControllerRetry(t *testing.T) {

dbClient := binlogplayer.NewMockDBClient(t)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", nil, errors.New("(expected error)"))
dbClient.ExpectRequest("update _vt.vreplication set state='Error', message='error (expected error) in selecting vreplication settings select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", nil, errors.New("(expected error)"))
dbClient.ExpectRequest("update _vt.vreplication set state='Error', message='error (expected error) in selecting vreplication settings select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -298,10 +299,11 @@ func TestControllerStopPosition(t *testing.T) {
sqltypes.NewVarBinary("MariaDB/0-1-1235"), // stop_pos
sqltypes.NewVarBinary("9223372036854775807"), // max_tps
sqltypes.NewVarBinary("9223372036854775807"), // max_replication_lag
sqltypes.NewVarBinary("Running"), // state
},
},
}
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", withStop, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", withStop, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestEngineOpen(t *testing.T) {
fmt.Sprintf(`1|Running|keyspace:"%s" shard:"0" key_range:<end:"\200" > `, env.KeyspaceName),
), nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestEngineExec(t *testing.T) {
fmt.Sprintf(`1|Running|keyspace:"%s" shard:"0" key_range:<end:"\200" > `, env.KeyspaceName),
), nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestEngineExec(t *testing.T) {
fmt.Sprintf(`1|Running|keyspace:"%s" shard:"0" key_range:<end:"\200" > `, env.KeyspaceName),
), nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down Expand Up @@ -439,7 +439,7 @@ func TestCreateDBAndTable(t *testing.T) {
fmt.Sprintf(`1|Running|keyspace:"%s" shard:"0" key_range:<end:"\200" > `, env.KeyspaceName),
), nil)
dbClient.ExpectRequest("update _vt.vreplication set state='Running', message='' where id=1", testDMLResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("select pos, stop_pos, max_tps, max_replication_lag, state from _vt.vreplication where id=1", testSettingsResponse, nil)
dbClient.ExpectRequest("begin", nil, nil)
dbClient.ExpectRequest("insert into t values(1)", testDMLResponse, nil)
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
Expand Down
Loading

0 comments on commit 1dc74aa

Please sign in to comment.