Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VSCopy: Resume the copy phase consistently from given GTID and lastpk #11103

Merged
merged 20 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
db62f04
VSCopy: Demonstrate to fail a test case on which the vstream API is s…
yoheimuta Aug 26, 2022
0556705
VSCopy: Resume the copy phase consistently from given GTID and lastpk
yoheimuta Aug 26, 2022
bdcf51b
Build out the unit test some more
mattlord Aug 29, 2022
208e4ff
Merge remote-tracking branch 'origin/main' into vscopy-restart
mattlord Aug 29, 2022
fc4743d
Update tests for new behavior
mattlord Aug 30, 2022
2c5042a
Improve comments
mattlord Aug 30, 2022
9971669
Limit uvstreamer changes and update test
mattlord Aug 30, 2022
2a8dd96
Revert uvstreamer test changes
mattlord Aug 30, 2022
ef896bb
Revert all uvstream changes
mattlord Aug 30, 2022
6d99dba
VCopy: Revert the last three commits
yoheimuta Aug 31, 2022
60550f1
VCopy: Add a new vstream type that allows picking up where we left off
yoheimuta Aug 31, 2022
a1410c4
VCopy: Revert the unit test change
yoheimuta Aug 31, 2022
7cbbd1c
VCopy: Fix the end-to-end CI test
yoheimuta Sep 1, 2022
ab242da
Update logic for setting up uvstreamer based on input vgtid/tablepks.…
rohit-nayak-ps Nov 1, 2022
6d7d516
Refactor logic to decide if event is to be sent. Enhance unit and e2e…
rohit-nayak-ps Nov 1, 2022
3e24e87
Don't send events for tables which we can identify as ones we haven't…
rohit-nayak-ps Nov 2, 2022
4317172
Minor changes after self-review
rohit-nayak-ps Nov 3, 2022
6f3d9df
Merge remote-tracking branch 'origin/main' into vscopy-restart
mattlord Nov 7, 2022
1edd07c
Add vstream copy resume to release notes
mattlord Nov 7, 2022
03dcf6d
Address review comments
mattlord Nov 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"sort"
"sync"
"testing"

Expand Down Expand Up @@ -232,6 +233,120 @@ func TestVStreamCopyBasic(t *testing.T) {
}
}

func TestVStreamCopyResume(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
gconn, conn, mconn, closeConnections := initialize(ctx, t)
defer closeConnections()

_, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false)
if err != nil {
mattlord marked this conversation as resolved.
Show resolved Hide resolved
t.Fatal(err)
}

// Any subsequent GTIDs will be part of the stream
mpos, err := mconn.PrimaryPosition()
if err != nil {
t.Fatal(err)
}

// This GTID should end up as a no-op because we should have copied the
// existing row
_, err = conn.ExecuteFetch("insert into t1(id1,id2) values(9,9)", 1, false)
if err != nil {
t.Fatal(err)
}

// lastPK is id1=4, meaning we should only copy rows for id1 IN(5,6,7,8,9)
lastPK := sqltypes.Result{
Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}},
Rows: [][]sqltypes.Value{{sqltypes.NewInt64(4)}},
}
tableLastPK := []*binlogdatapb.TableLastPK{{
TableName: "t1",
Lastpk: sqltypes.ResultToProto3(&lastPK),
}}

// This GTID must have a before and after value
_, err = conn.ExecuteFetch("update t1 set id2 = 10 where id1 = 1", 1, false)
if err != nil {
t.Fatal(err)
}

var shardGtids []*binlogdatapb.ShardGtid
var vgtid = &binlogdatapb.VGtid{}
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
Keyspace: "ks",
Shard: "-80",
Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos),
TablePKs: tableLastPK,
})
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
Keyspace: "ks",
Shard: "80-",
Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos),
TablePKs: tableLastPK,
})
vgtid.ShardGtids = shardGtids
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1",
}},
}
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
if err != nil {
t.Fatal(err)
}
require.NotNil(t, reader)

