Skip to content

Commit

Permalink
Merge pull request #7877 from planetscale/rn-gtid-compression
Browse files Browse the repository at this point in the history
VReplication: ability to compress gtid when stored in _vt.vreplication's pos column
  • Loading branch information
rohit-nayak-ps authored May 8, 2021
2 parents cc9b903 + 3211e7f commit 4578a9b
Show file tree
Hide file tree
Showing 14 changed files with 202 additions and 53 deletions.
25 changes: 18 additions & 7 deletions go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

var (
debug = false // set to true to always use local env vtdataroot for local debugging
debug = false // set to true for local debugging: this uses the local env vtdataroot and does not teardown clusters

originalVtdataroot string
vtdataroot string
Expand All @@ -40,6 +40,8 @@ type ClusterConfig struct {
tabletPortBase int
tabletGrpcPortBase int
tabletMysqlPortBase int

vreplicationCompressGTID bool
}

// VitessCluster represents all components within the test cluster
Expand Down Expand Up @@ -219,6 +221,17 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string,
func (vc *VitessCluster) AddTablet(t testing.TB, cell *Cell, keyspace *Keyspace, shard *Shard, tabletType string, tabletID int) (*Tablet, *exec.Cmd, error) {
tablet := &Tablet{}

options := []string{
"-queryserver-config-schema-reload-time", "5",
"-enable-lag-throttler",
"-heartbeat_enable",
"-heartbeat_interval", "250ms",
} //FIXME: for multi-cell initial schema doesn't seem to load without "-queryserver-config-schema-reload-time"

if mainClusterConfig.vreplicationCompressGTID {
options = append(options, "-vreplication_store_compressed_gtid=true")
}

vttablet := cluster.VttabletProcessInstance(
vc.ClusterConfig.tabletPortBase+tabletID,
vc.ClusterConfig.tabletGrpcPortBase+tabletID,
Expand All @@ -231,12 +244,7 @@ func (vc *VitessCluster) AddTablet(t testing.TB, cell *Cell, keyspace *Keyspace,
vc.Topo.Port,
vc.ClusterConfig.hostname,
vc.ClusterConfig.tmpDir,
[]string{
"-queryserver-config-schema-reload-time", "5",
"-enable-lag-throttler",
"-heartbeat_enable",
"-heartbeat_interval", "250ms",
}, //FIXME: for multi-cell initial schema doesn't seem to load without "-queryserver-config-schema-reload-time"
options,
false)

require.NotNil(t, vttablet)
Expand Down Expand Up @@ -383,6 +391,9 @@ func (vc *VitessCluster) AddCell(t testing.TB, name string) (*Cell, error) {

// TearDown brings down a cluster, deleting processes, removing topo keys
func (vc *VitessCluster) TearDown(t testing.TB) {
if debug {
return
}
for _, cell := range vc.Cells {
for _, vtgate := range cell.Vtgates {
if err := vtgate.TearDown(); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ func TestMultiCellVreplicationWorkflow(t *testing.T) {

func TestCellAliasVreplicationWorkflow(t *testing.T) {
cells := []string{"zone1", "zone2"}

mainClusterConfig.vreplicationCompressGTID = true
defer func() {
mainClusterConfig.vreplicationCompressGTID = false
}()
vc = NewVitessCluster(t, "TestBasicVreplicationWorkflow", cells, mainClusterConfig)
require.NotNil(t, vc)
allCellNames = "zone1,zone2"
Expand Down
65 changes: 57 additions & 8 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ package binlogplayer

import (
"bytes"
"compress/zlib"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"math"
"sync"
"time"
Expand Down Expand Up @@ -470,13 +473,13 @@ func (blp *BinlogPlayer) exec(sql string) (*sqltypes.Result, error) {
// transaction_timestamp alone (keeping the old value), and we don't
// change SecondsBehindMaster
func (blp *BinlogPlayer) writeRecoveryPosition(tx *binlogdatapb.BinlogTransaction) error {
position, err := mysql.DecodePosition(tx.EventToken.Position)
position, err := DecodePosition(tx.EventToken.Position)
if err != nil {
return err
}

now := time.Now().Unix()
updateRecovery := GenerateUpdatePos(blp.uid, position, now, tx.EventToken.Timestamp, blp.blplStats.CopyRowCount.Get())
updateRecovery := GenerateUpdatePos(blp.uid, position, now, tx.EventToken.Timestamp, blp.blplStats.CopyRowCount.Get(), false)

qr, err := blp.exec(updateRecovery)
if err != nil {
Expand Down Expand Up @@ -557,6 +560,7 @@ var AlterVReplicationTable = []string{
"ALTER TABLE _vt.vreplication ADD COLUMN rows_copied BIGINT(20) NOT NULL DEFAULT 0",
}

// WithDDLInitialQueries contains the queries to be expected by the mock db client during tests
var WithDDLInitialQueries = []string{
"SELECT db_name FROM _vt.vreplication LIMIT 0",
"SELECT rows_copied FROM _vt.vreplication LIMIT 0",
Expand Down Expand Up @@ -593,7 +597,7 @@ func ReadVRSettings(dbClient DBClient, uid uint32) (VRSettings, error) {
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse max_replication_lag column: %v", err)
}
startPos, err := mysql.DecodePosition(vrRow[0].ToString())
startPos, err := DecodePosition(vrRow[0].ToString())
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse pos column: %v", err)
}
Expand Down Expand Up @@ -630,16 +634,18 @@ func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource,

// GenerateUpdatePos returns a statement to update a value in the
// _vt.vreplication table.
func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64, rowsCopied int64) string {
func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64, rowsCopied int64, compress bool) string {
strGTID := encodeString(mysql.EncodePosition(pos))
if compress {
strGTID = fmt.Sprintf("compress(%s)", strGTID)
}
if txTimestamp != 0 {
return fmt.Sprintf(
"update _vt.vreplication set pos=%v, time_updated=%v, transaction_timestamp=%v, rows_copied=%v, message='' where id=%v",
encodeString(mysql.EncodePosition(pos)), timeUpdated, txTimestamp, rowsCopied, uid)
strGTID, timeUpdated, txTimestamp, rowsCopied, uid)
}

return fmt.Sprintf(
"update _vt.vreplication set pos=%v, time_updated=%v, rows_copied=%v, message='' where id=%v",
encodeString(mysql.EncodePosition(pos)), timeUpdated, rowsCopied, uid)
"update _vt.vreplication set pos=%v, time_updated=%v, rows_copied=%v, message='' where id=%v", strGTID, timeUpdated, rowsCopied, uid)
}

// GenerateUpdateTime returns a statement to update time_updated in the _vt.vreplication table.
Expand Down Expand Up @@ -703,6 +709,49 @@ func ReadVReplicationStatus(index uint32) string {
return fmt.Sprintf("select pos, state, message from _vt.vreplication where id=%v", index)
}

// MysqlUncompress will uncompress a binary string in the format stored by mysql's compress() function
// The first four bytes represent the size of the original string passed to compress()
// Remaining part is the compressed string using zlib, which we uncompress here using golang's zlib library
func MysqlUncompress(input string) []byte {
// consistency check
inputBytes := []byte(input)
if len(inputBytes) < 5 {
return nil
}

// determine length
dataLength := uint32(inputBytes[0]) + uint32(inputBytes[1])<<8 + uint32(inputBytes[2])<<16 + uint32(inputBytes[3])<<24
dataLengthBytes := make([]byte, 4)
binary.LittleEndian.PutUint32(dataLengthBytes, dataLength)
dataLength = binary.LittleEndian.Uint32(dataLengthBytes)

// uncompress using zlib
inputData := inputBytes[4:]
inputDataBuf := bytes.NewBuffer(inputData)
reader, err := zlib.NewReader(inputDataBuf)
if err != nil {
return nil
}
var outputBytes bytes.Buffer
io.Copy(&outputBytes, reader)
if outputBytes.Len() == 0 {
return nil
}
if dataLength != uint32(outputBytes.Len()) { // double check that the stored and uncompressed lengths match
return nil
}
return outputBytes.Bytes()
}

// DecodePosition attempts to uncompress the passed value first and if it fails tries to decode it as a valid GTID
func DecodePosition(gtid string) (mysql.Position, error) {
b := MysqlUncompress(gtid)
if b != nil {
gtid = string(b)
}
return mysql.DecodePosition(gtid)
}

// StatsHistoryRecord is used to store a Message with timestamp
type StatsHistoryRecord struct {
Time time.Time
Expand Down
4 changes: 2 additions & 2 deletions go/vt/binlog/binlogplayer/binlog_player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func TestUpdateVReplicationPos(t *testing.T) {
"set pos='MariaDB/0-1-8283', time_updated=88822, rows_copied=0, message='' " +
"where id=78522"

got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 0, 0)
got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 0, 0, false)
if got != want {
t.Errorf("updateVReplicationPos() = %#v, want %#v", got, want)
}
Expand All @@ -373,7 +373,7 @@ func TestUpdateVReplicationTimestamp(t *testing.T) {
"set pos='MariaDB/0-2-582', time_updated=88822, transaction_timestamp=481828, rows_copied=0, message='' " +
"where id=78522"

got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 481828, 0)
got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 481828, 0, false)
if got != want {
t.Errorf("updateVReplicationPos() = %#v, want %#v", got, want)
}
Expand Down
19 changes: 15 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"sync"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
Expand Down Expand Up @@ -678,7 +677,7 @@ func (vre *Engine) transitionJournal(je *journalEvent) {
// WaitForPos waits for the replication to reach the specified position.
func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error {
start := time.Now()
mPos, err := mysql.DecodePosition(pos)
mPos, err := binlogplayer.DecodePosition(pos)
if err != nil {
return err
}
Expand Down Expand Up @@ -716,7 +715,7 @@ func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error {
case len(qr.Rows) > 1 || len(qr.Rows[0]) != 3:
return fmt.Errorf("unexpected result: %v", qr)
}
current, err := mysql.DecodePosition(qr.Rows[0][0].ToString())
current, err := binlogplayer.DecodePosition(qr.Rows[0][0].ToString())
if err != nil {
return err
}
Expand Down Expand Up @@ -785,7 +784,19 @@ func readRow(dbClient binlogplayer.DBClient, id int) (map[string]string, error)
if len(qr.Fields) != len(qr.Rows[0]) {
return nil, fmt.Errorf("fields don't match rows: %v", qr)
}
return rowToMap(qr, 0)
row, err := rowToMap(qr, 0)
if err != nil {
return nil, err
}
gtid, ok := row["pos"]
if ok {
b := binlogplayer.MysqlUncompress(gtid)
if b != nil {
gtid = string(b)
row["pos"] = gtid
}
}
return row, nil
}

// rowToMap converts a row into a map for easier processing.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (vc *vcopier) fastForward(ctx context.Context, copyState map[string]*sqltyp
return err
}
if settings.StartPos.IsZero() {
update := binlogplayer.GenerateUpdatePos(vc.vr.id, pos, time.Now().Unix(), 0, vc.vr.stats.CopyRowCount.Get())
update := binlogplayer.GenerateUpdatePos(vc.vr.id, pos, time.Now().Unix(), 0, vc.vr.stats.CopyRowCount.Get(), *vreplicationStoreCompressedGTID)
_, err := vc.vr.dbClient.Execute(update)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row

func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) {
vp.numAccumulatedHeartbeats = 0
update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts, vp.vr.stats.CopyRowCount.Get())
update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts, vp.vr.stats.CopyRowCount.Get(), *vreplicationStoreCompressedGTID)
if _, err := vp.vr.dbClient.Execute(update); err != nil {
return false, fmt.Errorf("error %v updating position", err)
}
Expand Down Expand Up @@ -428,7 +428,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
stats := NewVrLogStats(event.Type.String())
switch event.Type {
case binlogdatapb.VEventType_GTID:
pos, err := mysql.DecodePosition(event.Gtid)
pos, err := binlogplayer.DecodePosition(event.Gtid)
if err != nil {
return err
}
Expand Down
60 changes: 58 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"testing"
"time"

"vitess.io/vitess/go/mysql"

"github.com/spyzhov/ajson"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -252,6 +254,7 @@ func TestCharPK(t *testing.T) {
}
}
}

func TestRollup(t *testing.T) {
defer deleteTablet(addTablet(100))

Expand Down Expand Up @@ -1597,9 +1600,62 @@ func TestPlayerDDL(t *testing.T) {
cancel()
}

func TestPlayerStopPos(t *testing.T) {
func TestGTIDCompress(t *testing.T) {
ctx := context.Background()
defer deleteTablet(addTablet(100))
err := env.Mysqld.ExecuteSuperQuery(ctx, "insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state,db_name) values (1, '', '', '', 0,0,0,0,'Stopped','')")
require.NoError(t, err)

type testCase struct {
name, gtid string
compress bool
}

testCases := []testCase{
{"cleartext1", "MySQL56/14b68925-696a-11ea-aee7-fec597a91f5e:1-308092", false},
{"cleartext2", "MySQL56/14b68925-696a-11ea-aee7-fec597a91f5e:1-308092,320a5e98-6965-11ea-b949-eeafd34ae6e4:1-3,81cbdbf8-6969-11ea-aeb1-a6143b021f67:1-524891956,c9a0f301-6965-11ea-ba9d-02c229065569:1-3,cb698dac-6969-11ea-ac38-16e5d0ac5c3a:1-524441991,e39fca4d-6960-11ea-b4c2-1e895fd49fa0:1-3", false},
{"compress1", "MySQL56/14b68925-696a-11ea-aee7-fec597a91f5e:1-308092", true},
{"compress2", "MySQL56/14b68925-696a-11ea-aee7-fec597a91f5e:1-308092,320a5e98-6965-11ea-b949-eeafd34ae6e4:1-3,81cbdbf8-6969-11ea-aeb1-a6143b021f67:1-524891956,c9a0f301-6965-11ea-ba9d-02c229065569:1-3,cb698dac-6969-11ea-ac38-16e5d0ac5c3a:1-524441991,e39fca4d-6960-11ea-b4c2-1e895fd49fa0:1-3", true},
{"nil-compress", "", true},
{"nil-clear", "", false},
}
for _, tCase := range testCases {
t.Run(tCase.name, func(t *testing.T) {
strGTID := fmt.Sprintf("'%s'", tCase.gtid)
if tCase.compress {
strGTID = fmt.Sprintf("compress(%s)", strGTID)
}
err := env.Mysqld.ExecuteSuperQuery(ctx, fmt.Sprintf("update _vt.vreplication set pos=%s where id = 1", strGTID))
require.NoError(t, err)
qr, err := env.Mysqld.FetchSuperQuery(ctx, "select pos from _vt.vreplication where id = 1")
require.NoError(t, err)
require.NotNil(t, qr)
require.Equal(t, 1, len(qr.Rows))
gotGTID := qr.Rows[0][0].ToString()
pos, err := mysql.DecodePosition(gotGTID)
if tCase.compress {
require.True(t, pos.IsZero())
pos, err = binlogplayer.DecodePosition(gotGTID)
require.NoError(t, err)
require.NotNil(t, pos)
tpos, err := mysql.DecodePosition(tCase.gtid)
require.NoError(t, err)
require.Equal(t, tpos.String(), pos.String())
} else {
require.NoError(t, err)
require.NotNil(t, pos)
require.Equal(t, tCase.gtid, gotGTID)
}
})
}
}

func TestPlayerStopPos(t *testing.T) {
defer deleteTablet(addTablet(100))
*vreplicationStoreCompressedGTID = true
defer func() {
*vreplicationStoreCompressedGTID = false
}()
execStatements(t, []string{
"create table yes(id int, val varbinary(128), primary key(id))",
fmt.Sprintf("create table %s.yes(id int, val varbinary(128), primary key(id))", vrepldb),
Expand Down Expand Up @@ -1652,7 +1708,7 @@ func TestPlayerStopPos(t *testing.T) {
"/update.*'Running'",
"begin",
"insert into yes(id,val) values (1,'aaa')",
fmt.Sprintf("/update.*'%s'", stopPos),
fmt.Sprintf("/update.*compress.*'%s'", stopPos),
"/update.*'Stopped'",
"commit",
})
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ var (
vreplicationExperimentalFlags = flag.Int64("vreplication_experimental_flags", 0, "(Bitmask) of experimental features in vreplication to enable")

vreplicationExperimentalFlagOptimizeInserts int64 = 1

vreplicationStoreCompressedGTID = flag.Bool("vreplication_store_compressed_gtid", false, "Store compressed gtids in the pos column of _vt.vreplication")
)

// vreplicator provides the core logic to start vreplication streams
Expand Down
Loading

0 comments on commit 4578a9b

Please sign in to comment.