Skip to content

Commit

Permalink
slack-15.0: VStreams fix backports/patches, pt. 1 (#373)
Browse files Browse the repository at this point in the history
* add vtgate flag that explicitly allows vstream copy (#125)

* fix fs.BoolVar

Signed-off-by: Tim Vaillancourt <[email protected]>

* VSCopy: Resume the copy phase consistently from given GTID and lastpk (vitessio#11103)

* VSCopy: Demonstrate to fail a test case on which the vstream API is supposed to resume the copy phase consistently

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Resume the copy phase consistently from given GTID and lastpk

Signed-off-by: yoheimuta <[email protected]>

* Build out the unit test some more

Signed-off-by: Matt Lord <[email protected]>

* Update tests for new behavior

Signed-off-by: Matt Lord <[email protected]>

* Improve comments

Signed-off-by: Matt Lord <[email protected]>

* Limit uvstreamer changes and update test

Signed-off-by: Matt Lord <[email protected]>

* Revert uvstreamer test changes

Signed-off-by: Matt Lord <[email protected]>

* Revert all uvstream changes

Signed-off-by: Matt Lord <[email protected]>

* VCopy: Revert the last three commits

Signed-off-by: yoheimuta <[email protected]>

* VCopy: Add a new vstream type that allows picking up where we left off

Signed-off-by: yoheimuta <[email protected]>

* VCopy: Revert the unit test change

Signed-off-by: yoheimuta <[email protected]>

* VCopy: Fix the end-to-end CI test

Signed-off-by: yoheimuta <[email protected]>

* Update logic for setting up uvstreamer based on input vgtid/tablepks. Add more catchup events to test

Signed-off-by: Rohit Nayak <[email protected]>

* Refactor logic to decide if event is to be sent. Enhance unit and e2e tests.

Signed-off-by: Rohit Nayak <[email protected]>

* Don't send events for tables which we can identify as ones we haven't started copy for

Signed-off-by: Rohit Nayak <[email protected]>

* Minor changes after self-review

Signed-off-by: Rohit Nayak <[email protected]>

* Add vstream copy resume to release notes

Signed-off-by: Matt Lord <[email protected]>

* Address review comments

Signed-off-by: Matt Lord <[email protected]>

Signed-off-by: yoheimuta <[email protected]>
Signed-off-by: Matt Lord <[email protected]>
Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: Matt Lord <[email protected]>
Co-authored-by: Rohit Nayak <[email protected]>

* VSCopy: Send COPY_COMPLETED events when the copy operation is done (vitessio#11740)

* VSCopy: Demonstrate to fail a test case on which the vstream API sends new events showing copy completed

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Send new events when the copy operation is done

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Fix typo

Signed-off-by: yoheimuta <[email protected]>

* Initialize new map for the 'vstream * from' vtgate sql interface. Make vtadmin web protos

Signed-off-by: Rohit Nayak <[email protected]>

* VSCopy: Make TestVStreamCopyBasic fail fast to avoid the end2end timeout out

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: stop sharing the 't1' table among multiple test cases running concurrently

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: refactor the function signature to be clearer

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: refactor the VEvents sorter to be simpler

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: refactor to stop the sorter from including a fully copied event

Signed-off-by: yoheimuta <[email protected]>

Signed-off-by: yoheimuta <[email protected]>
Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: Rohit Nayak <[email protected]>

* VSCopy: Enable to copy from all shards in either a specified keyspace or all keyspaces (vitessio#11909)

* VSCopy: Demonstrate to fail a test case on which the vstream API request doesn't include keyspace and shard

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Copy from all shards in all keyspaces by specifying only an empty gtid

Signed-off-by: yoheimuta <[email protected]>

* tests: Make TestRowCount stable regardless of the number of keyspaces

Signed-off-by: yoheimuta <[email protected]>

* tests: Cleanup TestCreateAndDropDatabase correctly to stop TestVStreamCopyWithoutKeyspaceShard from failing when running tests together

Signed-off-by: yoheimuta <[email protected]>

* tests: Tweak to fix a comment

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: fix the unit tests when the input vgtid with an empty gtid lacks either keyspace or shard

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Keyspace wildcard selection lines up with the table wildcard selection

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Tests the VCopy with multiple keyspaces and resharding

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Make TestVStreamCopyMultiKeyspaceReshard clearer to check if the streaming two keyspaces works even after reshard

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Return an invalid argument error if shards are unspecified and gtid is neither 'current' nor empty

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Add a test description about its purpose and target

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Remove duplicate literals in the test

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Retain defaultReplicas variable in the test

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Explain why we are setting Match to 'customer.*' in the test

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Remove an unused VStreamFlag for the test

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Use sentence capitalization in the test

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Verify that we didn't lose any events or get duplicates of the keyspace being reshareded in the test

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Return a value instead of a pointer because there is no need to modify the value

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Add a comment describing what TestVStreamCopyFromAllKeyspacesAndAllShards is doing and why

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Add a comment describing why we expect these specific numbers of events from VStream API

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Tweak the test case name

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Make a utility function to sort COPY_COMPLETED events in the test

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Replace the matcher with a simpler one in the test

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Move the print debug call to the FailNow section in the test

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Use require.NoError in new tests

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Use require instead of t.Fatalf in the test

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Apply the reviewer's suggestion to make the error message easier to read

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Add a comment noting what we're actually testing

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Correct the test comment and elaborate the special-case

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Tweak an error message and a comment

Signed-off-by: yoheimuta <[email protected]>

* VSCopy: Adjust to a change in the signature of a test function that was introduced in the main repository

Signed-off-by: yoheimuta <[email protected]>

---------

Signed-off-by: yoheimuta <[email protected]>

* attempt unit test fix

Signed-off-by: Tim Vaillancourt <[email protected]>

* update test error expected

Signed-off-by: Tim Vaillancourt <[email protected]>

---------

Signed-off-by: Tim Vaillancourt <[email protected]>
Signed-off-by: yoheimuta <[email protected]>
Signed-off-by: Matt Lord <[email protected]>
Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: pbibra <[email protected]>
Co-authored-by: yohei yoshimuta <[email protected]>
Co-authored-by: Matt Lord <[email protected]>
Co-authored-by: Rohit Nayak <[email protected]>
  • Loading branch information
5 people authored May 28, 2024
1 parent 36378c1 commit 8125303
Show file tree
Hide file tree
Showing 22 changed files with 994 additions and 154 deletions.
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ Usage of vtgate:
--mysql_slow_connect_warn_threshold duration Warn if it takes more than the given threshold for a mysql connection to establish (default 0s)
--mysql_tcp_version string Select tcp, tcp4, or tcp6 to control the socket type. (default "tcp")
--no_scatter when set to true, the planner will fail instead of producing a plan that includes scatter queries
--no_vstream_copy when set to true, vstream copy will not be allowed - temporary until we can properly support RDONLY for this
--normalize_queries Rewrite queries with bind vars. Turn this off if the app itself sends normalized queries with bind vars. (default true)
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 10s)
--onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s)
Expand Down
169 changes: 165 additions & 4 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) {

const schemaUnsharded = `
create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
insert into customer_seq(id, next_id, cache) values(0, 1, 3);
`
const vschemaUnsharded = `
{
Expand Down Expand Up @@ -218,14 +219,18 @@ const vschemaSharded = `
func insertRow(keyspace, table string, id int) {
vtgateConn.ExecuteFetch(fmt.Sprintf("use %s;", keyspace), 1000, false)
vtgateConn.ExecuteFetch("begin", 1000, false)
vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (cid, name) values (%d, '%s%d')", table, id+100, table, id), 1000, false)
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false)
if err != nil {
log.Infof("error inserting row %d: %v", id, err)
}
vtgateConn.ExecuteFetch("commit", 1000, false)
}

type numEvents struct {
numRowEvents, numJournalEvents int64
numLessThan80Events, numGreaterThan80Events int64
numLessThan40Events, numGreaterThan40Events int64
numRowEvents, numJournalEvents int64
numLessThan80Events, numGreaterThan80Events int64
numLessThan40Events, numGreaterThan40Events int64
numShard0BeforeReshardEvents, numShard0AfterReshardEvents int64
}

// tests the StopOnReshard flag
Expand Down Expand Up @@ -375,6 +380,150 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID
return &ne
}

// Validate that we can continue streaming from multiple keyspaces after first copying some tables and then resharding one of the keyspaces
// Ensure that there are no missing row events during the resharding process.
func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEvents {
defaultCellName := "zone1"
allCellNames = defaultCellName
allCells := []string{allCellNames}
vc = NewVitessCluster(t, "VStreamCopyMultiKeyspaceReshard", allCells, mainClusterConfig)

require.NotNil(t, vc)
ogdr := defaultReplicas
defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets
defer func(dr int) { defaultReplicas = dr }(ogdr)

defer vc.TearDown(t)

defaultCell = vc.Cells[defaultCellName]
vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "unsharded", "0"), 1, time.Second*30)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)

