diff --git a/go/test/endtoend/vreplication/cluster.go b/go/test/endtoend/vreplication/cluster.go index e0f41b84bd9..e9041cca287 100644 --- a/go/test/endtoend/vreplication/cluster.go +++ b/go/test/endtoend/vreplication/cluster.go @@ -17,7 +17,7 @@ import ( ) var ( - debug = false // set to true to always use local env vtdataroot for local debugging + debug = false // set to true for local debugging: this uses the local env vtdataroot and does not teardown clusters originalVtdataroot string vtdataroot string @@ -40,6 +40,8 @@ type ClusterConfig struct { tabletPortBase int tabletGrpcPortBase int tabletMysqlPortBase int + + vreplicationCompressGTID bool } // VitessCluster represents all components within the test cluster @@ -219,6 +221,17 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string, func (vc *VitessCluster) AddTablet(t testing.TB, cell *Cell, keyspace *Keyspace, shard *Shard, tabletType string, tabletID int) (*Tablet, *exec.Cmd, error) { tablet := &Tablet{} + options := []string{ + "-queryserver-config-schema-reload-time", "5", + "-enable-lag-throttler", + "-heartbeat_enable", + "-heartbeat_interval", "250ms", + } //FIXME: for multi-cell initial schema doesn't seem to load without "-queryserver-config-schema-reload-time" + + if mainClusterConfig.vreplicationCompressGTID { + options = append(options, "-vreplication_store_compressed_gtid=true") + } + vttablet := cluster.VttabletProcessInstance( vc.ClusterConfig.tabletPortBase+tabletID, vc.ClusterConfig.tabletGrpcPortBase+tabletID, @@ -231,12 +244,7 @@ func (vc *VitessCluster) AddTablet(t testing.TB, cell *Cell, keyspace *Keyspace, vc.Topo.Port, vc.ClusterConfig.hostname, vc.ClusterConfig.tmpDir, - []string{ - "-queryserver-config-schema-reload-time", "5", - "-enable-lag-throttler", - "-heartbeat_enable", - "-heartbeat_interval", "250ms", - }, //FIXME: for multi-cell initial schema doesn't seem to load without "-queryserver-config-schema-reload-time" + options, false) require.NotNil(t, vttablet) @@ -383,6 +391,9 @@ func (vc *VitessCluster) AddCell(t testing.TB, name string) (*Cell, error) { // TearDown brings down a cluster, deleting processes, removing topo keys func (vc *VitessCluster) TearDown(t testing.TB) { + if debug { + return + } for _, cell := range vc.Cells { for _, vtgate := range cell.Vtgates { if err := vtgate.TearDown(); err != nil { diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index f4998cd3d79..3b377b1e316 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -162,7 +162,10 @@ func TestMultiCellVreplicationWorkflow(t *testing.T) { func TestCellAliasVreplicationWorkflow(t *testing.T) { cells := []string{"zone1", "zone2"} - + mainClusterConfig.vreplicationCompressGTID = true + defer func() { + mainClusterConfig.vreplicationCompressGTID = false + }() vc = NewVitessCluster(t, "TestBasicVreplicationWorkflow", cells, mainClusterConfig) require.NotNil(t, vc) allCellNames = "zone1,zone2" diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index eebaa736bfa..4ae6962d09c 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -21,8 +21,11 @@ package binlogplayer import ( "bytes" + "compress/zlib" + "encoding/binary" "encoding/hex" "fmt" + "io" "math" "sync" "time" @@ -470,13 +473,13 @@ func (blp *BinlogPlayer) exec(sql string) (*sqltypes.Result, error) { // transaction_timestamp alone (keeping the old value), and we don't // change SecondsBehindMaster func (blp *BinlogPlayer) writeRecoveryPosition(tx *binlogdatapb.BinlogTransaction) error { - position, err := mysql.DecodePosition(tx.EventToken.Position) + position, err := DecodePosition(tx.EventToken.Position) if err != nil { return err } now := time.Now().Unix() - updateRecovery := GenerateUpdatePos(blp.uid, position, now, tx.EventToken.Timestamp, blp.blplStats.CopyRowCount.Get()) + updateRecovery := GenerateUpdatePos(blp.uid, position, now, tx.EventToken.Timestamp, blp.blplStats.CopyRowCount.Get(), false) qr, err := blp.exec(updateRecovery) if err != nil { @@ -557,6 +560,7 @@ var AlterVReplicationTable = []string{ "ALTER TABLE _vt.vreplication ADD COLUMN rows_copied BIGINT(20) NOT NULL DEFAULT 0", } +// WithDDLInitialQueries contains the queries to be expected by the mock db client during tests var WithDDLInitialQueries = []string{ "SELECT db_name FROM _vt.vreplication LIMIT 0", "SELECT rows_copied FROM _vt.vreplication LIMIT 0", @@ -593,7 +597,7 @@ func ReadVRSettings(dbClient DBClient, uid uint32) (VRSettings, error) { if err != nil { return VRSettings{}, fmt.Errorf("failed to parse max_replication_lag column: %v", err) } - startPos, err := mysql.DecodePosition(vrRow[0].ToString()) + startPos, err := DecodePosition(vrRow[0].ToString()) if err != nil { return VRSettings{}, fmt.Errorf("failed to parse pos column: %v", err) } @@ -630,16 +634,18 @@ func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, // GenerateUpdatePos returns a statement to update a value in the // _vt.vreplication table. -func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64, rowsCopied int64) string { +func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64, rowsCopied int64, compress bool) string { + strGTID := encodeString(mysql.EncodePosition(pos)) + if compress { + strGTID = fmt.Sprintf("compress(%s)", strGTID) + } if txTimestamp != 0 { return fmt.Sprintf( "update _vt.vreplication set pos=%v, time_updated=%v, transaction_timestamp=%v, rows_copied=%v, message='' where id=%v", - encodeString(mysql.EncodePosition(pos)), timeUpdated, txTimestamp, rowsCopied, uid) + strGTID, timeUpdated, txTimestamp, rowsCopied, uid) } - return fmt.Sprintf( - "update _vt.vreplication set pos=%v, time_updated=%v, rows_copied=%v, message='' where id=%v", - encodeString(mysql.EncodePosition(pos)), timeUpdated, rowsCopied, uid) + "update _vt.vreplication set pos=%v, time_updated=%v, rows_copied=%v, message='' where id=%v", strGTID, timeUpdated, rowsCopied, uid) } // GenerateUpdateTime returns a statement to update time_updated in the _vt.vreplication table. @@ -703,6 +709,49 @@ func ReadVReplicationStatus(index uint32) string { return fmt.Sprintf("select pos, state, message from _vt.vreplication where id=%v", index) } +// MysqlUncompress will uncompress a binary string in the format stored by mysql's compress() function +// The first four bytes represent the size of the original string passed to compress() +// Remaining part is the compressed string using zlib, which we uncompress here using golang's zlib library +func MysqlUncompress(input string) []byte { + // consistency check + inputBytes := []byte(input) + if len(inputBytes) < 5 { + return nil + } + + // determine length + dataLength := uint32(inputBytes[0]) + uint32(inputBytes[1])<<8 + uint32(inputBytes[2])<<16 + uint32(inputBytes[3])<<24 + dataLengthBytes := make([]byte, 4) + binary.LittleEndian.PutUint32(dataLengthBytes, dataLength) + dataLength = binary.LittleEndian.Uint32(dataLengthBytes) + + // uncompress using zlib + inputData := inputBytes[4:] + inputDataBuf := bytes.NewBuffer(inputData) + reader, err := zlib.NewReader(inputDataBuf) + if err != nil { + return nil + } + var outputBytes bytes.Buffer + io.Copy(&outputBytes, reader) + if outputBytes.Len() == 0 { + return nil + } + if dataLength != uint32(outputBytes.Len()) { // double check that the stored and uncompressed lengths match + return nil + } + return outputBytes.Bytes() +} + +// DecodePosition attempts to uncompress the passed value first and if it fails tries to decode it as a valid GTID +func DecodePosition(gtid string) (mysql.Position, error) { + b := MysqlUncompress(gtid) + if b != nil { + gtid = string(b) + } + return mysql.DecodePosition(gtid) +} + // StatsHistoryRecord is used to store a Message with timestamp type StatsHistoryRecord struct { Time time.Time diff --git a/go/vt/binlog/binlogplayer/binlog_player_test.go b/go/vt/binlog/binlogplayer/binlog_player_test.go index f937734d1dc..c70e9b4dd43 100644 --- a/go/vt/binlog/binlogplayer/binlog_player_test.go +++ b/go/vt/binlog/binlogplayer/binlog_player_test.go @@ -361,7 +361,7 @@ func TestUpdateVReplicationPos(t *testing.T) { "set pos='MariaDB/0-1-8283', time_updated=88822, rows_copied=0, message='' " + "where id=78522" - got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 0, 0) + got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 0, 0, false) if got != want { t.Errorf("updateVReplicationPos() = %#v, want %#v", got, want) } @@ -373,7 +373,7 @@ func TestUpdateVReplicationTimestamp(t *testing.T) { "set pos='MariaDB/0-2-582', time_updated=88822, transaction_timestamp=481828, rows_copied=0, message='' " + "where id=78522" - got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 481828, 0) + got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 481828, 0, false) if got != want { t.Errorf("updateVReplicationPos() = %#v, want %#v", got, want) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index d4a137469a4..a2258a5ce5f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -25,7 +25,6 @@ import ( "sync" "time" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -678,7 +677,7 @@ func (vre *Engine) transitionJournal(je *journalEvent) { // WaitForPos waits for the replication to reach the specified position. func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error { start := time.Now() - mPos, err := mysql.DecodePosition(pos) + mPos, err := binlogplayer.DecodePosition(pos) if err != nil { return err } @@ -716,7 +715,7 @@ func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error { case len(qr.Rows) > 1 || len(qr.Rows[0]) != 3: return fmt.Errorf("unexpected result: %v", qr) } - current, err := mysql.DecodePosition(qr.Rows[0][0].ToString()) + current, err := binlogplayer.DecodePosition(qr.Rows[0][0].ToString()) if err != nil { return err } @@ -785,7 +784,19 @@ func readRow(dbClient binlogplayer.DBClient, id int) (map[string]string, error) if len(qr.Fields) != len(qr.Rows[0]) { return nil, fmt.Errorf("fields don't match rows: %v", qr) } - return rowToMap(qr, 0) + row, err := rowToMap(qr, 0) + if err != nil { + return nil, err + } + gtid, ok := row["pos"] + if ok { + b := binlogplayer.MysqlUncompress(gtid) + if b != nil { + gtid = string(b) + row["pos"] = gtid + } + } + return row, nil } // rowToMap converts a row into a map for easier processing. diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index 7d3ebe0aff6..87c3eb56867 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -340,7 +340,7 @@ func (vc *vcopier) fastForward(ctx context.Context, copyState map[string]*sqltyp return err } if settings.StartPos.IsZero() { - update := binlogplayer.GenerateUpdatePos(vc.vr.id, pos, time.Now().Unix(), 0, vc.vr.stats.CopyRowCount.Get()) + update := binlogplayer.GenerateUpdatePos(vc.vr.id, pos, time.Now().Unix(), 0, vc.vr.stats.CopyRowCount.Get(), *vreplicationStoreCompressedGTID) _, err := vc.vr.dbClient.Execute(update) return err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index ff3f40a11b7..9933d0db7d5 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -231,7 +231,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) { vp.numAccumulatedHeartbeats = 0 - update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts, vp.vr.stats.CopyRowCount.Get()) + update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts, vp.vr.stats.CopyRowCount.Get(), *vreplicationStoreCompressedGTID) if _, err := vp.vr.dbClient.Execute(update); err != nil { return false, fmt.Errorf("error %v updating position", err) } @@ -428,7 +428,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m stats := NewVrLogStats(event.Type.String()) switch event.Type { case binlogdatapb.VEventType_GTID: - pos, err := mysql.DecodePosition(event.Gtid) + pos, err := binlogplayer.DecodePosition(event.Gtid) if err != nil { return err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 31498ad2a9c..14432f51db3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -25,6 +25,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/mysql" + "github.com/spyzhov/ajson" "github.com/stretchr/testify/require" @@ -252,6 +254,7 @@ func TestCharPK(t *testing.T) { } } } + func TestRollup(t *testing.T) { defer deleteTablet(addTablet(100)) @@ -1597,9 +1600,62 @@ func TestPlayerDDL(t *testing.T) { cancel() } -func TestPlayerStopPos(t *testing.T) { +func TestGTIDCompress(t *testing.T) { + ctx := context.Background() defer deleteTablet(addTablet(100)) + err := env.Mysqld.ExecuteSuperQuery(ctx, "insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state,db_name) values (1, '', '', '', 0,0,0,0,'Stopped','')") + require.NoError(t, err) + type testCase struct { + name, gtid string + compress bool + } + + testCases := []testCase{ + {"cleartext1", "MySQL56/14b68925-696a-11ea-aee7-fec597a91f5e:1-308092", false}, + {"cleartext2", "MySQL56/14b68925-696a-11ea-aee7-fec597a91f5e:1-308092,320a5e98-6965-11ea-b949-eeafd34ae6e4:1-3,81cbdbf8-6969-11ea-aeb1-a6143b021f67:1-524891956,c9a0f301-6965-11ea-ba9d-02c229065569:1-3,cb698dac-6969-11ea-ac38-16e5d0ac5c3a:1-524441991,e39fca4d-6960-11ea-b4c2-1e895fd49fa0:1-3", false}, + {"compress1", "MySQL56/14b68925-696a-11ea-aee7-fec597a91f5e:1-308092", true}, + {"compress2", "MySQL56/14b68925-696a-11ea-aee7-fec597a91f5e:1-308092,320a5e98-6965-11ea-b949-eeafd34ae6e4:1-3,81cbdbf8-6969-11ea-aeb1-a6143b021f67:1-524891956,c9a0f301-6965-11ea-ba9d-02c229065569:1-3,cb698dac-6969-11ea-ac38-16e5d0ac5c3a:1-524441991,e39fca4d-6960-11ea-b4c2-1e895fd49fa0:1-3", true}, + {"nil-compress", "", true}, + {"nil-clear", "", false}, + } + for _, tCase := range testCases { + t.Run(tCase.name, func(t *testing.T) { + strGTID := fmt.Sprintf("'%s'", tCase.gtid) + if tCase.compress { + strGTID = fmt.Sprintf("compress(%s)", strGTID) + } + err := env.Mysqld.ExecuteSuperQuery(ctx, fmt.Sprintf("update _vt.vreplication set pos=%s where id = 1", strGTID)) + require.NoError(t, err) + qr, err := env.Mysqld.FetchSuperQuery(ctx, "select pos from _vt.vreplication where id = 1") + require.NoError(t, err) + require.NotNil(t, qr) + require.Equal(t, 1, len(qr.Rows)) + gotGTID := qr.Rows[0][0].ToString() + pos, err := mysql.DecodePosition(gotGTID) + if tCase.compress { + require.True(t, pos.IsZero()) + pos, err = binlogplayer.DecodePosition(gotGTID) + require.NoError(t, err) + require.NotNil(t, pos) + tpos, err := mysql.DecodePosition(tCase.gtid) + require.NoError(t, err) + require.Equal(t, tpos.String(), pos.String()) + } else { + require.NoError(t, err) + require.NotNil(t, pos) + require.Equal(t, tCase.gtid, gotGTID) + } + }) + } +} + +func TestPlayerStopPos(t *testing.T) { + defer deleteTablet(addTablet(100)) + *vreplicationStoreCompressedGTID = true + defer func() { + *vreplicationStoreCompressedGTID = false + }() execStatements(t, []string{ "create table yes(id int, val varbinary(128), primary key(id))", fmt.Sprintf("create table %s.yes(id int, val varbinary(128), primary key(id))", vrepldb), @@ -1652,7 +1708,7 @@ func TestPlayerStopPos(t *testing.T) { "/update.*'Running'", "begin", "insert into yes(id,val) values (1,'aaa')", - fmt.Sprintf("/update.*'%s'", stopPos), + fmt.Sprintf("/update.*compress.*'%s'", stopPos), "/update.*'Stopped'", "commit", }) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 0ce10f41da2..e969c4276ff 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -62,6 +62,8 @@ var ( vreplicationExperimentalFlags = flag.Int64("vreplication_experimental_flags", 0, "(Bitmask) of experimental features in vreplication to enable") vreplicationExperimentalFlagOptimizeInserts int64 = 1 + + vreplicationStoreCompressedGTID = flag.Bool("vreplication_store_compressed_gtid", false, "Store compressed gtids in the pos column of _vt.vreplication") ) // vreplicator provides the core logic to start vreplication streams diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 4bbf23059af..bfa46b2e5dc 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" "github.com/golang/protobuf/proto" @@ -590,7 +591,7 @@ func (df *vdiff) stopTargets(ctx context.Context) error { if err := proto.UnmarshalText(row[0].ToString(), &bls); err != nil { return err } - pos, err := mysql.DecodePosition(row[1].ToString()) + pos, err := binlogplayer.DecodePosition(row[1].ToString()) if err != nil { return err } @@ -726,7 +727,7 @@ func (df *vdiff) syncTargets(ctx context.Context, filteredReplicationWaitTime ti if err != nil { return err } - mpos, err := mysql.DecodePosition(pos) + mpos, err := binlogplayer.DecodePosition(pos) if err != nil { return err } diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index 35b75ebded2..af786eb3285 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -24,9 +24,13 @@ import ( "sync" "time" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "github.com/golang/protobuf/proto" "k8s.io/apimachinery/pkg/util/sets" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" @@ -395,6 +399,8 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty var id, timeUpdated, transactionTimestamp int64 var state, dbName, pos, stopPos, message string var bls binlogdatapb.BinlogSource + var mpos mysql.Position + id, err = evalengine.ToInt64(row[0]) if err != nil { return nil, "", err @@ -402,7 +408,16 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty if err := proto.UnmarshalText(row[1].ToString(), &bls); err != nil { return nil, "", err } + + // gtid in the pos column can be compressed, so check and possibly uncompress pos = row[2].ToString() + if pos != "" { + mpos, err = binlogplayer.DecodePosition(pos) + if err != nil { + return nil, "", err + } + pos = mpos.String() + } stopPos = row[3].ToString() state = row[5].ToString() dbName = row[6].ToString() diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index 5280a0d2893..e92d91ec733 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -32,7 +32,7 @@ import ( "vitess.io/vitess/go/vt/logutil" ) -func TestVExec(t *testing.T) { +func TestVExec2(t *testing.T) { ctx := context.Background() workflow := "wrWorkflow" keyspace := "target" @@ -153,15 +153,15 @@ func TestVExec(t *testing.T) { dryRunResults := []string{ "Query: delete from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", "will be run on the following streams in keyspace target for workflow wrWorkflow:\n\n", - "+----------------------+----+--------------------------------+---------+-----------+--------------+", - "| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID |", - "+----------------------+----+--------------------------------+---------+-----------+--------------+", - "| -80/zone1-0000000200 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos |", - "| | | filter: > | | | |", - "+----------------------+----+--------------------------------+---------+-----------+--------------+", - "| 80-/zone1-0000000210 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos |", - "| | | filter: > | | | |", - "+----------------------+----+--------------------------------+---------+-----------+--------------+", + `+----------------------+----+--------------------------------+---------+-----------+------------------------------------------+ +| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | ++----------------------+----+--------------------------------+---------+-----------+------------------------------------------+ +| -80/zone1-0000000200 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | 14b68925-696a-11ea-aee7-fec597a91f5e:1-3 | +| | | filter: > | | | | ++----------------------+----+--------------------------------+---------+-----------+------------------------------------------+ +| 80-/zone1-0000000210 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | 14b68925-696a-11ea-aee7-fec597a91f5e:1-3 | +| | | filter: > | | | | ++----------------------+----+--------------------------------+---------+-----------+------------------------------------------+`, } require.Equal(t, strings.Join(dryRunResults, "\n")+"\n\n\n\n\n", logger.String()) } @@ -228,7 +228,7 @@ func TestWorkflowListStreams(t *testing.T) { ] } }, - "Pos": "pos", + "Pos": "14b68925-696a-11ea-aee7-fec597a91f5e:1-3", "StopPos": "", "State": "Copying", "DBName": "vt_target", @@ -263,7 +263,7 @@ func TestWorkflowListStreams(t *testing.T) { ] } }, - "Pos": "pos", + "Pos": "14b68925-696a-11ea-aee7-fec597a91f5e:1-3", "StopPos": "", "State": "Copying", "DBName": "vt_target", @@ -312,15 +312,15 @@ func TestWorkflowListStreams(t *testing.T) { will be run on the following streams in keyspace target for workflow wrWorkflow: -+----------------------+----+--------------------------------+---------+-----------+--------------+ -| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | -+----------------------+----+--------------------------------+---------+-----------+--------------+ -| -80/zone1-0000000200 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos | -| | | filter: > | | | | -+----------------------+----+--------------------------------+---------+-----------+--------------+ -| 80-/zone1-0000000210 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos | -| | | filter: > | | | | -+----------------------+----+--------------------------------+---------+-----------+--------------+ ++----------------------+----+--------------------------------+---------+-----------+------------------------------------------+ +| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | ++----------------------+----+--------------------------------+---------+-----------+------------------------------------------+ +| -80/zone1-0000000200 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | 14b68925-696a-11ea-aee7-fec597a91f5e:1-3 | +| | | filter: > | | | | ++----------------------+----+--------------------------------+---------+-----------+------------------------------------------+ +| 80-/zone1-0000000210 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | 14b68925-696a-11ea-aee7-fec597a91f5e:1-3 | +| | | filter: > | | | | ++----------------------+----+--------------------------------+---------+-----------+------------------------------------------+ diff --git a/go/vt/wrangler/wrangler_env_test.go b/go/vt/wrangler/wrangler_env_test.go index 40e7c9a44b5..4742c50088c 100644 --- a/go/vt/wrangler/wrangler_env_test.go +++ b/go/vt/wrangler/wrangler_env_test.go @@ -146,7 +146,7 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|message", "int64|varchar|varchar|varchar|int64|varchar|varchar|int64|int64|varchar"), - fmt.Sprintf("1|%v|pos||0|Running|vt_target|%d|0|", bls, timeUpdated), + fmt.Sprintf("1|%v|MySQL56/14b68925-696a-11ea-aee7-fec597a91f5e:1-3||0|Running|vt_target|%d|0|", bls, timeUpdated), ) env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result) env.tmc.setVRResults( diff --git a/tools/rowlog/rowlog.go b/tools/rowlog/rowlog.go index e4458005f4d..84117bf0952 100644 --- a/tools/rowlog/rowlog.go +++ b/tools/rowlog/rowlog.go @@ -12,6 +12,7 @@ import ( "strings" "sync" "time" + "vitess.io/vitess/go/vt/binlog/binlogplayer" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" "vitess.io/vitess/go/mysql" @@ -202,11 +203,11 @@ func startStreaming(ctx context.Context, vtgate, vtctld, keyspace, tablet, table //fmt.Printf("stopPos %s\n", stopPos) var err error var currentPosition, stopPosition mysql.Position - currentPosition, err = mysql.DecodePosition(gtid) + currentPosition, err = binlogplayer.DecodePosition(gtid) if err != nil { fmt.Errorf("Error decoding position for %s:%vs\n", gtid, err.Error()) } - stopPosition, err = mysql.DecodePosition(stopPos) + stopPosition, err = binlogplayer.DecodePosition(stopPos) if err != nil { fmt.Errorf("Error decoding position for %s:%vs\n", stopPos, err.Error()) }