Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VSCopy: Resume the copy phase consistently from given GTID and lastpk #11103

Merged
merged 20 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
db62f04
VSCopy: Demonstrate to fail a test case on which the vstream API is s…
yoheimuta Aug 26, 2022
0556705
VSCopy: Resume the copy phase consistently from given GTID and lastpk
yoheimuta Aug 26, 2022
bdcf51b
Build out the unit test some more
mattlord Aug 29, 2022
208e4ff
Merge remote-tracking branch 'origin/main' into vscopy-restart
mattlord Aug 29, 2022
fc4743d
Update tests for new behavior
mattlord Aug 30, 2022
2c5042a
Improve comments
mattlord Aug 30, 2022
9971669
Limit uvstreamer changes and update test
mattlord Aug 30, 2022
2a8dd96
Revert uvstreamer test changes
mattlord Aug 30, 2022
ef896bb
Revert all uvstream changes
mattlord Aug 30, 2022
6d99dba
VCopy: Revert the last three commits
yoheimuta Aug 31, 2022
60550f1
VCopy: Add a new vstream type that allows picking up where we left off
yoheimuta Aug 31, 2022
a1410c4
VCopy: Revert the unit test change
yoheimuta Aug 31, 2022
7cbbd1c
VCopy: Fix the end-to-end CI test
yoheimuta Sep 1, 2022
ab242da
Update logic for setting up uvstreamer based on input vgtid/tablepks.…
rohit-nayak-ps Nov 1, 2022
6d7d516
Refactor logic to decide if event is to be sent. Enhance unit and e2e…
rohit-nayak-ps Nov 1, 2022
3e24e87
Don't send events for tables which we can identify as ones we haven't…
rohit-nayak-ps Nov 2, 2022
4317172
Minor changes after self-review
rohit-nayak-ps Nov 3, 2022
6f3d9df
Merge remote-tracking branch 'origin/main' into vscopy-restart
mattlord Nov 7, 2022
1edd07c
Add vstream copy resume to release notes
mattlord Nov 7, 2022
03dcf6d
Address review comments
mattlord Nov 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions doc/releasenotes/16_0_0_summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,19 @@

- [New command line flags and behavior](#new-command-line-flags-and-behavior)

- **[VReplication](#vreplication)**
- [VStream Copy Resume](#vstream-copy-resume)

## Known Issues

## Major Changes

### <a id="vreplication"/>VReplication

#### <a id="vstream-copy-resume"/>VStream Copy Resume

In [PR #11103](https://github.com/vitessio/vitess/pull/11103) we introduced the ability to resume a `VTGate` [`VStream` copy operation](https://vitess.io/docs/design-docs/vreplication/vstream/vscopy/). This is useful when a [`VStream` copy operation](https://vitess.io/docs/design-docs/vreplication/vstream/vscopy/) is interrupted due to e.g. a network failure or a server restart. The `VStream` copy operation can be resumed by specifying each table's last seen primary key value in the `VStream` request. Please see the [`VStream` docs](https://vitess.io/docs/16.0/reference/vreplication/vstream/) for more details.

### Breaking Changes

#### Orchestrator Integration Deletion
Expand Down
12 changes: 12 additions & 0 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ create table t1(
primary key(id1)
) Engine=InnoDB;

create table t1_copy_resume(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;

create table t1_id2_idx(
id2 bigint,
keyspace_id varbinary(10),
Expand Down Expand Up @@ -134,6 +140,12 @@ create table t1_sharded(
Name: "t1_id2_vdx",
}},
},
"t1_copy_resume": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Name: "hash",
}},
},
"t1_sharded": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Expand Down
142 changes: 142 additions & 0 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"sort"
"sync"
"testing"

Expand Down Expand Up @@ -232,6 +233,119 @@ func TestVStreamCopyBasic(t *testing.T) {
}
}

func TestVStreamCopyResume(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
gconn, conn, mconn, closeConnections := initialize(ctx, t)
defer closeConnections()

_, err := conn.ExecuteFetch("insert into t1_copy_resume(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false)
if err != nil {
mattlord marked this conversation as resolved.
Show resolved Hide resolved
t.Fatal(err)
}

// Any subsequent GTIDs will be part of the stream
mpos, err := mconn.PrimaryPosition()
require.NoError(t, err)

// lastPK is id1=4, meaning we should only copy rows for id1 IN(5,6,7,8,9)
lastPK := sqltypes.Result{
Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}},
Rows: [][]sqltypes.Value{{sqltypes.NewInt64(4)}},
}
tableLastPK := []*binlogdatapb.TableLastPK{{
TableName: "t1_copy_resume",
Lastpk: sqltypes.ResultToProto3(&lastPK),
}}

