Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apply vcopy patch 11103 #128

Merged
merged 1 commit into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ create table t1(
primary key(id1)
) Engine=InnoDB;

create table t1_copy_resume(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;

create table t1_id2_idx(
id2 bigint,
keyspace_id varbinary(10),
Expand Down Expand Up @@ -133,6 +139,12 @@ create table t1_sharded(
Name: "t1_id2_vdx",
}},
},
"t1_copy_resume": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Name: "hash",
}},
},
"t1_sharded": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Expand Down
142 changes: 142 additions & 0 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"sort"
"sync"
"testing"

Expand Down Expand Up @@ -232,6 +233,119 @@ func TestVStreamCopyBasic(t *testing.T) {
}
}

// TestVStreamCopyResume validates that a VStream can resume a partially
// completed copy phase: the stream is started with a TableLastPK (lastpk
// id1=4), so only rows after that PK are copied, while DML executed after the
// captured GTID position is delivered as catchup events.
func TestVStreamCopyResume(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
gconn, conn, mconn, closeConnections := initialize(ctx, t)
defer closeConnections()

// Seed the table; the copy phase below will resume from lastpk id1=4.
_, err := conn.ExecuteFetch("insert into t1_copy_resume(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false)
if err != nil {
t.Fatal(err)
}

// Any subsequent GTIDs will be part of the stream
mpos, err := mconn.PrimaryPosition()
require.NoError(t, err)

// lastPK is id1=4, meaning we should only copy rows for id1 IN(5,6,7,8,9)
lastPK := sqltypes.Result{
Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}},
Rows: [][]sqltypes.Value{{sqltypes.NewInt64(4)}},
}
tableLastPK := []*binlogdatapb.TableLastPK{{
TableName: "t1_copy_resume",
Lastpk: sqltypes.ResultToProto3(&lastPK),
}}

// DML executed after mpos was captured; these arrive as catchup events.
// The insert into t1 must be filtered out since the stream only matches
// t1_copy_resume.
catchupQueries := []string{
"insert into t1_copy_resume(id1,id2) values(9,9)", // this row will show up twice: once in catchup and copy
"update t1_copy_resume set id2 = 10 where id1 = 1",
"insert into t1(id1, id2) values(100,100)",
"delete from t1_copy_resume where id1 = 1",
"update t1_copy_resume set id2 = 90 where id1 = 9",
}
for _, query := range catchupQueries {
_, err = conn.ExecuteFetch(query, 1, false)
require.NoError(t, err)
}

// Start the stream on both shards from mpos, each resuming the copy of
// t1_copy_resume after tableLastPK.
var shardGtids []*binlogdatapb.ShardGtid
var vgtid = &binlogdatapb.VGtid{}
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
Keyspace: "ks",
Shard: "-80",
Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos),
TablePKs: tableLastPK,
})
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
Keyspace: "ks",
Shard: "80-",
Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos),
TablePKs: tableLastPK,
})
vgtid.ShardGtids = shardGtids
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1_copy_resume",
Filter: "select * from t1_copy_resume",
}},
}
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
if err != nil {
t.Fatal(err)
}
require.NotNil(t, reader)

