-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
VSCopy: Enable to copy from all shards in either a specified keyspace or all keyspaces #11909
Changes from 6 commits
077136b
2f7adbc
3afc402
05e5ea0
013b2f0
fdb2b68
d83daaf
71e1681
436568d
fdc13e1
c39af7c
7e67f27
ca2dafd
24e69b9
dc98a42
c2138b5
6910cbc
16f10b3
6a791ba
c70fa39
cdc4077
6b2d9fe
a013980
cd119d4
f7e480f
e8f0e60
ba8c231
d730563
3b41d76
d7169ec
8970182
38eb575
256400f
94beeac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,13 +24,15 @@ import ( | |
"github.com/stretchr/testify/require" | ||
|
||
"vitess.io/vitess/go/mysql" | ||
"vitess.io/vitess/go/test/endtoend/utils" | ||
) | ||
|
||
func TestRowCount(t *testing.T) { | ||
ctx := context.Background() | ||
conn, err := mysql.Connect(ctx, &vtParams) | ||
require.NoError(t, err) | ||
defer conn.Close() | ||
utils.Exec(t, conn, "use ks") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This use statement is now necessary because the number of keyspace is multiple. 3afc402 |
||
type tc struct { | ||
query string | ||
expected int | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -258,6 +258,130 @@ func TestVStreamCopyBasic(t *testing.T) { | |
} | ||
} | ||
|
||
func TestVStreamCopyWithoutKeyspaceShard(t *testing.T) { | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
conn, err := mysql.Connect(ctx, &vtParams) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
defer conn.Close() | ||
|
||
_, err = conn.ExecuteFetch("insert into t1_copy_all(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
_, err = conn.ExecuteFetch("insert into t1_copy_all_ks2(id1,id2) values(10,10), (20,20)", 1, false) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
filter := &binlogdatapb.Filter{ | ||
Rules: []*binlogdatapb.Rule{{ | ||
Match: "/t1_copy_all.*/", | ||
}}, | ||
} | ||
flags := &vtgatepb.VStreamFlags{} | ||
|
||
expectedKs1EventNum := 2 /* num shards */ * (9 /* begin/field/vgtid:pos/4 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */ + 1 /* copy operation completed */) | ||
expectedKs2EventNum := 2 /* num shards */ * (6 /* begin/field/vgtid:pos/1 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */ + 1 /* copy operation completed */) | ||
expectedFullyCopyCompletedNum := 1 | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
cases := []struct { | ||
name string | ||
shardGtid *binlogdatapb.ShardGtid | ||
expectedEventNum int | ||
expectedCompletedEvents []string | ||
}{ | ||
{ | ||
name: "copy from all keyspaces", | ||
shardGtid: &binlogdatapb.ShardGtid{ | ||
Gtid: "", | ||
}, | ||
expectedEventNum: expectedKs1EventNum + expectedKs2EventNum + expectedFullyCopyCompletedNum, | ||
expectedCompletedEvents: []string{ | ||
`type:COPY_COMPLETED keyspace:"ks" shard:"-80"`, | ||
`type:COPY_COMPLETED keyspace:"ks" shard:"80-"`, | ||
`type:COPY_COMPLETED keyspace:"ks2" shard:"-80"`, | ||
`type:COPY_COMPLETED keyspace:"ks2" shard:"80-"`, | ||
`type:COPY_COMPLETED`, | ||
}, | ||
}, | ||
{ | ||
name: "copy from all shards in the keyspace", | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
shardGtid: &binlogdatapb.ShardGtid{ | ||
Keyspace: "ks", | ||
Gtid: "", | ||
}, | ||
expectedEventNum: expectedKs1EventNum + expectedFullyCopyCompletedNum, | ||
expectedCompletedEvents: []string{ | ||
`type:COPY_COMPLETED keyspace:"ks" shard:"-80"`, | ||
`type:COPY_COMPLETED keyspace:"ks" shard:"80-"`, | ||
`type:COPY_COMPLETED`, | ||
}, | ||
}, | ||
} | ||
for _, c := range cases { | ||
t.Run(c.name, func(t *testing.T) { | ||
gconn, conn, mconn, closeConnections := initialize(ctx, t) | ||
defer closeConnections() | ||
|
||
var vgtid = &binlogdatapb.VGtid{} | ||
vgtid.ShardGtids = []*binlogdatapb.ShardGtid{c.shardGtid} | ||
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) | ||
_, _ = conn, mconn | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
require.NotNil(t, reader) | ||
var evs []*binlogdatapb.VEvent | ||
var completedEvs []*binlogdatapb.VEvent | ||
for { | ||
e, err := reader.Recv() | ||
switch err { | ||
case nil: | ||
evs = append(evs, e...) | ||
|
||
for _, ev := range e { | ||
if ev.Type == binlogdatapb.VEventType_COPY_COMPLETED { | ||
completedEvs = append(completedEvs, ev) | ||
} | ||
} | ||
|
||
printEvents(evs) // for debugging ci failures | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if len(evs) == c.expectedEventNum { | ||
// The arrival order of COPY_COMPLETED events with keyspace/shard is not constant. | ||
// On the other hand, the last event should always be a fully COPY_COMPLETED event. | ||
// That's why the sort.Slice doesn't have to handle the last element in completedEvs. | ||
sort.Slice(completedEvs[:len(completedEvs)-1], func(i, j int) bool { | ||
if completedEvs[i].GetKeyspace() != completedEvs[j].GetKeyspace() { | ||
return completedEvs[i].GetKeyspace() < completedEvs[j].GetKeyspace() | ||
} | ||
return completedEvs[i].GetShard() < completedEvs[j].GetShard() | ||
}) | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for i, ev := range completedEvs { | ||
require.Regexp(t, c.expectedCompletedEvents[i], ev.String()) | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
t.Logf("TestVStreamCopyWithoutKeyspaceShard was successful") | ||
return | ||
} else if c.expectedEventNum < len(evs) { | ||
t.Fatalf("len(events)=%v are not expected\n", len(evs)) | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
case io.EOF: | ||
log.Infof("stream ended\n") | ||
cancel() | ||
Comment on lines
+378
to
+379
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does this mean for the test? Just that we're done? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I initially thought that this code was unnecessary and didn't have any meaningful purpose, but I realized that it might have been added for a specific reason based on the assumption that the existing test code was reasonable and robust. To clarify the intent behind this code, I will ask the original implementer There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi 👋 @rohit-nayak-ps There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was curious. This isn't blocking. |
||
default: | ||
log.Errorf("Returned err %v", err) | ||
t.Fatalf("remote error: %v\n", err) | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestVStreamCopyResume(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
Original file line number | Diff line number | Diff line change | |||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -179,13 +179,8 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat | ||||||||||||||||||||||
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position") | |||||||||||||||||||||||
} | |||||||||||||||||||||||
// To fetch from all keyspaces, the input must contain a single ShardGtid | |||||||||||||||||||||||
// that has an empty keyspace, and the Gtid must be "current". In the | |||||||||||||||||||||||
// future, we'll allow the Gtid to be empty which will also support | |||||||||||||||||||||||
// copying of existing data. | |||||||||||||||||||||||
// that has an empty keyspace. | |||||||||||||||||||||||
if len(vgtid.ShardGtids) == 1 && vgtid.ShardGtids[0].Keyspace == "" { | |||||||||||||||||||||||
if vgtid.ShardGtids[0].Gtid != "current" { | |||||||||||||||||||||||
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "for an empty keyspace, the Gtid value must be 'current': %v", vgtid) | |||||||||||||||||||||||
} | |||||||||||||||||||||||
keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell, false) | |||||||||||||||||||||||
if err != nil { | |||||||||||||||||||||||
return nil, nil, nil, err | |||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer the request be more explicit as it will likely generate A LOT of load and could have a big impact on production. I think it would also be nice if the keyspace wildcard selection lined up with the table wildcard selection in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mattlord It makes sense in terms of the difference in impact between "streaming" and "copy." Do you think the following specifications are still required?
As such, I plan to update accepting the keyspace wildcard and preserve the previous behavior as much as possible like the below table. What do you think?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the explicit keyspace wildcard is enough and we don't need a new flag around that. Now that I understand the usage of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @rohit-nayak-ps, I'd appreciate your feedback. Let me know if anything is missing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yoheimuta, sorry for the delay. I am fine with the approach outlined above. We should add a (or modify the existing) e2e test to confirm that resharding will continue to work if multiple keyspaces are being streamed. We have code (see the logic around There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for your confirmation! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks @yoheimuta, we will review it soon. Yes, that CI failure is also happening on other PRs and I have asked the related team to look at it. |
|||||||||||||||||||||||
|
@@ -194,17 +189,14 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat | ||||||||||||||||||||||
for _, keyspace := range keyspaces { | |||||||||||||||||||||||
newvgtid.ShardGtids = append(newvgtid.ShardGtids, &binlogdatapb.ShardGtid{ | |||||||||||||||||||||||
Keyspace: keyspace, | |||||||||||||||||||||||
Gtid: "current", | |||||||||||||||||||||||
Gtid: vgtid.ShardGtids[0].Gtid, | |||||||||||||||||||||||
}) | |||||||||||||||||||||||
} | |||||||||||||||||||||||
vgtid = newvgtid | |||||||||||||||||||||||
} | |||||||||||||||||||||||
newvgtid := &binlogdatapb.VGtid{} | |||||||||||||||||||||||
for _, sgtid := range vgtid.ShardGtids { | |||||||||||||||||||||||
if sgtid.Shard == "" { | |||||||||||||||||||||||
if sgtid.Gtid != "current" { | |||||||||||||||||||||||
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current': %v", vgtid) | |||||||||||||||||||||||
} | |||||||||||||||||||||||
// TODO(sougou): this should work with the new Migrate workflow | |||||||||||||||||||||||
_, _, allShards, err := vsm.resolver.GetKeyspaceShards(ctx, sgtid.Keyspace, tabletType) | |||||||||||||||||||||||
if err != nil { | |||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -886,16 +886,37 @@ func TestResolveVStreamParams(t *testing.T) { | |
err: "vgtid must have at least one value with a starting position", | ||
}, { | ||
input: &binlogdatapb.VGtid{ | ||
ShardGtids: []*binlogdatapb.ShardGtid{{}}, | ||
ShardGtids: []*binlogdatapb.ShardGtid{{ | ||
Keyspace: "TestVStream", | ||
}}, | ||
Comment on lines
+899
to
+901
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These test additions are unrelated to this PR, or no? I would think we would test the new behavior, which would mean NOT specifying the shards and now not getting an error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mattlord I thought they were related to this PR.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here as well I think that comments noting what we're actually testing, how, and why are helpful. The future reader won't easily see comments here on the PR. And I know that the existing code is very poorly commented, but I want to do better moving forward. 🙂 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added by 3b41d76. |
||
}, | ||
err: "for an empty keyspace, the Gtid value must be 'current'", | ||
}, { | ||
input: &binlogdatapb.VGtid{ | ||
output: &binlogdatapb.VGtid{ | ||
ShardGtids: []*binlogdatapb.ShardGtid{{ | ||
Keyspace: "TestVStream", | ||
Shard: "-20", | ||
}, { | ||
Keyspace: "TestVStream", | ||
Shard: "20-40", | ||
}, { | ||
Keyspace: "TestVStream", | ||
Shard: "40-60", | ||
}, { | ||
Keyspace: "TestVStream", | ||
Shard: "60-80", | ||
}, { | ||
Keyspace: "TestVStream", | ||
Shard: "80-a0", | ||
}, { | ||
Keyspace: "TestVStream", | ||
Shard: "a0-c0", | ||
}, { | ||
Keyspace: "TestVStream", | ||
Shard: "c0-e0", | ||
}, { | ||
Keyspace: "TestVStream", | ||
Shard: "e0-", | ||
}}, | ||
}, | ||
err: "if shards are unspecified, the Gtid value must be 'current'", | ||
}, { | ||
input: &binlogdatapb.VGtid{ | ||
ShardGtids: []*binlogdatapb.ShardGtid{{ | ||
|
@@ -987,17 +1008,34 @@ func TestResolveVStreamParams(t *testing.T) { | |
assert.Equal(t, wantFilter, filter, tcase.input) | ||
require.False(t, flags.MinimizeSkew) | ||
} | ||
|
||
// Special-case: empty keyspace because output is too big. | ||
input := &binlogdatapb.VGtid{ | ||
ShardGtids: []*binlogdatapb.ShardGtid{{ | ||
Gtid: "current", | ||
}}, | ||
specialCases := []struct { | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
input string | ||
output string | ||
}{ | ||
{ | ||
input: "current", | ||
output: "current", | ||
}, | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{}, | ||
} | ||
vgtid, _, _, err := vsm.resolveParams(context.Background(), topodatapb.TabletType_REPLICA, input, nil, nil) | ||
require.NoError(t, err, input) | ||
if got, want := len(vgtid.ShardGtids), 8; want >= got { | ||
t.Errorf("len(vgtid.ShardGtids): %v, must be >%d", got, want) | ||
for _, tcase := range specialCases { | ||
input := &binlogdatapb.VGtid{ | ||
ShardGtids: []*binlogdatapb.ShardGtid{{ | ||
Gtid: tcase.input, | ||
}}, | ||
} | ||
vgtid, _, _, err := vsm.resolveParams(context.Background(), topodatapb.TabletType_REPLICA, input, nil, nil) | ||
require.NoError(t, err, tcase.input) | ||
if got, want := len(vgtid.ShardGtids), 8; want >= got { | ||
t.Errorf("len(vgtid.ShardGtids): %v, must be >%d", got, want) | ||
} | ||
for _, s := range vgtid.ShardGtids { | ||
require.Equal(t, tcase.output, s.Gtid) | ||
} | ||
} | ||
|
||
for _, minimizeSkew := range []bool{true, false} { | ||
t.Run(fmt.Sprintf("resolveParams MinimizeSkew %t", minimizeSkew), func(t *testing.T) { | ||
flags := &vtgatepb.VStreamFlags{MinimizeSkew: minimizeSkew} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A leaked "testitest" keyspace makes
TestVStreamCopyWithoutKeyspaceShard/copy_from_all_keyspaces
fail.Specifically, the VStreamer API itself fails with the below error.
To fix it, 05e5ea0 adds a new cleanup function to delete the keyspace explicitly.