diff --git a/.github/workflows/cluster_endtoend_vreplication_basic.yml b/.github/workflows/cluster_endtoend_vreplication_basic.yml new file mode 100644 index 00000000000..00a960ae4e5 --- /dev/null +++ b/.github/workflows/cluster_endtoend_vreplication_basic.yml @@ -0,0 +1,38 @@ +name: Cluster (vreplication_basic) +on: [push, pull_request] +jobs: + + build: + name: Run endtoend tests on Cluster (vreplication_basic) + runs-on: ubuntu-latest + + steps: + - name: Set up Go + uses: actions/setup-go@v1 + with: + go-version: 1.15 + + - name: Check out code + uses: actions/checkout@v2 + + - name: Get dependencies + run: | + sudo apt-get update + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb + sudo apt-get install -y gnupg2 + sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb + sudo apt-get update + sudo apt-get install percona-xtrabackup-24 + + - name: Run cluster endtoend test + timeout-minutes: 30 + run: | + source build.env + eatmydata -- go run test.go -docker=false -print-log -follow -shard vreplication_basic diff --git a/go/test/endtoend/vreplication/cluster.go b/go/test/endtoend/vreplication/cluster.go index e1e2b8820bd..170236969bb 100644 --- a/go/test/endtoend/vreplication/cluster.go +++ b/go/test/endtoend/vreplication/cluster.go @@ -205,8 +205,14 @@ func (vc *VitessCluster) AddTablet(t *testing.T, cell *Cell, keyspace *Keyspace, vc.Topo.Port, globalConfig.hostname, globalConfig.tmpDir, - []string{"-queryserver-config-schema-reload-time", "5"}, //FIXME: for multi-cell initial schema doesn't seem to load without this + []string{ + "-queryserver-config-schema-reload-time", "5", + "-enable-lag-throttler", + "-heartbeat_enable", + "-heartbeat_interval", "250ms", + }, //FIXME: for multi-cell initial schema doesn't seem to load without "-queryserver-config-schema-reload-time" false) + require.NotNil(t, vttablet) vttablet.SupportsBackup = false diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index dde5228f238..5a3b9cf0365 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net/http" "strings" "testing" "time" @@ -30,17 +31,20 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" + throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" "vitess.io/vitess/go/vt/wrangler" ) var ( - vc *VitessCluster - vtgate *cluster.VtgateProcess - defaultCell *Cell - vtgateConn *mysql.Conn - defaultRdonly int - defaultReplicas int - allCellNames string + vc *VitessCluster + vtgate *cluster.VtgateProcess + defaultCell *Cell + vtgateConn *mysql.Conn + defaultRdonly int + defaultReplicas int + allCellNames string + httpClient = throttlebase.SetupHTTPClient(time.Second) + throttlerAppName = "vstreamer" ) func init() { @@ -48,11 +52,42 @@ func init() { defaultReplicas = 1 } +func throttleResponse(tablet *cluster.VttabletProcess, path string) (resp *http.Response, respBody string, err error) { + apiURL := fmt.Sprintf("http://%s:%d/%s", tablet.TabletHostname, tablet.Port, path) + resp, err = httpClient.Get(apiURL) + if err != nil { + return resp, respBody, err + } + b, err := ioutil.ReadAll(resp.Body) + respBody = string(b) + return resp, respBody, err +} + +func throttleStreamer(tablet *cluster.VttabletProcess) (*http.Response, string, error) { + return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerAppName)) +} + +func unthrottleStreamer(tablet *cluster.VttabletProcess) (*http.Response, string, error) { + return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerAppName)) +} + +func throttlerCheckSelf(tablet *cluster.VttabletProcess) (resp *http.Response, respBody string, err error) { + apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, throttlerAppName) + resp, err = httpClient.Get(apiURL) + if err != nil { + return resp, respBody, err + } + b, err := ioutil.ReadAll(resp.Body) + respBody = string(b) + return resp, respBody, err +} + func TestBasicVreplicationWorkflow(t *testing.T) { defaultCellName := "zone1" allCells := []string{"zone1"} allCellNames = "zone1" vc = InitCluster(t, allCells) + require.NotNil(t, vc) defaultReplicas = 0 // because of CI resource constraints we can only run this test with master tablets defer func() { defaultReplicas = 1 }() @@ -153,17 +188,19 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) { } func insertInitialData(t *testing.T) { - fmt.Printf("Inserting initial data\n") - lines, _ := ioutil.ReadFile("unsharded_init_data.sql") - execMultipleQueries(t, vtgateConn, "product:0", string(lines)) - execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq(id, next_id, cache) values(0, 100, 100);") - execVtgateQuery(t, vtgateConn, "product:0", "insert into order_seq(id, next_id, cache) values(0, 100, 100);") - fmt.Printf("Done inserting initial data\n") + t.Run("insertInitialData", func(t *testing.T) { + fmt.Printf("Inserting initial data\n") + lines, _ := ioutil.ReadFile("unsharded_init_data.sql") + execMultipleQueries(t, vtgateConn, "product:0", string(lines)) + execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq(id, next_id, cache) values(0, 100, 100);") + execVtgateQuery(t, vtgateConn, "product:0", "insert into order_seq(id, next_id, cache) values(0, 100, 100);") + fmt.Printf("Done inserting initial data\n") - validateCount(t, vtgateConn, "product:0", "product", 2) - validateCount(t, vtgateConn, "product:0", "customer", 3) - validateQuery(t, vtgateConn, "product:0", "select * from merchant", - `[[VARCHAR("monoprice") VARCHAR("electronics")] [VARCHAR("newegg") VARCHAR("electronics")]]`) + validateCount(t, vtgateConn, "product:0", "product", 2) + validateCount(t, vtgateConn, "product:0", "customer", 3) + validateQuery(t, vtgateConn, "product:0", "select * from merchant", + `[[VARCHAR("monoprice") VARCHAR("electronics")] [VARCHAR("newegg") VARCHAR("electronics")]]`) + }) } func insertMoreCustomers(t *testing.T, numCustomers int) { @@ -184,394 +221,471 @@ func insertMoreProducts(t *testing.T) { execVtgateQuery(t, vtgateConn, "product", sql) } -func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAlias string) { - workflow := "p2c" - sourceKs := "product" - targetKs := "customer" - ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) - if _, err := vc.AddKeyspace(t, cells, "customer", "-80,80-", customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200); err != nil { - t.Fatal(err) - } - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "customer", "-80"), 1); err != nil { - t.Fatal(err) - } - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "customer", "80-"), 1); err != nil { - t.Fatal(err) - } - tables := "customer" - moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables) - - // Assume we are operating on first cell - defaultCell := cells[0] - custKs := vc.Cells[defaultCell.Name].Keyspaces["customer"] - customerTab1 := custKs.Shards["-80"].Tablets["zone1-200"].Vttablet - customerTab2 := custKs.Shards["80-"].Tablets["zone1-300"].Vttablet - - catchup(t, customerTab1, workflow, "MoveTables") - catchup(t, customerTab2, workflow, "MoveTables") - - productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet - query := "select * from customer" - require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", query, query)) - insertQuery1 := "insert into customer(cid, name) values(1001, 'tempCustomer1')" - matchInsertQuery1 := "insert into customer(cid, `name`) values (:vtg1, :vtg2)" - require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1)) - vdiff(t, ksWorkflow) - switchReadsDryRun(t, allCellNames, ksWorkflow, dryRunResultsReadCustomerShard) - switchReads(t, allCellNames, ksWorkflow) - require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", query, query)) - switchWritesDryRun(t, ksWorkflow, dryRunResultsSwitchWritesCustomerShard) - switchWrites(t, ksWorkflow, false) - ksShards := []string{"product/0", "customer/-80", "customer/80-"} - printShardPositions(vc, ksShards) - insertQuery2 := "insert into customer(name, cid) values('tempCustomer2', 100)" - matchInsertQuery2 := "insert into customer(`name`, cid) values (:vtg1, :_cid0)" - require.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", insertQuery2, matchInsertQuery2)) - - insertQuery2 = "insert into customer(name, cid) values('tempCustomer3', 101)" //ID 101, hence due to reverse_bits in shard 80- - require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2)) - - insertQuery2 = "insert into customer(name, cid) values('tempCustomer4', 102)" //ID 102, hence due to reverse_bits in shard -80 - require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery2, matchInsertQuery2)) - reverseKsWorkflow := "product.p2c_reverse" - if testReverse { - //Reverse Replicate - switchReads(t, allCellNames, reverseKsWorkflow) - printShardPositions(vc, ksShards) - switchWrites(t, reverseKsWorkflow, false) - - insertQuery1 = "insert into customer(cid, name) values(1002, 'tempCustomer5')" - require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1)) - // both inserts go into 80-, this tests the edge-case where a stream (-80) has no relevant new events after the previous switch - insertQuery1 = "insert into customer(cid, name) values(1003, 'tempCustomer6')" - require.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery1, matchInsertQuery1)) - insertQuery1 = "insert into customer(cid, name) values(1004, 'tempCustomer7')" - require.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery1, matchInsertQuery1)) - - //Go forward again - switchReads(t, allCellNames, ksWorkflow) - switchWrites(t, ksWorkflow, false) - dropSourcesDryRun(t, ksWorkflow, false, dryRunResultsDropSourcesDropCustomerShard) - dropSourcesDryRun(t, ksWorkflow, true, dryRunResultsDropSourcesRenameCustomerShard) - - var exists bool - exists, err := checkIfBlacklistExists(t, vc, "product:0", "customer") - require.NoError(t, err, "Error getting blacklist for customer:0") - require.True(t, exists) - dropSources(t, ksWorkflow) - - exists, err = checkIfBlacklistExists(t, vc, "product:0", "customer") - require.NoError(t, err, "Error getting blacklist for customer:0") - require.False(t, exists) +func insertMoreProductsForThrottler(t *testing.T) { + sql := "insert into product(pid, description) values(103, 'new-cpu'),(104, 'new-camera'),(105, 'new-mouse');" + execVtgateQuery(t, vtgateConn, "product", sql) +} - for _, shard := range strings.Split("-80,80-", ",") { - expectNumberOfStreams(t, vtgateConn, "shardCustomerTargetStreams", "p2c", "customer:"+shard, 0) +func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAlias string) { + t.Run("shardCustomer", func(t *testing.T) { + workflow := "p2c" + sourceKs := "product" + targetKs := "customer" + ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) + if _, err := vc.AddKeyspace(t, cells, "customer", "-80,80-", customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200); err != nil { + t.Fatal(err) } + if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "customer", "-80"), 1); err != nil { + t.Fatal(err) + } + if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "customer", "80-"), 1); err != nil { + t.Fatal(err) + } + tables := "customer" + moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables) - expectNumberOfStreams(t, vtgateConn, "shardCustomerReverseStreams", "p2c_reverse", "product:0", 0) - - var found bool - found, err = checkIfTableExists(t, vc, "zone1-100", "customer") - assert.NoError(t, err, "Customer table not deleted from zone1-100") - require.False(t, found) + // Assume we are operating on first cell + defaultCell := cells[0] + custKs := vc.Cells[defaultCell.Name].Keyspaces["customer"] + customerTab1 := custKs.Shards["-80"].Tablets["zone1-200"].Vttablet + customerTab2 := custKs.Shards["80-"].Tablets["zone1-300"].Vttablet - found, err = checkIfTableExists(t, vc, "zone1-200", "customer") - assert.NoError(t, err, "Customer table not deleted from zone1-200") - require.True(t, found) + catchup(t, customerTab1, workflow, "MoveTables") + catchup(t, customerTab2, workflow, "MoveTables") - insertQuery2 = "insert into customer(name, cid) values('tempCustomer8', 103)" //ID 103, hence due to reverse_bits in shard 80- + productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet + query := "select * from customer" + require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", query, query)) + insertQuery1 := "insert into customer(cid, name) values(1001, 'tempCustomer1')" + matchInsertQuery1 := "insert into customer(cid, `name`) values (:vtg1, :vtg2)" + require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1)) + vdiff(t, ksWorkflow) + switchReadsDryRun(t, allCellNames, ksWorkflow, dryRunResultsReadCustomerShard) + switchReads(t, allCellNames, ksWorkflow) + require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", query, query)) + switchWritesDryRun(t, ksWorkflow, dryRunResultsSwitchWritesCustomerShard) + switchWrites(t, ksWorkflow, false) + ksShards := []string{"product/0", "customer/-80", "customer/80-"} + printShardPositions(vc, ksShards) + insertQuery2 := "insert into customer(name, cid) values('tempCustomer2', 100)" + matchInsertQuery2 := "insert into customer(`name`, cid) values (:vtg1, :_cid0)" require.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", insertQuery2, matchInsertQuery2)) - insertQuery2 = "insert into customer(name, cid) values('tempCustomer10', 104)" //ID 105, hence due to reverse_bits in shard -80 - require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery2, matchInsertQuery2)) - insertQuery2 = "insert into customer(name, cid) values('tempCustomer9', 105)" //ID 104, hence due to reverse_bits in shard 80- + + insertQuery2 = "insert into customer(name, cid) values('tempCustomer3', 101)" //ID 101, hence due to reverse_bits in shard 80- require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2)) - execVtgateQuery(t, vtgateConn, "customer", "delete from customer where name like 'tempCustomer%'") - validateCountInTablet(t, customerTab1, "customer", "customer", 1) - validateCountInTablet(t, customerTab2, "customer", "customer", 2) - validateCount(t, vtgateConn, "customer", "customer.customer", 3) + insertQuery2 = "insert into customer(name, cid) values('tempCustomer4', 102)" //ID 102, hence due to reverse_bits in shard -80 + require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery2, matchInsertQuery2)) + reverseKsWorkflow := "product.p2c_reverse" + if testReverse { + //Reverse Replicate + switchReads(t, allCellNames, reverseKsWorkflow) + printShardPositions(vc, ksShards) + switchWrites(t, reverseKsWorkflow, false) + + insertQuery1 = "insert into customer(cid, name) values(1002, 'tempCustomer5')" + require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1)) + // both inserts go into 80-, this tests the edge-case where a stream (-80) has no relevant new events after the previous switch + insertQuery1 = "insert into customer(cid, name) values(1003, 'tempCustomer6')" + require.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery1, matchInsertQuery1)) + insertQuery1 = "insert into customer(cid, name) values(1004, 'tempCustomer7')" + require.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery1, matchInsertQuery1)) + + //Go forward again + switchReads(t, allCellNames, ksWorkflow) + switchWrites(t, ksWorkflow, false) + dropSourcesDryRun(t, ksWorkflow, false, dryRunResultsDropSourcesDropCustomerShard) + dropSourcesDryRun(t, ksWorkflow, true, dryRunResultsDropSourcesRenameCustomerShard) + + var exists bool + exists, err := checkIfBlacklistExists(t, vc, "product:0", "customer") + require.NoError(t, err, "Error getting blacklist for customer:0") + require.True(t, exists) + dropSources(t, ksWorkflow) + + exists, err = checkIfBlacklistExists(t, vc, "product:0", "customer") + require.NoError(t, err, "Error getting blacklist for customer:0") + require.False(t, exists) + + for _, shard := range strings.Split("-80,80-", ",") { + expectNumberOfStreams(t, vtgateConn, "shardCustomerTargetStreams", "p2c", "customer:"+shard, 0) + } - query = "insert into customer (name, cid) values('george', 5)" - execVtgateQuery(t, vtgateConn, "customer", query) - validateCountInTablet(t, customerTab1, "customer", "customer", 1) - validateCountInTablet(t, customerTab2, "customer", "customer", 3) - validateCount(t, vtgateConn, "customer", "customer.customer", 4) - } + expectNumberOfStreams(t, vtgateConn, "shardCustomerReverseStreams", "p2c_reverse", "product:0", 0) + + var found bool + found, err = checkIfTableExists(t, vc, "zone1-100", "customer") + assert.NoError(t, err, "Customer table not deleted from zone1-100") + require.False(t, found) + + found, err = checkIfTableExists(t, vc, "zone1-200", "customer") + assert.NoError(t, err, "Customer table not deleted from zone1-200") + require.True(t, found) + + insertQuery2 = "insert into customer(name, cid) values('tempCustomer8', 103)" //ID 103, hence due to reverse_bits in shard 80- + require.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", insertQuery2, matchInsertQuery2)) + insertQuery2 = "insert into customer(name, cid) values('tempCustomer10', 104)" //ID 105, hence due to reverse_bits in shard -80 + require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery2, matchInsertQuery2)) + insertQuery2 = "insert into customer(name, cid) values('tempCustomer9', 105)" //ID 104, hence due to reverse_bits in shard 80- + require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2)) + + execVtgateQuery(t, vtgateConn, "customer", "delete from customer where name like 'tempCustomer%'") + validateCountInTablet(t, customerTab1, "customer", "customer", 1) + validateCountInTablet(t, customerTab2, "customer", "customer", 2) + validateCount(t, vtgateConn, "customer", "customer.customer", 3) + + query = "insert into customer (name, cid) values('george', 5)" + execVtgateQuery(t, vtgateConn, "customer", query) + validateCountInTablet(t, customerTab1, "customer", "customer", 1) + validateCountInTablet(t, customerTab2, "customer", "customer", 3) + validateCount(t, vtgateConn, "customer", "customer.customer", 4) + } + }) } func validateRollupReplicates(t *testing.T) { - insertMoreProducts(t) - time.Sleep(1 * time.Second) - validateCount(t, vtgateConn, "product", "rollup", 1) - validateQuery(t, vtgateConn, "product:0", "select rollupname, kount from rollup", - `[[VARCHAR("total") INT32(5)]]`) + t.Run("validateRollupReplicates", func(t *testing.T) { + insertMoreProducts(t) + time.Sleep(1 * time.Second) + validateCount(t, vtgateConn, "product", "rollup", 1) + validateQuery(t, vtgateConn, "product:0", "select rollupname, kount from rollup", + `[[VARCHAR("total") INT32(5)]]`) + }) } func reshardCustomer2to4Split(t *testing.T, cells []*Cell, sourceCellOrAlias string) { - ksName := "customer" - counts := map[string]int{"zone1-600": 4, "zone1-700": 5, "zone1-800": 6, "zone1-900": 5} - reshard(t, ksName, "customer", "c2c4", "-80,80-", "-40,40-80,80-c0,c0-", 600, counts, nil, cells, sourceCellOrAlias) - validateCount(t, vtgateConn, ksName, "customer", 20) - query := "insert into customer (name) values('yoko')" - execVtgateQuery(t, vtgateConn, ksName, query) - validateCount(t, vtgateConn, ksName, "customer", 21) + t.Run("reshardCustomer2to4Split", func(t *testing.T) { + ksName := "customer" + counts := map[string]int{"zone1-600": 4, "zone1-700": 5, "zone1-800": 6, "zone1-900": 5} + reshard(t, ksName, "customer", "c2c4", "-80,80-", "-40,40-80,80-c0,c0-", 600, counts, nil, cells, sourceCellOrAlias) + validateCount(t, vtgateConn, ksName, "customer", 20) + query := "insert into customer (name) values('yoko')" + execVtgateQuery(t, vtgateConn, ksName, query) + validateCount(t, vtgateConn, ksName, "customer", 21) + }) } func reshardMerchant2to3SplitMerge(t *testing.T) { - ksName := "merchant" - counts := map[string]int{"zone1-1600": 0, "zone1-1700": 2, "zone1-1800": 0} - reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", 1600, counts, dryRunResultsSwitchWritesM2m3, nil, "") - validateCount(t, vtgateConn, ksName, "merchant", 2) - query := "insert into merchant (mname, category) values('amazon', 'electronics')" - execVtgateQuery(t, vtgateConn, ksName, query) - validateCount(t, vtgateConn, ksName, "merchant", 3) + t.Run("reshardMerchant2to3SplitMerge", func(t *testing.T) { + ksName := "merchant" + counts := map[string]int{"zone1-1600": 0, "zone1-1700": 2, "zone1-1800": 0} + reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", 1600, counts, dryRunResultsSwitchWritesM2m3, nil, "") + validateCount(t, vtgateConn, ksName, "merchant", 2) + query := "insert into merchant (mname, category) values('amazon', 'electronics')" + execVtgateQuery(t, vtgateConn, ksName, query) + validateCount(t, vtgateConn, ksName, "merchant", 3) + + var output string + var err error - var output string - var err error - - for _, shard := range strings.Split("-80,80-", ",") { - output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard) - if err == nil { - t.Fatal("GetShard merchant:-80 failed") + for _, shard := range strings.Split("-80,80-", ",") { + output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard) + if err == nil { + t.Fatal("GetShard merchant:-80 failed") + } + assert.Contains(t, output, "node doesn't exist", "GetShard succeeded for dropped shard merchant:"+shard) } - assert.Contains(t, output, "node doesn't exist", "GetShard succeeded for dropped shard merchant:"+shard) - } - for _, shard := range strings.Split("-40,40-c0,c0-", ",") { - output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard) - if err != nil { - t.Fatalf("GetShard merchant failed for: %s: %v", shard, err) + for _, shard := range strings.Split("-40,40-c0,c0-", ",") { + output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetShard", "merchant:"+shard) + if err != nil { + t.Fatalf("GetShard merchant failed for: %s: %v", shard, err) + } + assert.NotContains(t, output, "node doesn't exist", "GetShard failed for valid shard merchant:"+shard) + assert.Contains(t, output, "master_alias", "GetShard failed for valid shard merchant:"+shard) } - assert.NotContains(t, output, "node doesn't exist", "GetShard failed for valid shard merchant:"+shard) - assert.Contains(t, output, "master_alias", "GetShard failed for valid shard merchant:"+shard) - } - for _, shard := range strings.Split("-40,40-c0,c0-", ",") { - expectNumberOfStreams(t, vtgateConn, "reshardMerchant2to3SplitMerge", "m2m3", "merchant:"+shard, 0) - } - - var found bool - found, err = checkIfTableExists(t, vc, "zone1-1600", "customer") - assert.NoError(t, err, "Customer table found incorrectly in zone1-1600") - require.False(t, found) - found, err = checkIfTableExists(t, vc, "zone1-1600", "merchant") - assert.NoError(t, err, "Merchant table not found in zone1-1600") - require.True(t, found) + for _, shard := range strings.Split("-40,40-c0,c0-", ",") { + expectNumberOfStreams(t, vtgateConn, "reshardMerchant2to3SplitMerge", "m2m3", "merchant:"+shard, 0) + } + var found bool + found, err = checkIfTableExists(t, vc, "zone1-1600", "customer") + assert.NoError(t, err, "Customer table found incorrectly in zone1-1600") + require.False(t, found) + found, err = checkIfTableExists(t, vc, "zone1-1600", "merchant") + assert.NoError(t, err, "Merchant table not found in zone1-1600") + require.True(t, found) + }) } func reshardMerchant3to1Merge(t *testing.T) { - ksName := "merchant" - counts := map[string]int{"zone1-2000": 3} - reshard(t, ksName, "merchant", "m3m1", "-40,40-c0,c0-", "0", 2000, counts, nil, nil, "") - validateCount(t, vtgateConn, ksName, "merchant", 3) - query := "insert into merchant (mname, category) values('flipkart', 'electronics')" - execVtgateQuery(t, vtgateConn, ksName, query) - validateCount(t, vtgateConn, ksName, "merchant", 4) + t.Run("reshardMerchant3to1Merge", func(t *testing.T) { + ksName := "merchant" + counts := map[string]int{"zone1-2000": 3} + reshard(t, ksName, "merchant", "m3m1", "-40,40-c0,c0-", "0", 2000, counts, nil, nil, "") + validateCount(t, vtgateConn, ksName, "merchant", 3) + query := "insert into merchant (mname, category) values('flipkart', 'electronics')" + execVtgateQuery(t, vtgateConn, ksName, query) + validateCount(t, vtgateConn, ksName, "merchant", 4) + }) } func reshardCustomer3to2SplitMerge(t *testing.T) { //-40,40-80,80-c0 => merge/split, c0- stays the same ending up with 3 - ksName := "customer" - counts := map[string]int{"zone1-1000": 8, "zone1-1100": 8, "zone1-1200": 5} - reshard(t, ksName, "customer", "c4c3", "-40,40-80,80-c0", "-60,60-c0", 1000, counts, nil, nil, "") + t.Run("reshardCustomer3to2SplitMerge", func(t *testing.T) { + ksName := "customer" + counts := map[string]int{"zone1-1000": 8, "zone1-1100": 8, "zone1-1200": 5} + reshard(t, ksName, "customer", "c4c3", "-40,40-80,80-c0", "-60,60-c0", 1000, counts, nil, nil, "") + }) } func reshardCustomer3to1Merge(t *testing.T) { //to unsharded - ksName := "customer" - counts := map[string]int{"zone1-1500": 21} - reshard(t, ksName, "customer", "c3c1", "-60,60-c0,c0-", "0", 1500, counts, nil, nil, "") + t.Run("reshardCustomer3to1Merge", func(t *testing.T) { + ksName := "customer" + counts := map[string]int{"zone1-1500": 21} + reshard(t, ksName, "customer", "c3c1", "-60,60-c0,c0-", "0", 1500, counts, nil, nil, "") + }) } func reshard(t *testing.T, ksName string, tableName string, workflow string, sourceShards string, targetShards string, tabletIDBase int, counts map[string]int, dryRunResultSwitchWrites []string, cells []*Cell, sourceCellOrAlias string) { - if cells == nil { - cells = []*Cell{defaultCell} - } - if sourceCellOrAlias == "" { - sourceCellOrAlias = defaultCell.Name - } - ksWorkflow := ksName + "." + workflow - keyspace := vc.Cells[defaultCell.Name].Keyspaces[ksName] - require.NoError(t, vc.AddShards(t, cells, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase)) - arrTargetShardNames := strings.Split(targetShards, ",") - - for _, shardName := range arrTargetShardNames { - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", ksName, shardName), 1); err != nil { - t.Fatal(err) + t.Run("reshard", func(t *testing.T) { + if cells == nil { + cells = []*Cell{defaultCell} } - } - if err := vc.VtctlClient.ExecuteCommand("Reshard", "-cells="+sourceCellOrAlias, "-tablet_types=replica,master", ksWorkflow, sourceShards, targetShards); err != nil { - t.Fatalf("Reshard command failed with %+v\n", err) - } - tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "master") - targetShards = "," + targetShards + "," - for _, tab := range tablets { - if strings.Contains(targetShards, ","+tab.Shard+",") { - fmt.Printf("Waiting for vrepl to catch up on %s since it IS a target shard\n", tab.Shard) - catchup(t, tab, workflow, "Reshard") - } else { - fmt.Printf("Not waiting for vrepl to catch up on %s since it is NOT a target shard\n", tab.Shard) - continue + if sourceCellOrAlias == "" { + sourceCellOrAlias = defaultCell.Name } - } - vdiff(t, ksWorkflow) - switchReads(t, allCellNames, ksWorkflow) - if dryRunResultSwitchWrites != nil { - switchWritesDryRun(t, ksWorkflow, dryRunResultSwitchWrites) - } - switchWrites(t, ksWorkflow, false) - dropSources(t, ksWorkflow) + ksWorkflow := ksName + "." + workflow + keyspace := vc.Cells[defaultCell.Name].Keyspaces[ksName] + require.NoError(t, vc.AddShards(t, cells, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase)) + arrTargetShardNames := strings.Split(targetShards, ",") + + for _, shardName := range arrTargetShardNames { + if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", ksName, shardName), 1); err != nil { + t.Fatal(err) + } + } + if err := vc.VtctlClient.ExecuteCommand("Reshard", "-cells="+sourceCellOrAlias, "-tablet_types=replica,master", ksWorkflow, sourceShards, targetShards); err != nil { + t.Fatalf("Reshard command failed with %+v\n", err) + } + tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "master") + targetShards = "," + targetShards + "," + for _, tab := range tablets { + if strings.Contains(targetShards, ","+tab.Shard+",") { + fmt.Printf("Waiting for vrepl to catch up on %s since it IS a target shard\n", tab.Shard) + catchup(t, tab, workflow, "Reshard") + } else { + fmt.Printf("Not waiting for vrepl to catch up on %s since it is NOT a target shard\n", tab.Shard) + continue + } + } + vdiff(t, ksWorkflow) + switchReads(t, allCellNames, ksWorkflow) + if dryRunResultSwitchWrites != nil { + switchWritesDryRun(t, ksWorkflow, dryRunResultSwitchWrites) + } + switchWrites(t, ksWorkflow, false) + dropSources(t, ksWorkflow) - for tabletName, count := range counts { - if tablets[tabletName] == nil { - continue + for tabletName, count := range counts { + if tablets[tabletName] == nil { + continue + } + validateCountInTablet(t, tablets[tabletName], ksName, tableName, count) } - validateCountInTablet(t, tablets[tabletName], ksName, tableName, count) - } + }) } func shardOrders(t *testing.T) { - workflow := "o2c" - cell := defaultCell.Name - sourceKs := "product" - targetKs := "customer" - tables := "orders" - ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) - applyVSchema(t, ordersVSchema, targetKs) - moveTables(t, cell, workflow, sourceKs, targetKs, tables) - - custKs := vc.Cells[defaultCell.Name].Keyspaces["customer"] - customerTab1 := custKs.Shards["-80"].Tablets["zone1-200"].Vttablet - customerTab2 := custKs.Shards["80-"].Tablets["zone1-300"].Vttablet - catchup(t, customerTab1, workflow, "MoveTables") - catchup(t, customerTab2, workflow, "MoveTables") - vdiff(t, ksWorkflow) - switchReads(t, allCellNames, ksWorkflow) - switchWrites(t, ksWorkflow, false) - dropSources(t, ksWorkflow) - validateCountInTablet(t, customerTab1, "customer", "orders", 1) - validateCountInTablet(t, customerTab2, "customer", "orders", 2) - validateCount(t, vtgateConn, "customer", "orders", 3) + t.Run("shardOrders", func(t *testing.T) { + workflow := "o2c" + cell := defaultCell.Name + sourceKs := "product" + targetKs := "customer" + tables := "orders" + ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) + applyVSchema(t, ordersVSchema, targetKs) + moveTables(t, cell, workflow, sourceKs, targetKs, tables) + + custKs := vc.Cells[defaultCell.Name].Keyspaces["customer"] + customerTab1 := custKs.Shards["-80"].Tablets["zone1-200"].Vttablet + customerTab2 := custKs.Shards["80-"].Tablets["zone1-300"].Vttablet + catchup(t, customerTab1, workflow, "MoveTables") + catchup(t, customerTab2, workflow, "MoveTables") + vdiff(t, ksWorkflow) + switchReads(t, allCellNames, ksWorkflow) + switchWrites(t, ksWorkflow, false) + dropSources(t, ksWorkflow) + validateCountInTablet(t, customerTab1, "customer", "orders", 1) + validateCountInTablet(t, customerTab2, "customer", "orders", 2) + validateCount(t, vtgateConn, "customer", "orders", 3) + }) } func shardMerchant(t *testing.T) { - workflow := "p2m" - cell := defaultCell.Name - sourceKs := "product" - targetKs := "merchant" - tables := "merchant" - ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) - if _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "merchant", "-80,80-", merchantVSchema, "", defaultReplicas, defaultRdonly, 400); err != nil { - t.Fatal(err) - } - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "merchant", "-80"), 1); err != nil { - t.Fatal(err) - } - if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "merchant", "80-"), 1); err != nil { - t.Fatal(err) - } - moveTables(t, cell, workflow, sourceKs, targetKs, tables) - merchantKs := vc.Cells[defaultCell.Name].Keyspaces["merchant"] - merchantTab1 := merchantKs.Shards["-80"].Tablets["zone1-400"].Vttablet - merchantTab2 := merchantKs.Shards["80-"].Tablets["zone1-500"].Vttablet - catchup(t, merchantTab1, workflow, "MoveTables") - catchup(t, merchantTab2, workflow, "MoveTables") - - vdiff(t, "merchant.p2m") - switchReads(t, allCellNames, ksWorkflow) - switchWrites(t, ksWorkflow, false) - dropSources(t, ksWorkflow) - - validateCountInTablet(t, merchantTab1, "merchant", "merchant", 1) - validateCountInTablet(t, merchantTab2, "merchant", "merchant", 1) - validateCount(t, vtgateConn, "merchant", "merchant", 2) + t.Run("shardMerchant", func(t *testing.T) { + workflow := "p2m" + cell := defaultCell.Name + sourceKs := "product" + targetKs := "merchant" + tables := "merchant" + ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) + if _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "merchant", "-80,80-", merchantVSchema, "", defaultReplicas, defaultRdonly, 400); err != nil { + t.Fatal(err) + } + if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "merchant", "-80"), 1); err != nil { + t.Fatal(err) + } + if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "merchant", "80-"), 1); err != nil { + t.Fatal(err) + } + moveTables(t, cell, workflow, sourceKs, targetKs, tables) + merchantKs := vc.Cells[defaultCell.Name].Keyspaces["merchant"] + merchantTab1 := merchantKs.Shards["-80"].Tablets["zone1-400"].Vttablet + merchantTab2 := merchantKs.Shards["80-"].Tablets["zone1-500"].Vttablet + catchup(t, merchantTab1, workflow, "MoveTables") + catchup(t, merchantTab2, workflow, "MoveTables") + + vdiff(t, "merchant.p2m") + switchReads(t, allCellNames, ksWorkflow) + switchWrites(t, ksWorkflow, false) + dropSources(t, ksWorkflow) + validateCountInTablet(t, merchantTab1, "merchant", "merchant", 1) + validateCountInTablet(t, merchantTab2, "merchant", "merchant", 1) + validateCount(t, vtgateConn, "merchant", "merchant", 2) + }) } func vdiff(t *testing.T, workflow string) { - output, err := vc.VtctlClient.ExecuteCommandWithOutput("VDiff", "-format", "json", workflow) - fmt.Printf("vdiff err: %+v, output: %+v\n", err, output) - require.Nil(t, err) - require.NotNil(t, output) - diffReports := make([]*wrangler.DiffReport, 0) - err = json.Unmarshal([]byte(output), &diffReports) - require.Nil(t, err) - if len(diffReports) < 1 { - t.Fatal("VDiff did not return a valid json response " + output + "\n") - } - require.True(t, len(diffReports) > 0) - for key, diffReport := range diffReports { - if diffReport.ProcessedRows != diffReport.MatchingRows { - t.Errorf("vdiff error for %d : %#v\n", key, diffReport) + t.Run("vdiff", func(t *testing.T) { + output, err := vc.VtctlClient.ExecuteCommandWithOutput("VDiff", "-format", "json", workflow) + fmt.Printf("vdiff err: %+v, output: %+v\n", err, output) + require.Nil(t, err) + require.NotNil(t, output) + diffReports := make([]*wrangler.DiffReport, 0) + err = json.Unmarshal([]byte(output), &diffReports) + require.Nil(t, err) + if len(diffReports) < 1 { + t.Fatal("VDiff did not return a valid json response " + output + "\n") } - } + require.True(t, len(diffReports) > 0) + for key, diffReport := range diffReports { + if diffReport.ProcessedRows != diffReport.MatchingRows { + t.Errorf("vdiff error for %d : %#v\n", key, diffReport) + } + } + }) } func materialize(t *testing.T, spec string) { - err := vc.VtctlClient.ExecuteCommand("Materialize", spec) - require.NoError(t, err, "Materialize") + t.Run("materialize", func(t *testing.T) { + err := vc.VtctlClient.ExecuteCommand("Materialize", spec) + require.NoError(t, err, "Materialize") + }) } func materializeProduct(t *testing.T) { - workflow := "cproduct" - keyspace := "customer" - applyVSchema(t, materializeProductVSchema, keyspace) - materialize(t, materializeProductSpec) - customerTablets := vc.getVttabletsInKeyspace(t, defaultCell, keyspace, "master") - for _, tab := range customerTablets { - catchup(t, tab, workflow, "Materialize") - } - for _, tab := range customerTablets { - validateCountInTablet(t, tab, keyspace, workflow, 5) - } + t.Run("materializeProduct", func(t *testing.T) { + // materializing from "product" keyspace to "customer" keyspace + workflow := "cproduct" + keyspace := "customer" + applyVSchema(t, materializeProductVSchema, keyspace) + materialize(t, materializeProductSpec) + customerTablets := vc.getVttabletsInKeyspace(t, defaultCell, keyspace, "master") + { + for _, tab := range customerTablets { + catchup(t, tab, workflow, "Materialize") + } + for _, tab := range customerTablets { + validateCountInTablet(t, tab, keyspace, workflow, 5) + } + } + + productTablets := vc.getVttabletsInKeyspace(t, defaultCell, "product", "master") + t.Run("throttle-app", func(t *testing.T) { + // Now, throttle the streamer on source tablets, insert some rows + for _, tab := range productTablets { + _, body, err := throttleStreamer(tab) + assert.NoError(t, err) + assert.Contains(t, body, throttlerAppName) + } + // Wait for throttling to take effect (caching will expire by this time): + time.Sleep(1 * time.Second) + for _, tab := range productTablets { + _, body, err := throttlerCheckSelf(tab) + assert.NoError(t, err) + assert.Contains(t, body, "417") + } + insertMoreProductsForThrottler(t) + // To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place, + time.Sleep(1 * time.Second) + // we expect the additional rows to **not appear** in the materialized view + for _, tab := range customerTablets { + validateCountInTablet(t, tab, keyspace, workflow, 5) + } + }) + t.Run("unthrottle-app", func(t *testing.T) { + // unthrottle on source tablets, and expect the rows to show up + for _, tab := range productTablets { + _, body, err := unthrottleStreamer(tab) + assert.NoError(t, err) + assert.Contains(t, body, throttlerAppName) + } + // give time for unthrottling to take effect and for target to fetch data + time.Sleep(3 * time.Second) + for _, tab := range customerTablets { + validateCountInTablet(t, tab, keyspace, workflow, 8) + } + }) + }) } func materializeRollup(t *testing.T) { - keyspace := "product" - workflow := "rollup" - applyVSchema(t, materializeSalesVSchema, keyspace) - productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet - materialize(t, materializeRollupSpec) - catchup(t, productTab, workflow, "Materialize") - validateCount(t, vtgateConn, "product", "rollup", 1) - validateQuery(t, vtgateConn, "product:0", "select rollupname, kount from rollup", - `[[VARCHAR("total") INT32(2)]]`) + t.Run("materializeRollup", func(t *testing.T) { + keyspace := "product" + workflow := "rollup" + applyVSchema(t, materializeSalesVSchema, keyspace) + productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet + materialize(t, materializeRollupSpec) + catchup(t, productTab, workflow, "Materialize") + validateCount(t, vtgateConn, "product", "rollup", 1) + validateQuery(t, vtgateConn, "product:0", "select rollupname, kount from rollup", + `[[VARCHAR("total") INT32(2)]]`) + }) } func materializeSales(t *testing.T) { - keyspace := "product" - applyVSchema(t, materializeSalesVSchema, keyspace) - materialize(t, materializeSalesSpec) - productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet - catchup(t, productTab, "sales", "Materialize") - validateCount(t, vtgateConn, "product", "sales", 2) - validateQuery(t, vtgateConn, "product:0", "select kount, amount from sales", - `[[INT32(1) INT32(10)] [INT32(2) INT32(35)]]`) + t.Run("materializeSales", func(t *testing.T) { + keyspace := "product" + applyVSchema(t, materializeSalesVSchema, keyspace) + materialize(t, materializeSalesSpec) + productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet + catchup(t, productTab, "sales", "Materialize") + validateCount(t, vtgateConn, "product", "sales", 2) + validateQuery(t, vtgateConn, "product:0", "select kount, amount from sales", + `[[INT32(1) INT32(10)] [INT32(2) INT32(35)]]`) + }) } func materializeMerchantSales(t *testing.T) { - workflow := "msales" - materialize(t, materializeMerchantSalesSpec) - merchantTablets := vc.getVttabletsInKeyspace(t, defaultCell, "merchant", "master") - for _, tab := range merchantTablets { - catchup(t, tab, workflow, "Materialize") - } - validateCountInTablet(t, merchantTablets["zone1-400"], "merchant", "msales", 1) - validateCountInTablet(t, merchantTablets["zone1-500"], "merchant", "msales", 1) - validateCount(t, vtgateConn, "merchant", "msales", 2) + t.Run("materializeMerchantSales", func(t *testing.T) { + workflow := "msales" + materialize(t, materializeMerchantSalesSpec) + merchantTablets := vc.getVttabletsInKeyspace(t, defaultCell, "merchant", "master") + for _, tab := range merchantTablets { + catchup(t, tab, workflow, "Materialize") + } + validateCountInTablet(t, merchantTablets["zone1-400"], "merchant", "msales", 1) + validateCountInTablet(t, merchantTablets["zone1-500"], "merchant", "msales", 1) + validateCount(t, vtgateConn, "merchant", "msales", 2) + }) } func materializeMerchantOrders(t *testing.T) { - workflow := "morders" - keyspace := "merchant" - applyVSchema(t, merchantOrdersVSchema, keyspace) - materialize(t, materializeMerchantOrdersSpec) - merchantTablets := vc.getVttabletsInKeyspace(t, defaultCell, "merchant", "master") - for _, tab := range merchantTablets { - catchup(t, tab, workflow, "Materialize") - } - validateCountInTablet(t, merchantTablets["zone1-400"], "merchant", "morders", 2) - validateCountInTablet(t, merchantTablets["zone1-500"], "merchant", "morders", 1) - validateCount(t, vtgateConn, "merchant", "morders", 3) + t.Run("materializeMerchantOrders", func(t *testing.T) { + workflow := "morders" + keyspace := "merchant" + applyVSchema(t, merchantOrdersVSchema, keyspace) + materialize(t, materializeMerchantOrdersSpec) + merchantTablets := vc.getVttabletsInKeyspace(t, defaultCell, "merchant", "master") + for _, tab := range merchantTablets { + catchup(t, tab, workflow, "Materialize") + } + validateCountInTablet(t, merchantTablets["zone1-400"], "merchant", "morders", 2) + validateCountInTablet(t, merchantTablets["zone1-500"], "merchant", "morders", 1) + validateCount(t, vtgateConn, "merchant", "morders", 3) + }) } func checkVtgateHealth(t *testing.T, cell *Cell) { diff --git a/go/vt/vttablet/endtoend/vstreamer_test.go b/go/vt/vttablet/endtoend/vstreamer_test.go index 48c685a9119..de21af94217 100644 --- a/go/vt/vttablet/endtoend/vstreamer_test.go +++ b/go/vt/vttablet/endtoend/vstreamer_test.go @@ -59,6 +59,8 @@ func TestSchemaVersioning(t *testing.T) { tsv.EnableHistorian(false) tsv.SetTracking(false) tsv.EnableHeartbeat(false) + tsv.EnableThrottler(false) + defer tsv.EnableThrottler(true) defer tsv.EnableHeartbeat(true) defer tsv.EnableHistorian(true) defer tsv.SetTracking(true) diff --git a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go index 8d3706fdbe4..20e187730ff 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go @@ -89,7 +89,7 @@ func (ec *externalConnector) Get(name string) (*mysqlConnector, error) { c := &mysqlConnector{} c.env = tabletenv.NewEnv(config, name) c.se = schema.NewEngine(c.env) - c.vstreamer = vstreamer.NewEngine(c.env, nil, c.se, "") + c.vstreamer = vstreamer.NewEngine(c.env, nil, c.se, nil, "") c.vstreamer.InitDBConfig("") c.se.InitDBConfig(c.env.Config().DB.AllPrivsWithDB()) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index d66f3a4c13d..5efbfbfc413 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -99,7 +99,7 @@ func TestMain(m *testing.M) { // engines cannot be initialized in testenv because it introduces // circular dependencies. - streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, env.Cells[0]) + streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) streamerEngine.InitDBConfig(env.KeyspaceName) streamerEngine.Open() defer streamerEngine.Close() diff --git a/go/vt/vttablet/tabletmanager/vreplication/relaylog.go b/go/vt/vttablet/tabletmanager/vreplication/relaylog.go index 8b886bc594a..c2eb9c4af83 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/relaylog.go +++ b/go/vt/vttablet/tabletmanager/vreplication/relaylog.go @@ -64,6 +64,7 @@ func newRelayLog(ctx context.Context, maxItems, maxSize int) *relayLog { return rl } +// Send writes events to the relay log func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error { rl.mu.Lock() defer rl.mu.Unlock() @@ -83,6 +84,7 @@ func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error { return nil } +// Fetch returns all existing items in the relay log, and empties the log func (rl *relayLog) Fetch() ([][]*binlogdatapb.VEvent, error) { rl.mu.Lock() defer rl.mu.Unlock() diff --git a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go index 550e75deb7f..2963dbb2f7f 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go @@ -51,7 +51,7 @@ func NewReplicaConnector(connParams *mysql.ConnParams) *replicaConnector { env := tabletenv.NewEnv(config, "source") c.se = schema.NewEngine(env) c.se.SkipMetaCheck = true - c.vstreamer = vstreamer.NewEngine(env, nil, c.se, "") + c.vstreamer = vstreamer.NewEngine(env, nil, c.se, nil, "") c.se.InitDBConfig(dbconfigs.New(connParams)) // Open diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 35d065f5d21..0a201d5fccb 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -188,6 +188,7 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) { } } +// applyStmtEvent applies an actual DML statement received from the source, directly onto the backend database func (vp *vplayer) applyStmtEvent(ctx context.Context, event *binlogdatapb.VEvent) error { sql := event.Statement if sql == "" { diff --git a/go/vt/vttablet/tabletserver/gc/tablegc.go b/go/vt/vttablet/tabletserver/gc/tablegc.go index 74402c2886a..36716ae4cb0 100644 --- a/go/vt/vttablet/tabletserver/gc/tablegc.go +++ b/go/vt/vttablet/tabletserver/gc/tablegc.go @@ -408,6 +408,21 @@ func (collector *TableGC) checkTables(ctx context.Context) error { return nil } +func (collector *TableGC) throttleStatusOK(ctx context.Context) bool { + if time.Since(collector.lastSuccessfulThrottleCheck) <= throttleCheckDuration { + // if last check was OK just very recently there is no need to check again + return true + } + // It's time to run a throttler check + checkResult := collector.lagThrottler.Check(ctx, throttlerAppName, "", throttleFlags) + if checkResult.StatusCode != http.StatusOK { + // sorry, we got throttled. + return false + } + collector.lastSuccessfulThrottleCheck = time.Now() + return true +} + // purge continuously purges rows from a table. // This function is non-reentrant: there's only one instance of this function running at any given time. // A timer keeps calling this function, so if it bails out (e.g. on error) it will later resume work @@ -451,15 +466,9 @@ func (collector *TableGC) purge(ctx context.Context) (tableName string, err erro log.Infof("TableGC: purge begin for %s", tableName) for { - if time.Since(collector.lastSuccessfulThrottleCheck) > throttleCheckDuration { - // It's time to run a throttler check - checkResult := collector.lagThrottler.Check(ctx, throttlerAppName, "", throttleFlags) - if checkResult.StatusCode != http.StatusOK { - // sorry, we got throttled. Back off, sleep, try again - time.Sleep(throttleCheckDuration) - continue - } - collector.lastSuccessfulThrottleCheck = time.Now() + for !collector.throttleStatusOK(ctx) { + // Sorry, got throttled. Sleep some time, then check again + time.Sleep(throttleCheckDuration) } // OK, we're clear to go! diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index be1d56b79d5..2bc200d53de 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -157,13 +157,21 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to tsOnce.Do(func() { srvTopoServer = srvtopo.NewResilientServer(topoServer, "TabletSrvTopo") }) + tabletTypeFunc := func() topodatapb.TabletType { + if tsv.sm == nil { + return topodatapb.TabletType_UNKNOWN + } + return tsv.sm.Target().TabletType + } + tsv.statelessql = NewQueryList("oltp-stateless") tsv.statefulql = NewQueryList("oltp-stateful") tsv.olapql = NewQueryList("olap") + tsv.lagThrottler = throttle.NewThrottler(tsv, topoServer, tabletTypeFunc) tsv.hs = newHealthStreamer(tsv, alias) tsv.se = schema.NewEngine(tsv) tsv.rt = repltracker.NewReplTracker(tsv, alias) - tsv.vstreamer = vstreamer.NewEngine(tsv, srvTopoServer, tsv.se, alias.Cell) + tsv.vstreamer = vstreamer.NewEngine(tsv, srvTopoServer, tsv.se, tsv.lagThrottler, alias.Cell) tsv.tracker = schema.NewTracker(tsv, tsv.vstreamer, tsv.se) tsv.watcher = NewBinlogWatcher(tsv, tsv.vstreamer, tsv.config) tsv.qe = NewQueryEngine(tsv, tsv.se) @@ -171,14 +179,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to tsv.te = NewTxEngine(tsv) tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer) - tabletTypeFunc := func() topodatapb.TabletType { - if tsv.sm == nil { - return topodatapb.TabletType_UNKNOWN - } - return tsv.sm.Target().TabletType - } tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tabletTypeFunc) - tsv.lagThrottler = throttle.NewThrottler(tsv, topoServer, tabletTypeFunc) tsv.tableGC = gc.NewTableGC(tsv, topoServer, tabletTypeFunc, tsv.lagThrottler) tsv.sm = &stateManager{ @@ -1621,10 +1622,34 @@ func (tsv *TabletServer) registerThrottlerStatusHandler() { }) } +// registerThrottlerThrottleAppHandler registers a throttler "throttle-app" request +func (tsv *TabletServer) registerThrottlerThrottleAppHandler() { + tsv.exporter.HandleFunc("/throttler/throttle-app", func(w http.ResponseWriter, r *http.Request) { + appName := r.URL.Query().Get("app") + d, err := time.ParseDuration(r.URL.Query().Get("duration")) + if err != nil { + http.Error(w, fmt.Sprintf("not ok: %v", err), http.StatusInternalServerError) + return + } + appThrottle := tsv.lagThrottler.ThrottleApp(appName, time.Now().Add(d), 1) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(appThrottle) + }) + tsv.exporter.HandleFunc("/throttler/unthrottle-app", func(w http.ResponseWriter, r *http.Request) { + appName := r.URL.Query().Get("app") + appThrottle := tsv.lagThrottler.UnthrottleApp(appName) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(appThrottle) + }) +} + // registerThrottlerHandlers registers all throttler handlers func (tsv *TabletServer) registerThrottlerHandlers() { tsv.registerThrottlerCheckHandlers() tsv.registerThrottlerStatusHandler() + tsv.registerThrottlerThrottleAppHandler() } func (tsv *TabletServer) registerDebugEnvHandler() { @@ -1639,6 +1664,13 @@ func (tsv *TabletServer) EnableHeartbeat(enabled bool) { tsv.rt.EnableHeartbeat(enabled) } +// EnableThrottler forces throttler to be on or off. +// When throttler is off, it responds to all check requests with HTTP 200 OK +// Only to be used for testing. +func (tsv *TabletServer) EnableThrottler(enabled bool) { + tsv.Config().EnableLagThrottler = enabled +} + // SetTracking forces tracking to be on or off. // Only to be used for testing. func (tsv *TabletServer) SetTracking(enabled bool) { diff --git a/go/vt/vttablet/tabletserver/throttle/base/app_throttle.go b/go/vt/vttablet/tabletserver/throttle/base/app_throttle.go index 46101ba87e6..0633e99c95c 100644 --- a/go/vt/vttablet/tabletserver/throttle/base/app_throttle.go +++ b/go/vt/vttablet/tabletserver/throttle/base/app_throttle.go @@ -23,13 +23,15 @@ import ( // AppThrottle is the definition for an app throttling instruction // - Ratio: [0..1], 0 == no throttle, 1 == fully throttle type AppThrottle struct { + AppName string ExpireAt time.Time Ratio float64 } // NewAppThrottle creates an AppThrottle struct -func NewAppThrottle(expireAt time.Time, ratio float64) *AppThrottle { +func NewAppThrottle(appName string, expireAt time.Time, ratio float64) *AppThrottle { result := &AppThrottle{ + AppName: appName, ExpireAt: expireAt, Ratio: ratio, } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 6bf22c33cf1..7e5a8d3fcdb 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -645,11 +645,10 @@ func (throttler *Throttler) expireThrottledApps() { } // ThrottleApp instructs the throttler to begin throttling an app, to som eperiod and with some ratio. -func (throttler *Throttler) ThrottleApp(appName string, expireAt time.Time, ratio float64) { +func (throttler *Throttler) ThrottleApp(appName string, expireAt time.Time, ratio float64) (appThrottle *base.AppThrottle) { throttler.throttledAppsMutex.Lock() defer throttler.throttledAppsMutex.Unlock() - var appThrottle *base.AppThrottle now := time.Now() if object, found := throttler.throttledApps.Get(appName); found { appThrottle = object.(*base.AppThrottle) @@ -666,18 +665,20 @@ func (throttler *Throttler) ThrottleApp(appName string, expireAt time.Time, rati if ratio < 0 { ratio = defaultThrottleRatio } - appThrottle = base.NewAppThrottle(expireAt, ratio) + appThrottle = base.NewAppThrottle(appName, expireAt, ratio) } if now.Before(appThrottle.ExpireAt) { throttler.throttledApps.Set(appName, appThrottle, cache.DefaultExpiration) } else { throttler.UnthrottleApp(appName) } + return appThrottle } // UnthrottleApp cancels any throttling, if any, for a given app -func (throttler *Throttler) UnthrottleApp(appName string) { +func (throttler *Throttler) UnthrottleApp(appName string) (appThrottle *base.AppThrottle) { throttler.throttledApps.Delete(appName) + return base.NewAppThrottle(appName, time.Now(), 0) } // IsAppThrottled tells whether some app should be throttled. diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 5ddc4ecaeaf..32f1ce6f64d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -23,6 +23,7 @@ import ( "errors" "net/http" "sync" + "time" "vitess.io/vitess/go/vt/servenv" @@ -35,11 +36,23 @@ import ( "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) +const ( + throttleCheckDuration = 250 * time.Millisecond + throttlerAppName = "vstreamer" +) + +var ( + throttleFlags = &throttle.CheckFlags{ + LowPriority: true, + } +) + // Engine is the engine for handling vreplication streaming requests. type Engine struct { env tabletenv.Env @@ -84,17 +97,21 @@ type Engine struct { errorCounts *stats.CountersWithSingleLabel vstreamersCreated *stats.Counter vstreamersEndedWithErrors *stats.Counter + + lagThrottler *throttle.Throttler + lastSuccessfulThrottleCheck time.Time } // NewEngine creates a new Engine. // Initialization sequence is: NewEngine->InitDBConfig->Open. // Open and Close can be called multiple times and are idempotent. -func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, cell string) *Engine { +func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, lagThrottler *throttle.Throttler, cell string) *Engine { vse := &Engine{ - env: env, - ts: ts, - se: se, - cell: cell, + env: env, + ts: ts, + se: se, + cell: cell, + lagThrottler: lagThrottler, streamers: make(map[int]*uvstreamer), rowStreamers: make(map[int]*rowStreamer), @@ -177,7 +194,39 @@ func (vse *Engine) vschema() *vindexes.VSchema { return vse.lvschema.vschema } +func (vse *Engine) throttleStatusOK(ctx context.Context, sleep bool) bool { + if vse.lagThrottler == nil { + // no throttler + return true + } + if time.Since(vse.lastSuccessfulThrottleCheck) <= throttleCheckDuration { + // if last check was OK just very recently there is no need to check again + return true + } + // It's time to run a throttler check + checkResult := vse.lagThrottler.CheckSelf(ctx, throttlerAppName, "", throttleFlags) + if checkResult.StatusCode != http.StatusOK { + // sorry, we got throttled. + if sleep { + time.Sleep(throttleCheckDuration) + } + return false + } + vse.lastSuccessfulThrottleCheck = time.Now() + return true +} + +// throttle will wait until the throttler's "check-self" check is satisfied +func (vse *Engine) throttle(ctx context.Context) { + // We introduce throttling based on the tablet's "self" check, which means if the tablet itself is lagging, + // we hold off reads so as to ease the load and let it regain its health + for !vse.throttleStatusOK(ctx, true) { + // Sorry, got throttled. Sleep some time, then check again + } +} + // Stream starts a new stream. +// This streams events from the binary logs func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { // Ensure vschema is initialized and the watcher is started. // Starting of the watcher has to be delayed till the first call to Stream @@ -217,6 +266,7 @@ func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binl } // StreamRows streams rows. +// This streams the table data rows (so we can copy the table data snapshot) func (vse *Engine) StreamRows(ctx context.Context, query string, lastpk []sqltypes.Value, send func(*binlogdatapb.VStreamRowsResponse) error) error { // Ensure vschema is initialized and the watcher is started. // Starting of the watcher has to be delayed till the first call to Stream @@ -231,6 +281,7 @@ func (vse *Engine) StreamRows(ctx context.Context, query string, lastpk []sqltyp if !vse.isOpen { return nil, 0, errors.New("VStreamer is not open") } + rowStreamer := newRowStreamer(ctx, vse.env.Config().DB.AppWithDB(), vse.se, query, lastpk, vse.lvschema, send, vse) idx := vse.streamIdx vse.rowStreamers[idx] = rowStreamer diff --git a/go/vt/vttablet/tabletserver/vstreamer/main_test.go b/go/vt/vttablet/tabletserver/vstreamer/main_test.go index 6eb68dd5221..495c06a5d7b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/main_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/main_test.go @@ -53,7 +53,7 @@ func TestMain(m *testing.M) { // engine cannot be initialized in testenv because it introduces // circular dependencies - engine = NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, env.Cells[0]) + engine = NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) engine.InitDBConfig(env.KeyspaceName) engine.Open() defer engine.Close() @@ -70,7 +70,7 @@ func customEngine(t *testing.T, modifier func(mysql.ConnParams) mysql.ConnParams config := env.TabletEnv.Config().Clone() config.DB = dbconfigs.NewTestDBConfigs(modified, modified, modified.DbName) - engine := NewEngine(tabletenv.NewEnv(config, "VStreamerTest"), env.SrvTopo, env.SchemaEngine, env.Cells[0]) + engine := NewEngine(tabletenv.NewEnv(config, "VStreamerTest"), env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) engine.InitDBConfig(env.KeyspaceName) engine.Open() return engine diff --git a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go index f1a4bdae192..d47539e759d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go @@ -97,6 +97,11 @@ func (rs *resultStreamer) Stream() error { default: } + // check throttler. If required throttling, sleep ("true" argument) and retry loop + if !rs.vse.throttleStatusOK(rs.ctx, true) { + continue + } + row, err := conn.FetchNext() if err != nil { return err diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 4e8d921b823..8245b174c53 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -244,6 +244,11 @@ func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.V default: } + // check throttler. If required throttling, sleep ("true" argument) and retry loop + if !rs.vse.throttleStatusOK(rs.ctx, true) { + continue + } + row, err := conn.FetchNext() if err != nil { return err diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 9d0d352e1ea..d2d4ecce43c 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -177,6 +177,7 @@ func (vs *vstreamer) replicate(ctx context.Context) error { return wrapError(err, vs.pos, vs.vse) } defer conn.Close() + events, err := conn.StartBinlogDumpFromPosition(vs.ctx, vs.pos) if err != nil { return wrapError(err, vs.pos, vs.vse) @@ -268,6 +269,27 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog // Main loop: calls bufferAndTransmit as events arrive. timer := time.NewTimer(HeartbeatTime) defer timer.Stop() + + // throttledEvents can be read just like you would read from events + // throttledEvents pulls data from events, but throttles pulling data, + // which in turn blocks the BinlogConnection from pushing events to the channel + throttledEvents := make(chan mysql.BinlogEvent) + go func() { + for { + // check throttler. If required throttling, sleep ("true" argument) and retry loop + if !vs.vse.throttleStatusOK(ctx, true) { + continue + } + + ev, ok := <-events + if ok { + throttledEvents <- ev + } else { + close(throttledEvents) + return + } + } + }() for { timer.Reset(HeartbeatTime) // Drain event if timer fired before reset. @@ -277,7 +299,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } select { - case ev, ok := <-events: + case ev, ok := <-throttledEvents: if !ok { select { case <-ctx.Done(): diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 4851b685b2f..97b561c64fc 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -251,7 +251,7 @@ func TestVersion(t *testing.T) { require.NoError(t, err) defer env.SchemaEngine.EnableHistorian(false) - engine = NewEngine(engine.env, env.SrvTopo, env.SchemaEngine, env.Cells[0]) + engine = NewEngine(engine.env, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) engine.InitDBConfig(env.KeyspaceName) engine.Open() defer engine.Close() diff --git a/test.go b/test.go index bf355e39a72..35c22b171b6 100755 --- a/test.go +++ b/test.go @@ -83,7 +83,7 @@ var ( pull = flag.Bool("pull", true, "re-pull the bootstrap image, in case it's been updated") docker = flag.Bool("docker", true, "run tests with Docker") useDockerCache = flag.Bool("use_docker_cache", false, "if true, create a temporary Docker image to cache the source code and the binaries generated by 'make build'. Used for execution on Travis CI.") - shard = flag.Int("shard", -1, "if N>=0, run the tests whose Shard field matches N") + shard = flag.String("shard", "", "if non-empty, run the tests whose Shard field matches value") tag = flag.String("tag", "", "if provided, only run tests with the given tag. Can't be combined with -shard or explicit test list") exclude = flag.String("exclude", "", "if provided, exclude tests containing any of the given tags (comma delimited)") keepData = flag.Bool("keep-data", false, "don't delete the per-test VTDATAROOT subfolders") @@ -123,7 +123,7 @@ type Test struct { Manual bool // Shard is used to split tests among workers. - Shard int + Shard string // RetryMax is the maximum number of times a test will be retried. // If 0, flag *retryMax is used. @@ -705,7 +705,7 @@ func getTestsSorted(names []string, testMap map[string]*Test) []*Test { func selectedTests(args []string, config *Config) []*Test { var tests []*Test excludedTests := strings.Split(*exclude, ",") - if *shard >= 0 { + if *shard != "" { // Run the tests in a given shard. // This can be combined with positional args. var names []string @@ -738,7 +738,7 @@ func selectedTests(args []string, config *Config) []*Test { tests = append(tests, t) } } - if len(args) == 0 && *shard < 0 { + if len(args) == 0 && *shard == "" { // Run all tests. var names []string for name, t := range config.Tests { diff --git a/test/ci_workflow_gen.go b/test/ci_workflow_gen.go index c32a51529b2..15adba877f9 100644 --- a/test/ci_workflow_gen.go +++ b/test/ci_workflow_gen.go @@ -32,7 +32,7 @@ const ( unitTestDatabases = "percona56, mysql57, mysql80, mariadb101, mariadb102, mariadb103" clusterTestTemplate = "templates/cluster_endtoend_test.tpl" - clusterList = "11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 26, 27" + clusterList = "11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 26, 27, vreplication_basic" clustersRequiringMakeTools = "18,24" // TODO: currently some percona tools including xtrabackup are installed on all clusters, we can possibly optimize diff --git a/test/config.json b/test/config.json index 15b0fe79313..c740f048fca 100644 --- a/test/config.json +++ b/test/config.json @@ -8,7 +8,7 @@ "java_test" ], "Manual": false, - "Shard": 10, + "Shard": "10", "RetryMax": 0, "Tags": [] }, @@ -19,7 +19,7 @@ "test/client_test.sh" ], "Manual": false, - "Shard": 25, + "Shard": "25", "RetryMax": 0, "Tags": [] }, @@ -31,7 +31,7 @@ "e2e_test_race" ], "Manual": false, - "Shard": -1, + "Shard": "", "RetryMax": 0, "Tags": [] }, @@ -42,7 +42,7 @@ "tools/unit_test_runner.sh" ], "Manual": false, - "Shard": -1, + "Shard": "", "RetryMax": 0, "Tags": [] }, @@ -54,7 +54,7 @@ "unit_test_race" ], "Manual": false, - "Shard": 5, + "Shard": "5", "RetryMax": 0, "Tags": [] }, @@ -63,7 +63,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/backup/vtctlbackup"], "Command": [], "Manual": false, - "Shard": 11, + "Shard": "11", "RetryMax": 0, "Tags": [] }, @@ -72,7 +72,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/backup/mysqlctld"], "Command": [], "Manual": false, - "Shard": 21, + "Shard": "21", "RetryMax": 0, "Tags": [] }, @@ -81,7 +81,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/backup/vtbackup"], "Command": [], "Manual": false, - "Shard": 19, + "Shard": "19", "RetryMax": 0, "Tags": [] }, @@ -90,7 +90,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/backup/transform"], "Command": [], "Manual": false, - "Shard": 19, + "Shard": "19", "RetryMax": 0, "Tags": [] }, @@ -99,7 +99,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/backup/transform/mysqlctld"], "Command": [], "Manual": false, - "Shard": 21, + "Shard": "21", "RetryMax": 0, "Tags": [] }, @@ -108,7 +108,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/backup/xtrabackup"], "Command": [], "Manual": false, - "Shard": 20, + "Shard": "20", "RetryMax": 0, "Tags": [] }, @@ -117,7 +117,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/backup/xtrabackupstream"], "Command": [], "Manual": false, - "Shard": 20, + "Shard": "20", "RetryMax": 0, "Tags": [] }, @@ -126,7 +126,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/cellalias"], "Command": [], "Manual": false, - "Shard": 12, + "Shard": "12", "RetryMax": 0, "Tags": [] }, @@ -135,7 +135,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/preparestmt"], "Command": [], "Manual": false, - "Shard": 12, + "Shard": "12", "RetryMax": 0, "Tags": [] }, @@ -144,7 +144,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/mysqlserver"], "Command": [], "Manual": false, - "Shard": 24, + "Shard": "24", "RetryMax": 0, "Tags": [] }, @@ -153,7 +153,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/messaging"], "Command": [], "Manual": false, - "Shard": 12, + "Shard": "12", "RetryMax": 0, "Tags": [] }, @@ -162,7 +162,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/clustertest"], "Command": [], "Manual": false, - "Shard": 11, + "Shard": "11", "RetryMax": 0, "Tags": [] }, @@ -171,7 +171,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/encryption/encryptedreplication"], "Command": [], "Manual": false, - "Shard": 12, + "Shard": "12", "RetryMax": 0, "Tags": [] }, @@ -180,7 +180,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/encryption/encryptedtransport"], "Command": [], "Manual": false, - "Shard": 12, + "Shard": "12", "RetryMax": 0, "Tags": [] }, @@ -189,7 +189,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/sharding/initialsharding/v3"], "Command": [], "Manual": false, - "Shard": 13, + "Shard": "13", "RetryMax": 0, "Tags": [] }, @@ -198,7 +198,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/sharding/initialsharding/bytes"], "Command": [], "Manual": false, - "Shard": 13, + "Shard": "13", "RetryMax": 0, "Tags": [] }, @@ -207,7 +207,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/sharding/initialsharding/multi"], "Command": [], "Manual": false, - "Shard": -1, + "Shard": "", "RetryMax": 0, "Tags": [] }, @@ -216,7 +216,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/keyspace"], "Command": [], "Manual": false, - "Shard": 12, + "Shard": "12", "RetryMax": 0, "Tags": [ "site_test" @@ -227,7 +227,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/sharding/mergesharding/int"], "Command": [], "Manual": false, - "Shard": 22, + "Shard": "22", "RetryMax": 0, "Tags": [ "worker_test" @@ -238,7 +238,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/sharding/mergesharding/string"], "Command": [], "Manual": false, - "Shard": 22, + "Shard": "22", "RetryMax": 0, "Tags": [ "worker_test" @@ -249,7 +249,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/mysqlctl"], "Command": [], "Manual": false, - "Shard": 12, + "Shard": "12", "RetryMax": 0, "Tags": [ "site_test" @@ -260,7 +260,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/mysqlctld"], "Command": [], "Manual": false, - "Shard": 12, + "Shard": "12", "RetryMax": 0, "Tags": [ "site_test" @@ -271,7 +271,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/onlineddl"], "Command": [], "Manual": false, - "Shard": 26, + "Shard": "26", "RetryMax": 0, "Tags": [] }, @@ -280,7 +280,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/recovery/pitr"], "Command": [], "Manual": false, - "Shard": 10, + "Shard": "10", "RetryMax": 0, "Tags": [ "site_test" @@ -291,7 +291,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/recovery/pitrtls"], "Command": [], "Manual": false, - "Shard": 26, + "Shard": "26", "RetryMax": 0, "Tags": [ "site_test" @@ -302,7 +302,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/recovery/unshardedrecovery"], "Command": [], "Manual": false, - "Shard": 11, + "Shard": "11", "RetryMax": 0, "Tags": [] }, @@ -311,7 +311,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/reparent"], "Command": [], "Manual": false, - "Shard": 14, + "Shard": "14", "RetryMax": 0, "Tags": [] }, @@ -320,7 +320,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/sharding/resharding/v3"], "Command": [], "Manual": false, - "Shard": 15, + "Shard": "15", "RetryMax": 0, "Tags": [ "worker_test" @@ -331,7 +331,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/sharding/resharding/string"], "Command": [], "Manual": false, - "Shard": 15, + "Shard": "15", "RetryMax": 0, "Tags": [ "worker_test" @@ -342,7 +342,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/sharded"], "Command": [], "Manual": false, - "Shard": 11, + "Shard": "11", "RetryMax": 0, "Tags": [] }, @@ -351,7 +351,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/recovery/shardedrecovery"], "Command": [], "Manual": false, - "Shard": 16, + "Shard": "16", "RetryMax": 0, "Tags": [] }, @@ -360,7 +360,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/tabletgateway/buffer"], "Command": [], "Manual": false, - "Shard": 13, + "Shard": "13", "RetryMax": 0, "Tags": [] }, @@ -369,7 +369,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/tabletgateway/cellalias"], "Command": [], "Manual": false, - "Shard": 13, + "Shard": "13", "RetryMax": 0, "Tags": [] }, @@ -378,7 +378,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/tabletgateway"], "Command": [], "Manual": false, - "Shard": 15, + "Shard": "15", "RetryMax": 0, "Tags": [] }, @@ -387,7 +387,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/tabletmanager"], "Command": [], "Manual": false, - "Shard": 18, + "Shard": "18", "RetryMax": 0, "Tags": [ "site_test" @@ -400,7 +400,7 @@ ], "Command": [], "Manual": false, - "Shard": 18, + "Shard": "18", "RetryMax": 0, "Tags": [ "site_test" @@ -411,7 +411,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/tabletmanager/throttler"], "Command": [], "Manual": false, - "Shard": 18, + "Shard": "18", "RetryMax": 0, "Tags": [ "site_test" @@ -422,7 +422,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/tabletmanager/tablegc"], "Command": [], "Manual": false, - "Shard": 27, + "Shard": "27", "RetryMax": 0, "Tags": [ "site_test" @@ -435,7 +435,7 @@ ], "Command": [], "Manual": false, - "Shard": 25, + "Shard": "25", "RetryMax": 0, "Tags": [ "site_test" @@ -446,7 +446,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/versionupgrade", "-keep-data", "-force-vtdataroot", "/tmp/vtdataroot/vtroot_10901", "-force-port-start", "11900", "-force-base-tablet-uid", "1190"], "Command": [], "Manual": false, - "Shard": 28, + "Shard": "28", "RetryMax": 0, "Tags": [] }, @@ -455,7 +455,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/sharding/verticalsplit"], "Command": [], "Manual": false, - "Shard": 16, + "Shard": "16", "RetryMax": 0, "Tags": [ "worker_test" @@ -466,7 +466,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vtgate"], "Command": [], "Manual": false, - "Shard": 17, + "Shard": "17", "RetryMax": 0, "Tags": [] }, @@ -475,7 +475,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/buffer"], "Command": [], "Manual": false, - "Shard": 17, + "Shard": "17", "RetryMax": 0, "Tags": [] }, @@ -484,7 +484,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/concurrentdml"], "Command": [], "Manual": false, - "Shard": 17, + "Shard": "17", "RetryMax": 0, "Tags": [] }, @@ -493,7 +493,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/schema"], "Command": [], "Manual": false, - "Shard": 17, + "Shard": "17", "RetryMax": 0, "Tags": [] }, @@ -502,7 +502,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/sequence"], "Command": [], "Manual": false, - "Shard": 17, + "Shard": "17", "RetryMax": 0, "Tags": [] }, @@ -511,7 +511,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/reservedconn"], "Command": [], "Manual": false, - "Shard": 17, + "Shard": "17", "RetryMax": 0, "Tags": [] }, @@ -520,7 +520,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction"], "Command": [], "Manual": false, - "Shard": 17, + "Shard": "17", "RetryMax": 0, "Tags": [] }, @@ -529,7 +529,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/unsharded"], "Command": [], "Manual": false, - "Shard": 17, + "Shard": "17", "RetryMax": 0, "Tags": [] }, @@ -538,7 +538,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/vschema"], "Command": [], "Manual": false, - "Shard": 17, + "Shard": "17", "RetryMax": 0, "Tags": [] }, @@ -547,7 +547,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/readafterwrite"], "Command": [], "Manual": false, - "Shard": 17, + "Shard": "17", "RetryMax": 0, "Tags": [] }, @@ -556,7 +556,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vtctldweb"], "Command": [], "Manual": false, - "Shard": 10, + "Shard": "10", "RetryMax": 0, "Tags": [] }, @@ -565,7 +565,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vtcombo"], "Command": [], "Manual": false, - "Shard": 25, + "Shard": "25", "RetryMax": 0, "Tags": [] }, @@ -574,7 +574,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/recovery/xtrabackup"], "Command": [], "Manual": false, - "Shard": 17, + "Shard": "17", "RetryMax": 0, "Tags": [] }, @@ -583,7 +583,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/worker"], "Command": [], "Manual": false, - "Shard": 23, + "Shard": "23", "RetryMax": 0, "Tags": [ "worker_test" @@ -594,7 +594,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "MultiCell"], "Command": [], "Manual": false, - "Shard": 22, + "Shard": "22", "RetryMax": 3, "Tags": [] }, @@ -603,7 +603,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "CellAlias"], "Command": [], "Manual": false, - "Shard": 23, + "Shard": "23", "RetryMax": 3, "Tags": [] }, @@ -612,7 +612,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestBasicVreplicationWorkflow"], "Command": [], "Manual": false, - "Shard": 24, + "Shard": "vreplication_basic", "RetryMax": 3, "Tags": [] }, @@ -621,7 +621,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/orchestrator"], "Command": [], "Manual": false, - "Shard": 22, + "Shard": "22", "RetryMax": 0, "Tags": [] }, @@ -630,7 +630,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vault"], "Command": [], "Manual": false, - "Shard": 23, + "Shard": "23", "RetryMax": 0, "Tags": [] }, @@ -639,7 +639,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestBasicV2Workflows"], "Command": [], "Manual": false, - "Shard": 21, + "Shard": "21", "RetryMax": 3, "Tags": [] }