Merge pull request #8628 from planetscale/rn-vstream-api-stop-on-reshard
VStream API: Add flag to stop streaming on a reshard
rohit-nayak-ps authored Aug 17, 2021
2 parents 740cff3 + b7c3e87 commit ba0b822
Showing 12 changed files with 526 additions and 103 deletions.
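For context, the sketch below (not part of the diff) shows how a client passes the new `StopOnReshard` flag to the VStream API; the vtgate address, keyspace, and table names are placeholders. When the flag is set, the stream delivers a JOURNAL event at a reshard and then ends instead of transparently switching to the new shards; when unset, streaming continues on the new shards. The generated CI workflows and end-to-end tests in this commit exercise both settings.

package main

import (
	"context"
	"io"
	"log"

	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
	vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
	"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

func main() {
	ctx := context.Background()
	// Placeholder vtgate gRPC address; adjust for your cluster.
	conn, err := vtgateconn.Dial(ctx, "localhost:15991")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Start from the current position of every shard in the keyspace.
	vgtid := &binlogdatapb.VGtid{ShardGtids: []*binlogdatapb.ShardGtid{{
		Keyspace: "sharded", // placeholder keyspace name
		Gtid:     "current",
	}}}
	filter := &binlogdatapb.Filter{Rules: []*binlogdatapb.Rule{{
		Match:  "customer", // placeholder table name
		Filter: "select * from customer",
	}}}
	// The new flag added by this PR: when true, the stream emits a JOURNAL
	// event at a reshard and then ends instead of following the new shards.
	flags := &vtgatepb.VStreamFlags{StopOnReshard: true}

	reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
	if err != nil {
		log.Fatal(err)
	}
	for {
		evs, err := reader.Recv()
		if err == io.EOF {
			return // with StopOnReshard=true the stream ends here after the reshard
		}
		if err != nil {
			log.Fatal(err)
		}
		for _, ev := range evs {
			log.Printf("event: %v", ev.Type)
		}
	}
}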
5 changes: 4 additions & 1 deletion .github/workflows/cluster_endtoend_vstream_failover.yml
@@ -2,8 +2,11 @@

name: Cluster (vstream_failover)
on: [push, pull_request]
concurrency:
  group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (vstream_failover)')
  cancel-in-progress: true

jobs:
  build:
    name: Run endtoend tests on Cluster (vstream_failover)
    runs-on: ubuntu-18.04
53 changes: 53 additions & 0 deletions .github/workflows/cluster_endtoend_vstream_stoponreshard_false.yml
@@ -0,0 +1,53 @@
# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows"

name: Cluster (vstream_stoponreshard_false)
on: [push, pull_request]
concurrency:
  group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (vstream_stoponreshard_false)')
  cancel-in-progress: true

jobs:
  build:
    name: Run endtoend tests on Cluster (vstream_stoponreshard_false)
    runs-on: ubuntu-18.04

    steps:
    - name: Set up Go
      uses: actions/setup-go@v1
      with:
        go-version: 1.16

    - name: Tune the OS
      run: |
        echo '1024 65535' | sudo tee -a /proc/sys/net/ipv4/ip_local_port_range

    # TEMPORARY WHILE GITHUB FIXES THIS https://github.com/actions/virtual-environments/issues/3185
    - name: Add the current IP address, long hostname and short hostname record to /etc/hosts file
      run: |
        echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
    # DON'T FORGET TO REMOVE CODE ABOVE WHEN ISSUE IS ADDRESSED!

    - name: Check out code
      uses: actions/checkout@v2

    - name: Get dependencies
      run: |
        sudo apt-get update
        sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata
        sudo service mysql stop
        sudo service etcd stop
        sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
        sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld
        go mod download
        wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
        sudo apt-get install -y gnupg2
        sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
        sudo apt-get update
        sudo apt-get install percona-xtrabackup-24

    - name: Run cluster endtoend test
      timeout-minutes: 30
      run: |
        source build.env
        eatmydata -- go run test.go -docker=false -print-log -follow -shard vstream_stoponreshard_false
53 changes: 53 additions & 0 deletions .github/workflows/cluster_endtoend_vstream_stoponreshard_true.yml
@@ -0,0 +1,53 @@
# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows"

name: Cluster (vstream_stoponreshard_true)
on: [push, pull_request]
concurrency:
  group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (vstream_stoponreshard_true)')
  cancel-in-progress: true

jobs:
  build:
    name: Run endtoend tests on Cluster (vstream_stoponreshard_true)
    runs-on: ubuntu-18.04

    steps:
    - name: Set up Go
      uses: actions/setup-go@v1
      with:
        go-version: 1.16

    - name: Tune the OS
      run: |
        echo '1024 65535' | sudo tee -a /proc/sys/net/ipv4/ip_local_port_range

    # TEMPORARY WHILE GITHUB FIXES THIS https://github.com/actions/virtual-environments/issues/3185
    - name: Add the current IP address, long hostname and short hostname record to /etc/hosts file
      run: |
        echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
    # DON'T FORGET TO REMOVE CODE ABOVE WHEN ISSUE IS ADDRESSED!

    - name: Check out code
      uses: actions/checkout@v2

    - name: Get dependencies
      run: |
        sudo apt-get update
        sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata
        sudo service mysql stop
        sudo service etcd stop
        sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
        sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld
        go mod download
        wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
        sudo apt-get install -y gnupg2
        sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
        sudo apt-get update
        sudo apt-get install percona-xtrabackup-24

    - name: Run cluster endtoend test
      timeout-minutes: 30
      run: |
        source build.env
        eatmydata -- go run test.go -docker=false -print-log -follow -shard vstream_stoponreshard_true
1 change: 1 addition & 0 deletions go/test/endtoend/cluster/vtgate_process.go
@@ -191,6 +191,7 @@ func (vtgate *VtgateProcess) TearDown() error {
	if vtgate.proc == nil || vtgate.exit == nil {
		return nil
	}
	// graceful shutdown is not currently working with vtgate; attempt a force-kill to make tests less flaky
	// Attempt graceful shutdown with SIGTERM first
	vtgate.proc.Process.Signal(syscall.SIGTERM)

8 changes: 8 additions & 0 deletions go/test/endtoend/vreplication/cluster.go
@@ -400,6 +400,8 @@ func (vc *VitessCluster) teardown(t testing.TB) {
		for _, vtgate := range cell.Vtgates {
			if err := vtgate.TearDown(); err != nil {
				log.Errorf("Error in vtgate teardown - %s", err.Error())
			} else {
				log.Infof("vtgate teardown successful")
			}
		}
	}
@@ -436,11 +438,15 @@
	wg.Wait()
	if err := vc.Vtctld.TearDown(); err != nil {
		log.Infof("Error stopping Vtctld: %s", err.Error())
	} else {
		log.Info("Successfully stopped vtctld")
	}

	for _, cell := range vc.Cells {
		if err := vc.Topo.TearDown(cell.Name, originalVtdataroot, vtdataroot, false, "etcd2"); err != nil {
			log.Infof("Error in etcd teardown - %s", err.Error())
		} else {
			log.Infof("Successfully tore down topo %s", vc.Topo.Name)
		}
	}
}
@@ -461,6 +467,8 @@ func (vc *VitessCluster) TearDown(t testing.TB) {
	case <-time.After(1 * time.Minute):
		log.Infof("TearDown() timed out")
	}
	// some processes seem to hang around for a bit
	time.Sleep(5 * time.Second)
}

func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName string, tabletType string) map[string]*cluster.VttabletProcess {
221 changes: 221 additions & 0 deletions go/test/endtoend/vreplication/vstream_test.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"strings"
"sync"
"testing"
"time"
@@ -166,3 +167,223 @@
	require.NoError(t, err)
	require.Equal(t, insertedRows, numRowEvents)
}

const schemaUnsharded = `
create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
`
const vschemaUnsharded = `
{
  "tables": {
    "customer_seq": {
      "type": "sequence"
    }
  }
}
`
const schemaSharded = `
create table customer(cid int, name varbinary(128), primary key(cid)) CHARSET=utf8mb4;
`
const vschemaSharded = `
{
  "sharded": true,
  "vindexes": {
    "reverse_bits": {
      "type": "reverse_bits"
    }
  },
  "tables": {
    "customer": {
      "column_vindexes": [
        {
          "column": "cid",
          "name": "reverse_bits"
        }
      ],
      "auto_increment": {
        "column": "cid",
        "sequence": "customer_seq"
      }
    }
  }
}
`

func insertRow(keyspace, table string, id int) {
	vtgateConn.ExecuteFetch(fmt.Sprintf("use %s;", keyspace), 1000, false)
	vtgateConn.ExecuteFetch("begin", 1000, false)
	vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (cid, name) values (%d, '%s%d')", table, id+100, table, id), 1000, false)
	vtgateConn.ExecuteFetch("commit", 1000, false)
}

type numEvents struct {
	numRowEvents, numJournalEvents              int64
	numLessThan80Events, numGreaterThan80Events int64
	numLessThan40Events, numGreaterThan40Events int64
}

// testVStreamStopOnReshardFlag streams rows from a keyspace while it is resharded and
// counts the events received, so that callers can assert the StopOnReshard behavior.
func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID int) *numEvents {
	defaultCellName := "zone1"
	allCells := []string{"zone1"}
	allCellNames = "zone1"
	vc = NewVitessCluster(t, "TestVStreamStopOnReshard", allCells, mainClusterConfig)

	require.NotNil(t, vc)
	defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets
	defer func() { defaultReplicas = 1 }()

	defer vc.TearDown(t)

	defaultCell = vc.Cells[defaultCellName]
	vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100)
	vtgate = defaultCell.Vtgates[0]
	require.NotNil(t, vtgate)
	vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "unsharded", "0"), 1)

	vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200)

	vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
	defer vtgateConn.Close()
	verifyClusterHealth(t, vc)

	// some initial data
	for i := 0; i < 10; i++ {
		insertRow("sharded", "customer", i)
	}

	ctx := context.Background()
	vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
	if err != nil {
		log.Fatal(err)
	}
	defer vstreamConn.Close()
	vgtid := &binlogdatapb.VGtid{
		ShardGtids: []*binlogdatapb.ShardGtid{{
			Keyspace: "sharded",
			Gtid:     "current",
		}}}

	filter := &binlogdatapb.Filter{
		Rules: []*binlogdatapb.Rule{{
			Match:  "customer",
			Filter: "select * from customer",
		}},
	}
	flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600, StopOnReshard: stopOnReshard}
	done := false

	id := 1000
	// First goroutine: keep inserting rows into the streamed table until a minute after the reshard.
	// * if StopOnReshard is false we should keep getting events on the new shards
	// * if StopOnReshard is true we should get a journal event and no events on the new shards
	go func() {
		for {
			if done {
				return
			}
			id++
			time.Sleep(1 * time.Second)
			insertRow("sharded", "customer", id)
		}
	}()
	// Second goroutine: stream events from the VStream API.
	var ne numEvents
	go func() {
		var reader vtgateconn.VStreamReader
		reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
		require.NoError(t, err)
		connect := false
		numErrors := 0
		for {
			if connect { // if vtgate returned a transient error, reconnect from the last seen vgtid
				reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
				require.NoError(t, err)
				connect = false
			}
			evs, err := reader.Recv()

			switch err {
			case nil:
				for _, ev := range evs {
					switch ev.Type {
					case binlogdatapb.VEventType_VGTID:
						vgtid = ev.Vgtid
					case binlogdatapb.VEventType_ROW:
						shard := ev.RowEvent.Shard
						switch shard {
						case "-80":
							ne.numLessThan80Events++
						case "80-":
							ne.numGreaterThan80Events++
						case "-40":
							ne.numLessThan40Events++
						case "40-":
							ne.numGreaterThan40Events++
						}
						ne.numRowEvents++
					case binlogdatapb.VEventType_JOURNAL:
						ne.numJournalEvents++
					}
				}
			case io.EOF:
				log.Infof("Stream Ended")
				done = true
			default:
				log.Infof("%s:: remote error: %v", time.Now(), err)
				numErrors++
				if numErrors > 10 { // if vtgate is continuously unavailable, fail the test
					return
				}
				if strings.Contains(strings.ToLower(err.Error()), "unavailable") {
					// This should not happen, but the buffering logic might return a transient
					// error during resharding; retry to reduce future flakiness.
					time.Sleep(100 * time.Millisecond)
					connect = true
				} else {
					// non-transient failure, stop the test
					done = true
				}
			}
			if done {
				return
			}
		}
	}()

	ticker := time.NewTicker(1 * time.Second)
	tickCount := 0
	for {
		<-ticker.C
		tickCount++
		switch tickCount {
		case 1:
			reshard(t, "sharded", "customer", "vstreamStopOnReshard", "-80,80-", "-40,40-", baseTabletID+400, nil, nil, nil, defaultCellName)
		case 60:
			done = true
		}
		if done {
			break
		}
	}
	return &ne
}

func TestVStreamStopOnReshardTrue(t *testing.T) {
	ne := testVStreamStopOnReshardFlag(t, true, 1000)
	require.Greater(t, ne.numJournalEvents, int64(0))
	require.NotZero(t, ne.numRowEvents)
	require.NotZero(t, ne.numLessThan80Events)
	require.NotZero(t, ne.numGreaterThan80Events)
	require.Zero(t, ne.numLessThan40Events)
	require.Zero(t, ne.numGreaterThan40Events)
}

func TestVStreamStopOnReshardFalse(t *testing.T) {
	ne := testVStreamStopOnReshardFlag(t, false, 2000)
	require.Equal(t, int64(0), ne.numJournalEvents)
	require.NotZero(t, ne.numRowEvents)
	require.NotZero(t, ne.numLessThan80Events)
	require.NotZero(t, ne.numGreaterThan80Events)
	require.NotZero(t, ne.numLessThan40Events)
	require.NotZero(t, ne.numGreaterThan40Events)
}