ctx := context.Background()
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
if err != nil {
log.Fatal(err)
}
defer vstreamConn.Close()
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "/.*",
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
// We want to confirm that the following two tables are streamed.
// 1. the customer_seq in the unsharded keyspace
// 2. the customer table in the sharded keyspace
Match: "/customer.*/",
}},
}
flags := &vtgatepb.VStreamFlags{}
done := false

id := 1000
// First goroutine that keeps inserting rows into the table being streamed until a minute after reshard
// We should keep getting events on the new shards
go func() {
for {
if done {
return
}
id++
time.Sleep(1 * time.Second)
insertRow("sharded", "customer", id)
}
}()
// stream events from the VStream API
var ne numEvents
reshardDone := false
go func() {
var reader vtgateconn.VStreamReader
reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
for {
evs, err := reader.Recv()

switch err {
case nil:
for _, ev := range evs {
switch ev.Type {
case binlogdatapb.VEventType_ROW:
shard := ev.RowEvent.Shard
switch shard {
case "0":
if reshardDone {
ne.numShard0AfterReshardEvents++
} else {
ne.numShard0BeforeReshardEvents++
}
case "-80":
ne.numLessThan80Events++
case "80-":
ne.numGreaterThan80Events++
case "-40":
ne.numLessThan40Events++
case "40-":
ne.numGreaterThan40Events++
}
ne.numRowEvents++
case binlogdatapb.VEventType_JOURNAL:
ne.numJournalEvents++
}
}
case io.EOF:
log.Infof("Stream Ended")
done = true
default:
log.Errorf("Returned err %v", err)
done = true
}
if done {
return
}
}
}()