expectedRowCopyEvents := 5 // id1 and id2 IN(5,6,7,8,9)
expectedCatchupEvents := 2 // id1=9 and id2=9; id2=10 where id1=1
rowCopyEvents, replCatchupEvents := 0, 0
expectedEvents := []string{
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
`type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`,
`type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`,
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
}
var evs []*binlogdatapb.VEvent
for {
e, err := reader.Recv()
switch err {
case nil:
for _, ev := range e {
if ev.Type == binlogdatapb.VEventType_ROW {
evs = append(evs, ev)
if ev.Timestamp == 0 {
rowCopyEvents++
} else {
replCatchupEvents++
}
printEvents(evs) // for debugging ci failures
mattlord marked this conversation as resolved.
Show resolved Hide resolved
}
}
if expectedCatchupEvents == replCatchupEvents && expectedRowCopyEvents == rowCopyEvents {
sort.Sort(VEventSorter(evs))
for i, ev := range evs {
require.Regexp(t, expectedEvents[i], ev.String())
}
t.Logf("TestVStreamCopyResume was successful")
return
}
case io.EOF:
log.Infof("stream ended\n")
cancel()
default:
log.Errorf("Returned err %v", err)
t.Fatalf("remote error: %v\n", err)
}
}
}

func TestVStreamCurrent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -396,3 +511,23 @@ func printEvents(evs []*binlogdatapb.VEvent) {
s += "===END===" + "\n"
log.Infof("%s", s)
}

// VEventSorter implements sort.Interface for a slice of VEvents. Events are
// ordered primarily by the bytes of the first row change's after values, with
// secondary ordering by timestamp (ASC). Note that row copy events do not have
// a timestamp and the value will be 0.
type VEventSorter []*binlogdatapb.VEvent

// Len returns the number of events in the slice.
func (v VEventSorter) Len() int {
	return len(v)
}

// Swap exchanges the events at indexes i and j.
func (v VEventSorter) Swap(i, j int) {
	v[i], v[j] = v[j], v[i]
}

// Less reports whether the event at index i sorts before the event at index
// j: first by the after values of the first row change, then by timestamp.
func (v VEventSorter) Less(i, j int) bool {
	valI, valJ := firstAfterValues(v[i]), firstAfterValues(v[j])
	if valI == valJ {
		return v[i].GetTimestamp() < v[j].GetTimestamp()
	}
	return valI < valJ
}

// firstAfterValues returns the after-image value bytes of the event's first
// row change as a string. It returns "" when the event has no row changes or
// the first change has no after image, so such events sort first instead of
// causing a panic in Less.
func firstAfterValues(ev *binlogdatapb.VEvent) string {
	changes := ev.GetRowEvent().GetRowChanges()
	if len(changes) == 0 {
		return ""
	}
	// Generated protobuf getters tolerate nil receivers, so a missing After
	// yields an empty value rather than a nil dereference.
	return string(changes[0].GetAfter().GetValues())
}
20 changes: 16 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,19 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb.
}
if !shouldSend && tableName != "" {
shouldSend = true
_, ok := uvs.plans[tableName]
if ok {
shouldSend = false
// If the event is on a table we haven't yet fully copied...
if plan, ok := uvs.plans[tableName]; ok {
// If there's a lastPK value then we're in the middle of a table's copy phase
if plan.tablePK != nil && plan.tablePK.Lastpk != nil {
// Ideally we should compare the PKs and only send events for rows which have been copied.
// For now, we send all changes and allow for any duplicate events -- meaning that e.g.
// we apply events in the stream for a row insert, table:t2 pk:9, even though we will
// later copy the t2 table contents and copy that same row event again -- which should
// become harmless no-ops if the row did not change in the interim.
shouldSend = true
} else {
shouldSend = false
}
}
}
if shouldSend {
Expand Down Expand Up @@ -351,7 +361,9 @@ func (uvs *uvstreamer) init() error {
if err := uvs.setStreamStartPosition(); err != nil {
return err
}
} else if uvs.startPos == "" || len(uvs.inTablePKs) > 0 {
}

if uvs.startPos == "" || len(uvs.inTablePKs) > 0 {
if err := uvs.buildTablePlan(); err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ commit;"
}

numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/)
numCopyEvents += 2 /* Repeated no-op events -- 1 field, 1 row -- from t2 row insert in stream before t2 table copy starts */
numCopyEvents += 2 /* GTID + Test event after all copy is done */
numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/
numFastForwardEvents := 5 /*t1:FIELD+ROW*/
Expand Down Expand Up @@ -478,6 +479,8 @@ var expectedEvents = []string{
"type:BEGIN",
"type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}",
"type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:2 lengths:3 values:\"11110\"}}}",
"type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63 column_type:\"int(11)\"}}",
"type:ROW row_event:{table_name:\"t2\" row_changes:{after:{lengths:2 lengths:3 values:\"11220\"}}}",
"type:GTID",
"type:COMMIT", //insert for t2 done along with t1 does not generate an event since t2 is not yet copied
"type:OTHER gtid:\"Copy Start t2\"",
Expand Down