expectedRowCopyEvents := 5 // id1 and id2 IN(5,6,7,8,9)
expectedCatchupEvents := len(catchupQueries) - 1 // insert into t1 should never reach
rowCopyEvents, replCatchupEvents := 0, 0
// Expected events after sorting with VEventSorter (primary key: row value
// bytes; secondary: timestamp). Each entry is a regex matched against
// ev.String(); copy events have no timestamp, catchup events do.
// NOTE(review): shard assignments assume the hash vindex routing for id1 —
// confirm against the ks vschema if these regexes fail.
expectedEvents := []string{
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`,
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"99"} after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
}
var evs []*binlogdatapb.VEvent
// Consume ROW events until the expected copy and catchup counts are both
// reached, then assert the sorted events match the expected regexes. The
// test framework's timeout bounds this loop if counts are never reached.
for {
e, err := reader.Recv()
switch err {
case nil:
for _, ev := range e {
if ev.Type == binlogdatapb.VEventType_ROW {
evs = append(evs, ev)
// Copy events carry no timestamp; catchup/replication events do.
if ev.Timestamp == 0 {
rowCopyEvents++
} else {
replCatchupEvents++
}
printEvents(evs) // for debugging ci failures
}
}
if expectedCatchupEvents == replCatchupEvents && expectedRowCopyEvents == rowCopyEvents {
sort.Sort(VEventSorter(evs))
for i, ev := range evs {
require.Regexp(t, expectedEvents[i], ev.String())
}
t.Logf("TestVStreamCopyResume was successful")
return
}
case io.EOF:
log.Infof("stream ended\n")
cancel()
default:
log.Errorf("Returned err %v", err)
t.Fatalf("remote error: %v\n", err)
}
}
}

func TestVStreamCurrent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -396,3 +510,31 @@ func printEvents(evs []*binlogdatapb.VEvent) {
s += "===END===" + "\n"
log.Infof("%s", s)
}

// VEventSorter orders VEvents deterministically for test comparison: the
// primary key is the value bytes of each event's first row change (After if
// present, otherwise Before), with ties broken by timestamp ascending. Row
// copy events carry no timestamp (0), so they sort before replication events
// that touch the same row values.
type VEventSorter []*binlogdatapb.VEvent

func (v VEventSorter) Len() int { return len(v) }

func (v VEventSorter) Swap(i, j int) { v[i], v[j] = v[j], v[i] }

func (v VEventSorter) Less(i, j int) bool {
	// rowKey extracts the comparable value bytes of an event's first row change.
	rowKey := func(e *binlogdatapb.VEvent) string {
		change := e.GetRowEvent().RowChanges[0]
		vals := change.After
		if vals == nil {
			vals = change.Before
		}
		return string(vals.Values)
	}
	keyI, keyJ := rowKey(v[i]), rowKey(v[j])
	if keyI != keyJ {
		return keyI < keyJ
	}
	return v[i].Timestamp < v[j].Timestamp
}
77 changes: 58 additions & 19 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ func getQuery(tableName string, filter string) string {
query = buf.String()
case key.IsKeyRange(filter):
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("select * from %v where in_keyrange(%v)", sqlparser.NewTableIdent(tableName), sqlparser.NewStrLiteral(filter))
// note: sqlparser.NewTableIdent is renamed to NewIdentifierCS in v15
buf.Myprintf("select * from %v where in_keyrange(%v)",
sqlparser.NewTableIdent(tableName), sqlparser.NewStrLiteral(filter))
query = buf.String()
}
return query
Expand All @@ -229,7 +231,40 @@ func (uvs *uvstreamer) Cancel() {
uvs.cancel()
}

// isRowCopied reports whether the row affected by ev has already been copied
// for tableName. Ideally, during the copy phase we would only send streaming
// (catchup/fastforward) events for PKs already seen, but that per-row check is
// not implemented yet, so this always returns true: every event for the table
// is sent, guaranteeing none are missed (at the cost of possible duplicates,
// which is acceptable under VStream's at-least-once delivery semantics).
func (uvs *uvstreamer) isRowCopied(tableName string, ev *binlogdatapb.VEvent) bool {
return true
}

// shouldSendEventForTable decides whether a catchup/fastforward event for
// tableName should be forwarded to the client. Events are sent for tables
// whose copy phase is complete or in progress, fulfilling the at-least-once
// delivery semantics of the VStream API.
// TODO: filter out events for rows not yet copied. Note that we can only do
// this as a best-effort for comparable PKs.
func (uvs *uvstreamer) shouldSendEventForTable(tableName string, ev *binlogdatapb.VEvent) bool {
	plan, found := uvs.plans[tableName]
	if !found {
		// The table is not in a copy phase: always send its events.
		return true
	}
	if plan.tablePK != nil && plan.tablePK.Lastpk != nil {
		// The table is currently in its copy phase. The per-row "already
		// copied" check is not implemented yet (isRowCopied always returns
		// true), so no events are dropped here and none are missed. Duplicate
		// inserts, or updates/deletes for rows not yet seen, may reach the
		// client; that is fine because clients must be idempotent — VStream
		// only promises at-least-once (not exactly-once) semantics. Aside:
		// vreplication workflows handle at-least-once by adding where clauses
		// that render DML for not-yet-copied rows into no-ops.
		return uvs.isRowCopied(tableName, ev)
	}
	// Copy of this table has not started and no tablePK was specified, so
	// catchup/fastforward events for it can be ignored.
	return false
}

// Do not send internal heartbeat events. Filter out events for tables whose copy has not been started.
func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb.VEvent {
if len(uvs.plans) == 0 {
return evs
Expand All @@ -239,25 +274,21 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb.
var shouldSend bool

for _, ev := range evs {
shouldSend = false
tableName = ""
switch ev.Type {
case binlogdatapb.VEventType_ROW:
tableName = ev.RowEvent.TableName
case binlogdatapb.VEventType_FIELD:
tableName = ev.FieldEvent.TableName
default:
tableName = ""
}
switch ev.Type {
case binlogdatapb.VEventType_HEARTBEAT:
shouldSend = false
default:
shouldSend = true
}
if !shouldSend && tableName != "" {
shouldSend = true
_, ok := uvs.plans[tableName]
if ok {
shouldSend = false
}
shouldSend = uvs.shouldSendEventForTable(tableName, ev)
}

if shouldSend {
evs2 = append(evs2, ev)
}
Expand Down Expand Up @@ -331,7 +362,9 @@ func (uvs *uvstreamer) setStreamStartPosition() error {
}
if !curPos.AtLeast(pos) {
uvs.vse.errorCounts.Add("GTIDSet Mismatch", 1)
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos))
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"GTIDSet Mismatch: requested source position:%v, current target vrep position: %v",
mysql.EncodePosition(pos), mysql.EncodePosition(curPos))
}
uvs.pos = pos
return nil
Expand All @@ -346,17 +379,22 @@ func (uvs *uvstreamer) currentPosition() (mysql.Position, error) {
return conn.PrimaryPosition()
}

// Possible states:
// 1. TablePKs nil, startPos set to gtid or "current" => start replicating from pos
// 2. TablePKs nil, startPos empty => full table copy of tables matching filter
// 3. TablePKs not nil, startPos empty => table copy (for pks > lastPK)
// 4. TablePKs not nil, startPos set => run catchup from startPos, then table copy (for pks > lastPK)
func (uvs *uvstreamer) init() error {
if uvs.startPos != "" {
if err := uvs.setStreamStartPosition(); err != nil {
if uvs.startPos == "" /* full copy */ || len(uvs.inTablePKs) > 0 /* resume copy */ {
if err := uvs.buildTablePlan(); err != nil {
return err
}
} else if uvs.startPos == "" || len(uvs.inTablePKs) > 0 {
if err := uvs.buildTablePlan(); err != nil {
}
if uvs.startPos != "" {
if err := uvs.setStreamStartPosition(); err != nil {
return err
}
}

if uvs.pos.IsZero() && (len(uvs.plans) == 0) {
return fmt.Errorf("stream needs a position or a table to copy")
}
Expand All @@ -378,7 +416,8 @@ func (uvs *uvstreamer) Stream() error {
}
uvs.sendTestEvent("Copy Done")
}
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse)
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos),
uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse)

uvs.setVs(vs)
return vs.Stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
uvstreamerTestMode = true
defer func() { uvstreamerTestMode = false }()
initialize(t)

if err := engine.se.Reload(context.Background()); err != nil {
t.Fatal("Error reloading schema")
}
Expand All @@ -190,6 +191,12 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
var tablePKs []*binlogdatapb.TableLastPK
for i, table := range testState.tables {
rules = append(rules, getRule(table))

// for table t2, let tablepk be nil, so that we don't send events for the insert in initTables()
if table == "t2" {
continue
}

tablePKs = append(tablePKs, getTablePK(table, i+1))
}
filter := &binlogdatapb.Filter{
Expand Down Expand Up @@ -246,7 +253,7 @@ commit;"

numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/)
numCopyEvents += 2 /* GTID + Test event after all copy is done */
numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/
numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */
numFastForwardEvents := 5 /*t1:FIELD+ROW*/
numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */
numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit */
Expand Down