ticker := time.NewTicker(1 * time.Second)
tickCount := 0
for {
<-ticker.C
tickCount++
switch tickCount {
case 1:
reshard(t, "sharded", "customer", "vstreamCopyMultiKeyspaceReshard", "-80,80-", "-40,40-", baseTabletID+400, nil, nil, nil, defaultCellName)
reshardDone = true
case 60:
done = true
}
if done {
break
}
}
log.Infof("ne=%v", ne)

// The number of row events streamed by the VStream API should match the number of rows inserted.
// This is important for sharded tables, where we need to ensure that no row events are missed during the resharding process.
//
// On the other hand, we don't verify the exact number of row events for the unsharded keyspace
// because the keyspace remains unsharded and the number of rows in the customer_seq table is always 1.
// We believe that checking the number of row events for the unsharded keyspace, which should always be greater than 0 before and after resharding,
// is sufficient to confirm that the resharding of one keyspace does not affect another keyspace, while keeping the test straightforward.
customerResult := execVtgateQuery(t, vtgateConn, "sharded", "select count(*) from customer")
insertedCustomerRows, err := evalengine.ToInt64(customerResult.Rows[0][0])
require.NoError(t, err)
require.Equal(t, insertedCustomerRows, ne.numLessThan80Events+ne.numGreaterThan80Events+ne.numLessThan40Events+ne.numGreaterThan40Events)
return ne
}

func TestVStreamFailover(t *testing.T) {
testVStreamWithFailover(t, true)
}
Expand Down Expand Up @@ -406,3 +555,15 @@ func TestVStreamWithKeyspacesToWatch(t *testing.T) {

testVStreamWithFailover(t, false)
}

func TestVStreamCopyMultiKeyspaceReshard(t *testing.T) {
ne := testVStreamCopyMultiKeyspaceReshard(t, 3000)
require.Equal(t, int64(0), ne.numJournalEvents)
require.NotZero(t, ne.numRowEvents)
require.NotZero(t, ne.numShard0BeforeReshardEvents)
require.NotZero(t, ne.numShard0AfterReshardEvents)
require.NotZero(t, ne.numLessThan80Events)
require.NotZero(t, ne.numGreaterThan80Events)
require.NotZero(t, ne.numLessThan40Events)
require.NotZero(t, ne.numGreaterThan40Events)
}
63 changes: 35 additions & 28 deletions go/vt/proto/binlogdata/binlogdata.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts

streamSize := 10
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion)
vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion, false)

queryLogBufferSize := 10
vtgate.QueryLogger = streamlog.New("VTGate", queryLogBufferSize)
Expand Down
Loading

0 comments on commit 8125303

Please sign in to comment.