catchupQueries := []string{
"insert into t1_copy_resume(id1,id2) values(9,9)", // this row will show up twice: once in catchup and copy
"update t1_copy_resume set id2 = 10 where id1 = 1",
"insert into t1(id1, id2) values(100,100)",
"delete from t1_copy_resume where id1 = 1",
"update t1_copy_resume set id2 = 90 where id1 = 9",
}
for _, query := range catchupQueries {
_, err = conn.ExecuteFetch(query, 1, false)
require.NoError(t, err)
}

var shardGtids []*binlogdatapb.ShardGtid
var vgtid = &binlogdatapb.VGtid{}
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
Keyspace: "ks",
Shard: "-80",
Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos),
TablePKs: tableLastPK,
})
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
Keyspace: "ks",
Shard: "80-",
Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos),
TablePKs: tableLastPK,
})
vgtid.ShardGtids = shardGtids
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1_copy_resume",
Filter: "select * from t1_copy_resume",
}},
}
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
if err != nil {
t.Fatal(err)
}
require.NotNil(t, reader)

expectedRowCopyEvents := 5 // id1 and id2 IN(5,6,7,8,9)
expectedCatchupEvents := len(catchupQueries) - 1 // insert into t1 should never reach
rowCopyEvents, replCatchupEvents := 0, 0
expectedEvents := []string{
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
`type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`,
`type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"99"} after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
}
var evs []*binlogdatapb.VEvent
for {
e, err := reader.Recv()
switch err {
case nil:
for _, ev := range e {
if ev.Type == binlogdatapb.VEventType_ROW {
evs = append(evs, ev)
if ev.Timestamp == 0 {
rowCopyEvents++
} else {
replCatchupEvents++
}
printEvents(evs) // for debugging ci failures
mattlord marked this conversation as resolved.
Show resolved Hide resolved
}
}
if expectedCatchupEvents == replCatchupEvents && expectedRowCopyEvents == rowCopyEvents {
sort.Sort(VEventSorter(evs))
for i, ev := range evs {
require.Regexp(t, expectedEvents[i], ev.String())
}
t.Logf("TestVStreamCopyResume was successful")
return
}
case io.EOF:
log.Infof("stream ended\n")
cancel()
default:
log.Errorf("Returned err %v", err)
t.Fatalf("remote error: %v\n", err)
}
}
}

func TestVStreamCurrent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -396,3 +510,31 @@ func printEvents(evs []*binlogdatapb.VEvent) {
s += "===END===" + "\n"
log.Infof("%s", s)
}

// VEventSorter orders VEvents primarily by the raw bytes of the first row
// change's after image (falling back to the before image when the after image
// is nil, e.g. for deletes), with secondary ordering by Timestamp ascending.
// Row copy events carry no timestamp, so their Timestamp value is 0.
type VEventSorter []*binlogdatapb.VEvent

func (v VEventSorter) Len() int {
	return len(v)
}

func (v VEventSorter) Swap(i, j int) {
	v[i], v[j] = v[j], v[i]
}

// sortKey returns the primary comparison key for element i: the bytes of the
// first row change's after image if present, otherwise the before image.
func (v VEventSorter) sortKey(i int) string {
	change := v[i].GetRowEvent().RowChanges[0]
	vals := change.After
	if vals == nil {
		vals = change.Before
	}
	return string(vals.Values)
}

func (v VEventSorter) Less(i, j int) bool {
	keyI, keyJ := v.sortKey(i), v.sortKey(j)
	if keyI != keyJ {
		return keyI < keyJ
	}
	// Identical row values: fall back to timestamp order.
	return v[i].Timestamp < v[j].Timestamp
}
76 changes: 57 additions & 19 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ func getQuery(tableName string, filter string) string {
query = buf.String()
case key.IsKeyRange(filter):
buf := sqlparser.NewTrackedBuffer(nil)
buf.Myprintf("select * from %v where in_keyrange(%v)", sqlparser.NewIdentifierCS(tableName), sqlparser.NewStrLiteral(filter))
buf.Myprintf("select * from %v where in_keyrange(%v)",
sqlparser.NewIdentifierCS(tableName), sqlparser.NewStrLiteral(filter))
query = buf.String()
}
return query
Expand All @@ -229,7 +230,40 @@ func (uvs *uvstreamer) Cancel() {
uvs.cancel()
}

// during copy phase only send streaming events (during catchup/fastforward) for pks already seen
// We have not yet implemented the logic to check if an event is for a row that is already copied,
// so we always return true so that we send all events for this table and so we don't miss events.
mattlord marked this conversation as resolved.
Show resolved Hide resolved
func (uvs *uvstreamer) isRowCopied(tableName string, ev *binlogdatapb.VEvent) bool {
return true
}

// shouldSendEventForTable decides whether a catchup/fastforward event for the
// given table should be forwarded to the client. We only send events for
// tables whose copy phase is complete or in progress, which preserves the
// at-least-once delivery semantics of the VStream API.
// TODO: filter out events for rows not yet copied. Note that this can only be
// done as a best-effort for comparable PKs.
func (uvs *uvstreamer) shouldSendEventForTable(tableName string, ev *binlogdatapb.VEvent) bool {
	plan, found := uvs.plans[tableName]
	if !found {
		// The table is not in its copy phase; its events always go out.
		return true
	}

	if plan.tablePK == nil || plan.tablePK.Lastpk == nil {
		// Copy of this table has not started and no tablePK was specified,
		// so catchup/fastforward events for it can be ignored.
		return false
	}

	// The table is currently in its copy phase. The per-row "already copied"
	// check is not implemented yet, so isRowCopied always returns true and we
	// never miss an event. This may deliver duplicate inserts, or
	// updates/deletes for rows the client has not yet seen — acceptable
	// because clients are expected to be idempotent: VStream promises
	// at-least-once semantics, not exactly-once.
	// Aside: vreplication workflows handle at-least-once by adding where
	// clauses that render DML queries, related to events for rows not yet
	// copied, as no-ops.
	return uvs.isRowCopied(tableName, ev)
}

// Do not send internal heartbeat events. Filter out events for tables whose copy has not been started.
func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb.VEvent {
if len(uvs.plans) == 0 {
return evs
Expand All @@ -239,25 +273,21 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb.
var shouldSend bool

for _, ev := range evs {
shouldSend = false
tableName = ""
switch ev.Type {
case binlogdatapb.VEventType_ROW:
tableName = ev.RowEvent.TableName
case binlogdatapb.VEventType_FIELD:
tableName = ev.FieldEvent.TableName
default:
tableName = ""
}
switch ev.Type {
case binlogdatapb.VEventType_HEARTBEAT:
shouldSend = false
default:
shouldSend = true
}
if !shouldSend && tableName != "" {
shouldSend = true
_, ok := uvs.plans[tableName]
if ok {
shouldSend = false
}
shouldSend = uvs.shouldSendEventForTable(tableName, ev)
}

if shouldSend {
evs2 = append(evs2, ev)
}
Expand Down Expand Up @@ -331,7 +361,9 @@ func (uvs *uvstreamer) setStreamStartPosition() error {
}
if !curPos.AtLeast(pos) {
uvs.vse.errorCounts.Add("GTIDSet Mismatch", 1)
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos))
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"GTIDSet Mismatch: requested source position:%v, current target vrep position: %v",
mysql.EncodePosition(pos), mysql.EncodePosition(curPos))
}
uvs.pos = pos
return nil
Expand All @@ -346,17 +378,22 @@ func (uvs *uvstreamer) currentPosition() (mysql.Position, error) {
return conn.PrimaryPosition()
}

// Possible states:
// 1. TablePKs nil, startPos set to gtid or "current" => start replicating from pos
// 2. TablePKs nil, startPos empty => full table copy of tables matching filter
// 3. TablePKs not nil, startPos empty => table copy (for pks > lastPK)
// 4. TablePKs not nil, startPos set => run catchup from startPos, then table copy (for pks > lastPK)
func (uvs *uvstreamer) init() error {
if uvs.startPos != "" {
if err := uvs.setStreamStartPosition(); err != nil {
if uvs.startPos == "" /* full copy */ || len(uvs.inTablePKs) > 0 /* resume copy */ {
if err := uvs.buildTablePlan(); err != nil {
return err
}
} else if uvs.startPos == "" || len(uvs.inTablePKs) > 0 {
if err := uvs.buildTablePlan(); err != nil {
}
if uvs.startPos != "" {
if err := uvs.setStreamStartPosition(); err != nil {
return err
}
}

if uvs.pos.IsZero() && (len(uvs.plans) == 0) {
return fmt.Errorf("stream needs a position or a table to copy")
}
Expand All @@ -378,7 +415,8 @@ func (uvs *uvstreamer) Stream() error {
}
uvs.sendTestEvent("Copy Done")
}
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse)
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos),
uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse)

uvs.setVs(vs)
return vs.Stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
uvstreamerTestMode = true
defer func() { uvstreamerTestMode = false }()
initialize(t)

if err := engine.se.Reload(context.Background()); err != nil {
t.Fatal("Error reloading schema")
}
Expand All @@ -190,6 +191,12 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
var tablePKs []*binlogdatapb.TableLastPK
for i, table := range testState.tables {
rules = append(rules, getRule(table))

// for table t2, let tablepk be nil, so that we don't send events for the insert in initTables()
if table == "t2" {
continue
}

tablePKs = append(tablePKs, getTablePK(table, i+1))
}
filter := &binlogdatapb.Filter{
Expand Down Expand Up @@ -246,7 +253,7 @@ commit;"

numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/)
numCopyEvents += 2 /* GTID + Test event after all copy is done */
numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/
numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */
numFastForwardEvents := 5 /*t1:FIELD+ROW*/
numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */
numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit */
Expand Down