From 78656be87952db9e82d3e91cf1e4dfec6ff41857 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 16 Aug 2023 12:47:06 +0300 Subject: [PATCH 01/28] Endtoend: stress tests for VTGate FOREIGN KEY support Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- ...ster_endtoend_vtgate_foreignkey_stress.yml | 139 +++ .../foreignkey/stress/fk_stress_test.go | 882 ++++++++++++++++++ test/ci_workflow_gen.go | 1 + test/config.json | 9 + 4 files changed, 1031 insertions(+) create mode 100644 .github/workflows/cluster_endtoend_vtgate_foreignkey_stress.yml create mode 100644 go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go diff --git a/.github/workflows/cluster_endtoend_vtgate_foreignkey_stress.yml b/.github/workflows/cluster_endtoend_vtgate_foreignkey_stress.yml new file mode 100644 index 00000000000..b382140cc2f --- /dev/null +++ b/.github/workflows/cluster_endtoend_vtgate_foreignkey_stress.yml @@ -0,0 +1,139 @@ +# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows" + +name: Cluster (vtgate_foreignkey_stress) +on: [push, pull_request] +concurrency: + group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (vtgate_foreignkey_stress)') + cancel-in-progress: true + +permissions: read-all + +env: + LAUNCHABLE_ORGANIZATION: "vitess" + LAUNCHABLE_WORKSPACE: "vitess-app" + GITHUB_PR_HEAD_SHA: "${{ github.event.pull_request.head.sha }}" + +jobs: + build: + name: Run endtoend tests on Cluster (vtgate_foreignkey_stress) + runs-on: ubuntu-22.04 + + steps: + - name: Skip CI + run: | + if [[ "${{contains( github.event.pull_request.labels.*.name, 'Skip CI')}}" == "true" ]]; then + echo "skipping CI due to the 'Skip CI' label" + exit 1 + fi + + - name: Check if workflow needs to be skipped + id: skip-workflow + run: | + skip='false' + if [[ "${{github.event.pull_request}}" == "" ]] && [[ "${{github.ref}}" != "refs/heads/main" ]] && [[ ! "${{github.ref}}" =~ ^refs/heads/release-[0-9]+\.[0-9]$ ]] && [[ ! "${{github.ref}}" =~ "refs/tags/.*" ]]; then + skip='true' + fi + echo Skip ${skip} + echo "skip-workflow=${skip}" >> $GITHUB_OUTPUT + + - name: Check out code + if: steps.skip-workflow.outputs.skip-workflow == 'false' + uses: actions/checkout@v3 + + - name: Check for changes in relevant files + if: steps.skip-workflow.outputs.skip-workflow == 'false' + uses: frouioui/paths-filter@main + id: changes + with: + token: '' + filters: | + end_to_end: + - 'go/**/*.go' + - 'test.go' + - 'Makefile' + - 'build.env' + - 'go.sum' + - 'go.mod' + - 'proto/*.proto' + - 'tools/**' + - 'config/**' + - 'bootstrap.sh' + - '.github/workflows/cluster_endtoend_vtgate_foreignkey_stress.yml' + + - name: Set up Go + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + uses: actions/setup-go@v4 + with: + go-version: 1.20.5 + + - name: Set up python + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + uses: actions/setup-python@v4 + + - name: Tune the OS + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + run: | + # Limit local port range to not use ports that overlap with server side + # ports that we listen on. + sudo sysctl -w net.ipv4.ip_local_port_range="22768 65535" + # Increase the asynchronous non-blocking I/O. 
More information at https://dev.mysql.com/doc/refman/5.7/en/innodb-parameters.html#sysvar_innodb_use_native_aio + echo "fs.aio-max-nr = 1048576" | sudo tee -a /etc/sysctl.conf + sudo sysctl -p /etc/sysctl.conf + + - name: Get dependencies + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + run: | + + # Get key to latest MySQL repo + sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 467B942D3A79BD29 + # Setup MySQL 8.0 + wget -c https://dev.mysql.com/get/mysql-apt-config_0.8.24-1_all.deb + echo mysql-apt-config mysql-apt-config/select-server select mysql-8.0 | sudo debconf-set-selections + sudo DEBIAN_FRONTEND="noninteractive" dpkg -i mysql-apt-config* + sudo apt-get update + # Install everything else we need, and configure + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata xz-utils libncurses5 + + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + # install JUnit report formatter + go install github.com/vitessio/go-junit-report@HEAD + + - name: Setup launchable dependencies + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main' + run: | + # Get Launchable CLI installed. If you can, make it a part of the builder image to speed things up + pip3 install --user launchable~=1.0 > /dev/null + + # verify that launchable setup is all correct. + launchable verify || true + + # Tell Launchable about the build you are producing and testing + launchable record build --name "$GITHUB_RUN_ID" --no-commit-collection --source . + + - name: Run cluster endtoend test + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' + timeout-minutes: 45 + run: | + # We set the VTDATAROOT to the /tmp folder to shorten the file path of the mysql.sock file, + # which mustn't be more than 107 characters long. + export VTDATAROOT="/tmp/" + source build.env + + set -exo pipefail + + # run the tests however you normally do, then produce a JUnit XML file + eatmydata -- go run test.go -docker=false -follow -shard vtgate_foreignkey_stress | tee -a output.txt | go-junit-report -set-exit-code > report.xml + + - name: Print test output and Record test result in launchable + if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always() + run: | + # send recorded tests to launchable + launchable record tests --build "$GITHUB_RUN_ID" go-test . || true + + # print test output + cat output.txt diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go new file mode 100644 index 00000000000..d34486397ba --- /dev/null +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -0,0 +1,882 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fkstress + +import ( + "context" + "flag" + "fmt" + "math/rand" + "os" + "path" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/sqlparser" +) + +// This endtoend test is designed to validate VTGate's FOREIGN KEY implementation for unsharded/single-sharded/shard-scope, meaning +// we expect foreign key constraints to be limited to a shard (related rows can never be on different shards). +// +// This test validates NO ACTION, CASCADE and SET NULL reference actions. +// VTGate's support for foreign keys includes: +// - Analyzing the foreign key constraints in a keyspace. +// - Rejecting INSERT statements for a child table when there's no matching row on the parent table. +// - Handling DELETE and UPDATE statements on a parent table according to the reference action on all children. +// Specifically, this means for example that VTGate will handle an ON DELETE CASCADE within Vitess. It will first delete rows +// from the child (recursive operation) before deleting the row on the parent. As a result, the underlying MySQL server will have +// nothing to cascade. +// +// The design of this test is as follows: +// - Create a cluster with PRIMARY and REPLICA tablets +// - Given this structure of tables with foreign key constraints: +// stress_parent +// +- stress_child +// +- stress_grandchild +// +- stress_child2 +// - Create these tables. Then, on the MySQL replica, remove the foreign key constraints. +// - Static test: +// - Randomly populate all tables via highly contentious INSERT/UPDATE/DELETE statements +// - Validate collected metrics match actual table data +// - Validate foreign key constraints integrity +// - Workload test: +// - Initially populate tables as above +// - Run a high contention workload where multiple connections issue random INSERT/UPDATE/DELETE on all related tables +// - Validate collected metrics match actual table data +// - Validate foreign key constraints integrity on MySQL primary +// - Validate foreign key constraints integrity on MySQL replica +// - Compare data on primary & replica +// +// We of course know that foreign key integrity is maintained on the MySQL primary. However, the replica does not have the matching +// constraints. Since cascaded (SET NULL, CASCADE) writes are handled internally by InnoDB and not written to the binary log, +// any cascaded writes on the primary are lost, and the replica is unaware of those writes. Without VTGate intervention, we expect +// the replica to quickly diverge from the primary, and in fact in all likelihood replication will break very quickly. +// However, if VTGate implements the cascading rules correctly, the primary MySQL server will never have any actual cascades, and +// so cascaded writes are all accounted for in the binary logs, which means we can expect the replica to be compliant with the +// primary. 
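+// +// A hypothetical illustration (example values, not data used by the test): suppose stress_parent has a row id=5 that is +// referenced by rows of stress_child under ON DELETE CASCADE. On a vanilla primary, `DELETE FROM stress_parent WHERE id=5` +// is the only statement written to the binary log, and InnoDB removes the child rows internally; our constraint-stripped +// replica would keep those child rows as orphans. With VTGate performing the cascade, the binary log carries explicit +// DELETEs for the child rows followed by the DELETE on the parent, and the replica can apply them verbatim. 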
+ +type WriteMetrics struct { + mu sync.Mutex + insertsAttempts, insertsFailures, insertsNoops, inserts int64 + updatesAttempts, updatesFailures, updatesNoops, updates int64 + deletesAttempts, deletesFailures, deletesNoops, deletes int64 +} + +func (w *WriteMetrics) Clear() { + w.mu.Lock() + defer w.mu.Unlock() + + w.inserts = 0 + w.updates = 0 + w.deletes = 0 + + w.insertsAttempts = 0 + w.insertsFailures = 0 + w.insertsNoops = 0 + + w.updatesAttempts = 0 + w.updatesFailures = 0 + w.updatesNoops = 0 + + w.deletesAttempts = 0 + w.deletesFailures = 0 + w.deletesNoops = 0 +} + +func (w *WriteMetrics) String() string { + return fmt.Sprintf(`WriteMetrics: inserts-deletes=%d, updates-deletes=%d, +insertsAttempts=%d, insertsFailures=%d, insertsNoops=%d, inserts=%d, +updatesAttempts=%d, updatesFailures=%d, updatesNoops=%d, updates=%d, +deletesAttempts=%d, deletesFailures=%d, deletesNoops=%d, deletes=%d, +`, + w.inserts-w.deletes, w.updates-w.deletes, + w.insertsAttempts, w.insertsFailures, w.insertsNoops, w.inserts, + w.updatesAttempts, w.updatesFailures, w.updatesNoops, w.updates, + w.deletesAttempts, w.deletesFailures, w.deletesNoops, w.deletes, + ) +} + +var ( + clusterInstance *cluster.LocalProcessCluster + shards []cluster.Shard + primary *cluster.Vttablet + replica *cluster.Vttablet + vtParams mysql.ConnParams + + hostname = "localhost" + keyspaceName = "ks" + cell = "zone1" + schemaChangeDirectory = "" + parentTableName = "stress_parent" + childTableName = "stress_child" + child2TableName = "stress_child2" + grandchildTableName = "stress_grandchild" + tableNames = []string{parentTableName, childTableName, child2TableName, grandchildTableName} + reverseTableNames []string + + onDeleteActionsMap = map[sqlparser.ReferenceAction]string{ + sqlparser.NoAction: "NO ACTION", + sqlparser.Cascade: "CASCADE", + sqlparser.SetNull: "SET NULL", + } + onDeleteActions = []sqlparser.ReferenceAction{sqlparser.NoAction, sqlparser.Cascade, sqlparser.SetNull} + createStatements = []string{ + ` + CREATE TABLE stress_parent ( + id bigint not null, + parent_id bigint, + rand_val varchar(32) null default '', + hint_col varchar(64) not null default '', + created_timestamp timestamp not null default current_timestamp, + updates int unsigned not null default 0, + PRIMARY KEY (id), + key parent_id_idx(parent_id), + key created_idx(created_timestamp), + key updates_idx(updates) + ) ENGINE=InnoDB + `, + ` + CREATE TABLE stress_child ( + id bigint not null, + parent_id bigint, + rand_val varchar(32) null default '', + hint_col varchar(64) not null default '', + created_timestamp timestamp not null default current_timestamp, + updates int unsigned not null default 0, + PRIMARY KEY (id), + key parent_id_idx(parent_id), + key created_idx(created_timestamp), + key updates_idx(updates), + CONSTRAINT child_parent_fk FOREIGN KEY (parent_id) REFERENCES stress_parent (id) ON DELETE %s + ) ENGINE=InnoDB + `, + ` + CREATE TABLE stress_child2 ( + id bigint not null, + parent_id bigint, + rand_val varchar(32) null default '', + hint_col varchar(64) not null default '', + created_timestamp timestamp not null default current_timestamp, + updates int unsigned not null default 0, + PRIMARY KEY (id), + key parent_id_idx(parent_id), + key created_idx(created_timestamp), + key updates_idx(updates), + CONSTRAINT child2_parent_fk FOREIGN KEY (parent_id) REFERENCES stress_parent (id) ON DELETE %s + ) ENGINE=InnoDB + `, + ` + CREATE TABLE stress_grandchild ( + id bigint not null, + parent_id bigint, + rand_val varchar(32) null default '', + 
hint_col varchar(64) not null default '', + created_timestamp timestamp not null default current_timestamp, + updates int unsigned not null default 0, + PRIMARY KEY (id), + key parent_id_idx(parent_id), + key created_idx(created_timestamp), + key updates_idx(updates), + CONSTRAINT grandchild_child_fk FOREIGN KEY (parent_id) REFERENCES stress_child (id) ON DELETE %s + ) ENGINE=InnoDB + `, + } + dropConstraintsStatements = []string{ + `ALTER TABLE stress_child DROP CONSTRAINT child_parent_fk`, + `ALTER TABLE stress_child2 DROP CONSTRAINT child2_parent_fk`, + `ALTER TABLE stress_grandchild DROP CONSTRAINT grandchild_child_fk`, + } + insertRowStatement = ` + INSERT IGNORE INTO %s (id, parent_id, rand_val) VALUES (%d, %d, left(md5(rand()), 8)) + ` + updateRowStatement = ` + UPDATE %s SET rand_val=left(md5(rand()), 8), updates=updates+1 WHERE id=%d + ` + deleteRowStatement = ` + DELETE FROM %s WHERE id=%d AND updates=1 + ` + // We use CAST(SUM(updates) AS SIGNED) because SUM() returns a DECIMAL datatype, and we want to read a SIGNED INTEGER type + selectCountRowsStatement = ` + SELECT COUNT(*) AS num_rows, CAST(SUM(updates) AS SIGNED) AS sum_updates FROM %s + ` + selectMatchingRowsChild = ` + select stress_child.id from stress_child join stress_parent on (stress_parent.id = stress_child.parent_id) + ` + selectMatchingRowsChild2 = ` + select stress_child2.id from stress_child2 join stress_parent on (stress_parent.id = stress_child2.parent_id) + ` + selectMatchingRowsGrandchild = ` + select stress_grandchild.id from stress_grandchild join stress_child on (stress_child.id = stress_grandchild.parent_id) + ` + selectOrphanedRowsChild = ` + select stress_child.id from stress_child left join stress_parent on (stress_parent.id = stress_child.parent_id) where stress_parent.id is null + ` + selectOrphanedRowsChild2 = ` + select stress_child2.id from stress_child2 left join stress_parent on (stress_parent.id = stress_child2.parent_id) where stress_parent.id is null + ` + selectOrphanedRowsGrandchild = ` + select stress_grandchild.id from stress_grandchild left join stress_child on (stress_child.id = stress_grandchild.parent_id) where stress_child.id is null + ` + deleteAllStatement = ` + DELETE FROM %s + ` + writeMetrics = map[string]*WriteMetrics{} +) + +const ( + maxTableRows = 4096 +) + +// The following variables are fit for a local, strong developer box. +// The test overrides these into more relaxed values if running on GITHUB_ACTIONS, +// seeing that GitHub CI is much weaker. 
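+// For example, a slow CI runner might halve maxConcurrency and double singleConnectionSleepInterval (hypothetical +// values; the GITHUB_ACTIONS override below currently multiplies both by 1, deliberately keeping these defaults). 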
+var ( + maxConcurrency = 10 + singleConnectionSleepInterval = 10 * time.Millisecond + countIterations = 3 +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitcode, err := func() (int, error) { + clusterInstance = cluster.NewCluster(cell, hostname) + schemaChangeDirectory = path.Join("/tmp", fmt.Sprintf("schema_change_dir_%d", clusterInstance.GetAndReserveTabletUID())) + defer os.RemoveAll(schemaChangeDirectory) + defer clusterInstance.Teardown() + + if _, err := os.Stat(schemaChangeDirectory); os.IsNotExist(err) { + _ = os.Mkdir(schemaChangeDirectory, 0700) + } + + clusterInstance.VtctldExtraArgs = []string{ + "--schema_change_dir", schemaChangeDirectory, + "--schema_change_controller", "local", + "--schema_change_check_interval", "1s", + } + + clusterInstance.VtTabletExtraArgs = []string{ + "--heartbeat_enable", + "--heartbeat_interval", "250ms", + "--heartbeat_on_demand_duration", "5s", + "--watch_replication_stream", + } + clusterInstance.VtGateExtraArgs = []string{} + + if err := clusterInstance.StartTopo(); err != nil { + return 1, err + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + } + + // We will use a replica to confirm that vtgate's cascading works correctly. + if err := clusterInstance.StartKeyspace(*keyspace, []string{"1"}, 1, false); err != nil { + return 1, err + } + + vtgateInstance := clusterInstance.NewVtgateInstance() + // Start vtgate + if err := vtgateInstance.Setup(); err != nil { + return 1, err + } + // ensure it is torn down during cluster TearDown + clusterInstance.VtgateProcess = *vtgateInstance + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + + return m.Run(), nil + }() + if err != nil { + fmt.Printf("%v\n", err) + os.Exit(1) + } else { + os.Exit(exitcode) + } + +} + +func queryTablet(t *testing.T, tablet *cluster.Vttablet, query string, expectError string) *sqltypes.Result { + rs, err := tablet.VttabletProcess.QueryTablet(query, keyspaceName, true) + if expectError == "" { + assert.NoError(t, err) + } else { + assert.ErrorContains(t, err, expectError) + } + return rs +} + +func tabletTestName(t *testing.T, tablet *cluster.Vttablet) string { + switch tablet { + case primary: + return "primary" + case replica: + return "replica" + default: + assert.FailNowf(t, "unknown tablet", "%v, type=%v", tablet.Alias, tablet.Type) + } + return "" +} + +func validateReplicationIsHealthy(t *testing.T, tablet *cluster.Vttablet) bool { + query := "show replica status" + rs, err := tablet.VttabletProcess.QueryTablet(query, keyspaceName, true) + assert.NoError(t, err) + row := rs.Named().Row() + require.NotNil(t, row) + + ioRunning := row.AsString("Replica_IO_Running", "") + require.NotEmpty(t, ioRunning) + ioHealthy := assert.Equalf(t, "Yes", ioRunning, "row: %v", row) + sqlRunning := row.AsString("Replica_SQL_Running", "") + require.NotEmpty(t, sqlRunning) + sqlHealthy := assert.Equalf(t, "Yes", sqlRunning, "row: %v", row) + + return ioHealthy && sqlHealthy +} + +func getTabletPosition(t *testing.T, tablet *cluster.Vttablet) replication.Position { + rs := queryTablet(t, tablet, "select @@gtid_executed as gtid_executed", "") + row := rs.Named().Row() + require.NotNil(t, row) + gtidExecuted := row.AsString("gtid_executed", "") + require.NotEmpty(t, gtidExecuted) + pos, err := replication.DecodePositionDefaultFlavor(gtidExecuted, replication.Mysql56FlavorID) + assert.NoError(t, err) + return pos +} + +func waitForReplicaCatchup(t *testing.T) { + 
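+	// Poll the replica once per second until its @@gtid_executed contains the primary's current GTID position, failing after a one-minute timeout or as soon as replication is seen to be broken. 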
ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + primaryPos := getTabletPosition(t, primary) + for { + replicaPos := getTabletPosition(t, replica) + if replicaPos.GTIDSet.Contains(primaryPos.GTIDSet) { + // success + return + } + if !validateReplicationIsHealthy(t, replica) { + assert.FailNow(t, "replica is unhealthy; not waiting for catchup") + return + } + select { + case <-ctx.Done(): + assert.FailNow(t, "timeout waiting for replica to catch up") + return + case <-time.After(time.Second): + // + } + } +} + +func TestStressFK(t *testing.T) { + defer cluster.PanicHandler(t) + + if val, present := os.LookupEnv("GITHUB_ACTIONS"); present && val != "" { + // This is the place to fine tune the stress parameters if GitHub actions are too slow + maxConcurrency = maxConcurrency * 1 + singleConnectionSleepInterval = singleConnectionSleepInterval * 1 + } + t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval) + + shards = clusterInstance.Keyspaces[0].Shards + require.Equal(t, 1, len(shards)) + require.Equal(t, 2, len(shards[0].Vttablets)) + primary = shards[0].Vttablets[0] + require.NotNil(t, primary) + replica = shards[0].Vttablets[1] + require.NotNil(t, replica) + require.NotEqual(t, primary.Alias, replica.Alias) + + tableNames = []string{parentTableName, childTableName, child2TableName, grandchildTableName} + reverseTableNames = slices.Clone(tableNames) + slices.Reverse(reverseTableNames) + require.ElementsMatch(t, tableNames, reverseTableNames) + + for _, tableName := range tableNames { + writeMetrics[tableName] = &WriteMetrics{} + } + + t.Run("validate replication health", func(t *testing.T) { + validateReplicationIsHealthy(t, replica) + }) + + validateMetrics := func(t *testing.T, onDeleteAction sqlparser.ReferenceAction) { + for _, workloadTable := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} { + tname := fmt.Sprintf("validate metrics: %s", workloadTable) + t.Run(tname, func(t *testing.T) { + var primaryRows, replicaRows int64 + t.Run(tabletTestName(t, primary), func(t *testing.T) { + primaryRows = testSelectTableMetrics(t, primary, workloadTable, onDeleteAction) + }) + t.Run(tabletTestName(t, replica), func(t *testing.T) { + replicaRows = testSelectTableMetrics(t, replica, workloadTable, onDeleteAction) + }) + t.Run("compare primary and replica", func(t *testing.T) { + assert.Equal(t, primaryRows, replicaRows) + }) + }) + } + } + + t.Run("static data", func(t *testing.T) { + for _, onDeleteAction := range onDeleteActions { + t.Run(onDeleteActionsMap[onDeleteAction], func(t *testing.T) { + t.Run("create schema", func(t *testing.T) { + createInitialSchema(t, onDeleteAction) + }) + t.Run("init tables", func(t *testing.T) { + populateTables(t) + }) + t.Run("wait for replica", func(t *testing.T) { + waitForReplicaCatchup(t) + }) + t.Run("validate metrics", func(t *testing.T) { + validateMetrics(t, onDeleteAction) + }) + t.Run("validate replication health", func(t *testing.T) { + validateReplicationIsHealthy(t, replica) + }) + }) + } + }) + t.Run("stress", func(t *testing.T) { + for _, onDeleteAction := range onDeleteActions { + t.Run(onDeleteActionsMap[onDeleteAction], func(t *testing.T) { + // This tests running a workload on the table, then comparing expected metrics with + // actual table metrics. All this without any ALTER TABLE: this is to validate + // that our testing/metrics logic is sound in the first place. 
+ t.Run("Workload", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Run("create schema", func(t *testing.T) { + createInitialSchema(t, onDeleteAction) + }) + t.Run("init tables", func(t *testing.T) { + populateTables(t) + }) + t.Run("workload", func(t *testing.T) { + var wg sync.WaitGroup + for _, workloadTable := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} { + wg.Add(1) + go func(tbl string) { + defer wg.Done() + runMultipleConnections(ctx, t, tbl) + }(workloadTable) + } + time.Sleep(5 * time.Second) + cancel() // will cause runMultipleConnections() to terminate + wg.Wait() + }) + t.Run("wait for replica", func(t *testing.T) { + waitForReplicaCatchup(t) + }) + validateMetrics(t, onDeleteAction) + t.Run("validate replication health", func(t *testing.T) { + validateReplicationIsHealthy(t, replica) + }) + t.Run("validate fk", func(t *testing.T) { + testFKIntegrity(t, primary, onDeleteAction) + testFKIntegrity(t, replica, onDeleteAction) + }) + }) + }) + } + }) +} + +// createInitialSchema creates the tables from scratch, and drops the foreign key constraints on the replica. +func createInitialSchema(t *testing.T, onDeleteAction sqlparser.ReferenceAction) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + t.Run("dropping tables", func(t *testing.T) { + for _, tableName := range reverseTableNames { + err := clusterInstance.VtctlclientProcess.ApplySchema(keyspaceName, "drop table if exists "+tableName) + require.NoError(t, err) + } + }) + t.Run("creating tables", func(t *testing.T) { + // Create the stress tables + var b strings.Builder + for _, sql := range createStatements { + b.WriteString(fmt.Sprintf(sql, onDeleteActionsMap[onDeleteAction])) + b.WriteString(";") + } + err := clusterInstance.VtctlclientProcess.ApplySchema(keyspaceName, b.String()) + require.NoError(t, err) + + rs, err := conn.ExecuteFetch("show full tables", 1000, true) + require.NoError(t, err) + require.Equal(t, 4, len(rs.Rows)) + t.Logf("===== init: %d tables created", len(rs.Rows)) + }) + t.Run("wait for replica", func(t *testing.T) { + waitForReplicaCatchup(t) + }) + t.Run("validating tables: vttablet", func(t *testing.T) { + // Check if table is created. Checked on tablets. 
+	checkTable(t, parentTableName) + checkTable(t, childTableName) + checkTable(t, child2TableName) + checkTable(t, grandchildTableName) + }) + t.Run("validating tables: vtgate", func(t *testing.T) { + // Wait for tables to appear on VTGate + waitForTable(t, parentTableName, conn) + waitForTable(t, childTableName, conn) + waitForTable(t, child2TableName, conn) + waitForTable(t, grandchildTableName, conn) + }) + t.Run("dropping foreign keys on replica", func(t *testing.T) { + for _, statement := range dropConstraintsStatements { + _ = queryTablet(t, replica, "set global super_read_only=0", "") + _ = queryTablet(t, replica, statement, "") + _ = queryTablet(t, replica, "set global super_read_only=1", "") + } + }) + t.Run("validate definitions", func(t *testing.T) { + for _, tableName := range []string{childTableName, child2TableName, grandchildTableName} { + t.Run(tableName, func(t *testing.T) { + t.Run(tabletTestName(t, primary), func(t *testing.T) { + stmt := getCreateTableStatement(t, primary, tableName) + assert.Contains(t, stmt, "CONSTRAINT") + }) + t.Run(tabletTestName(t, replica), func(t *testing.T) { + stmt := getCreateTableStatement(t, replica, tableName) + assert.NotContains(t, stmt, "CONSTRAINT") + }) + }) + } + }) +} + +// waitForTable waits until the table is seen in VTGate +func waitForTable(t *testing.T, tableName string, conn *mysql.Conn) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + query := fmt.Sprintf("select count(*) from %s", tableName) + for { + if _, err := conn.ExecuteFetch(query, 1, false); err == nil { + return // good + } + select { + case <-ticker.C: + case <-ctx.Done(): + t.Fail() + return + } + } +} + +// checkTable checks that the given table exists on all tablets +func checkTable(t *testing.T, showTableName string) { + for _, tablet := range shards[0].Vttablets { + checkTablesCount(t, tablet, showTableName, 1) + } +} + +// checkTablesCount checks the number of tables on the given tablet +func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName string, expectCount int) { + query := fmt.Sprintf(`show tables like '%s';`, showTableName) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + rowcount := 0 + for { + queryResult, err := tablet.VttabletProcess.QueryTablet(query, keyspaceName, true) + require.Nil(t, err) + rowcount = len(queryResult.Rows) + if rowcount > 0 { + break + } + + select { + case <-time.After(time.Second): + case <-ctx.Done(): + // a break here would only exit the select; the loop exit is handled below + } + if ctx.Err() != nil { + break + } + } + assert.Equal(t, expectCount, rowcount) +} + +// getCreateTableStatement returns the CREATE TABLE statement for a given table +func getCreateTableStatement(t *testing.T, tablet *cluster.Vttablet, tableName string) (statement string) { + queryResult := queryTablet(t, tablet, fmt.Sprintf("show create table %s", tableName), "") + + require.Equal(t, len(queryResult.Rows), 1) + row := queryResult.Rows[0] + assert.Equal(t, len(row), 2) // table name, create statement + statement = row[1].ToString() + return statement +} + +func generateInsert(t *testing.T, tableName string, conn *mysql.Conn) error { + id := rand.Int31n(int32(maxTableRows)) + parentId := rand.Int31n(int32(maxTableRows)) + query := fmt.Sprintf(insertRowStatement, tableName, id, parentId) + qr, err := conn.ExecuteFetch(query, 1000, true) + + func() { + writeMetrics[tableName].mu.Lock() + defer writeMetrics[tableName].mu.Unlock() + + writeMetrics[tableName].insertsAttempts++ + if err 
!= nil { + writeMetrics[tableName].insertsFailures++ + return + } + assert.Less(t, qr.RowsAffected, uint64(2)) + if qr.RowsAffected == 0 { + writeMetrics[tableName].insertsNoops++ + return + } + writeMetrics[tableName].inserts++ + }() + return err +} + +func generateUpdate(t *testing.T, tableName string, conn *mysql.Conn) error { + id := rand.Int31n(int32(maxTableRows)) + query := fmt.Sprintf(updateRowStatement, tableName, id) + qr, err := conn.ExecuteFetch(query, 1000, true) + + func() { + writeMetrics[tableName].mu.Lock() + defer writeMetrics[tableName].mu.Unlock() + + writeMetrics[tableName].updatesAttempts++ + if err != nil { + writeMetrics[tableName].updatesFailures++ + return + } + assert.Less(t, qr.RowsAffected, uint64(2)) + if qr.RowsAffected == 0 { + writeMetrics[tableName].updatesNoops++ + return + } + writeMetrics[tableName].updates++ + }() + return err +} + +func generateDelete(t *testing.T, tableName string, conn *mysql.Conn) error { + id := rand.Int31n(int32(maxTableRows)) + query := fmt.Sprintf(deleteRowStatement, tableName, id) + qr, err := conn.ExecuteFetch(query, 1000, true) + + func() { + writeMetrics[tableName].mu.Lock() + defer writeMetrics[tableName].mu.Unlock() + + writeMetrics[tableName].deletesAttempts++ + if err != nil { + writeMetrics[tableName].deletesFailures++ + return + } + assert.Less(t, qr.RowsAffected, uint64(2)) + if qr.RowsAffected == 0 { + writeMetrics[tableName].deletesNoops++ + return + } + writeMetrics[tableName].deletes++ + }() + return err +} + +func runSingleConnection(ctx context.Context, t *testing.T, tableName string, done *int64) { + log.Infof("Running single connection on %s", tableName) + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + _, err = conn.ExecuteFetch("set autocommit=1", 1000, true) + require.Nil(t, err) + _, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true) + require.Nil(t, err) + + for { + if atomic.LoadInt64(done) == 1 { + log.Infof("Terminating single connection") + return + } + switch rand.Int31n(3) { + case 0: + _ = generateInsert(t, tableName, conn) + case 1: + _ = generateUpdate(t, tableName, conn) + case 2: + _ = generateDelete(t, tableName, conn) + } + time.Sleep(singleConnectionSleepInterval) + } +} + +func runMultipleConnections(ctx context.Context, t *testing.T, tableName string) { + log.Infof("Running multiple connections") + var done int64 + var wg sync.WaitGroup + for i := 0; i < maxConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + runSingleConnection(ctx, t, tableName, &done) + }() + } + <-ctx.Done() + atomic.StoreInt64(&done, 1) + log.Infof("Running multiple connections: done") + wg.Wait() + log.Infof("All connections cancelled") +} + +func wrapWithNoFKChecks(sql string) string { + return fmt.Sprintf("set foreign_key_checks=0; %s; set foreign_key_checks=1;", sql) +} + +// populateTables randomly populates all test tables. This is done sequentially. 
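+// Tables are seeded in foreign key dependency order: parents before children, so that randomly generated child rows +// have existing parent rows to reference. A child INSERT that draws a parent_id with no matching parent row does not +// become a successful insert; it is accounted for under the attempts/failures/noops counters rather than under inserts. 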
+func populateTables(t *testing.T) { + log.Infof("populateTables begin") + defer log.Infof("populateTables complete") + + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + t.Logf("===== clearing tables") + for _, tableName := range reverseTableNames { + writeMetrics[tableName].Clear() + deleteQuery := fmt.Sprintf(deleteAllStatement, tableName) + _, err = conn.ExecuteFetch(deleteQuery, 1000, true) + require.Nil(t, err) + } + t.Logf("===== populating tables") + for _, tableName := range tableNames { + // populate parent, then child, child2, then grandchild + for i := 0; i < maxTableRows/2; i++ { + generateInsert(t, tableName, conn) + } + for i := 0; i < maxTableRows/4; i++ { + generateUpdate(t, tableName, conn) + } + for i := 0; i < maxTableRows/4; i++ { + generateDelete(t, tableName, conn) + } + } +} + +// testSelectTableMetrics cross-references the known metrics (number of successful inserts/deletes/updates) on each table, with the +// actual number of rows and with the row values on those tables. +// With CASCADE/SET NULL rules we can't do the comparison, because child tables are implicitly affected by the cascading rules, +// and the values do not match what is reported to us when we UPDATE/DELETE on the parent tables. +func testSelectTableMetrics(t *testing.T, tablet *cluster.Vttablet, tableName string, onDeleteAction sqlparser.ReferenceAction) int64 { + switch onDeleteAction { + case sqlparser.Cascade, sqlparser.SetNull: + if tableName != parentTableName { + // We can't validate those tables because they will have been affected by cascading rules. + return 0 + } + } + writeMetrics[tableName].mu.Lock() + defer writeMetrics[tableName].mu.Unlock() + + log.Infof("%s %s", tableName, writeMetrics[tableName].String()) + + rs := queryTablet(t, tablet, fmt.Sprintf(selectCountRowsStatement, tableName), "") + + row := rs.Named().Row() + require.NotNil(t, row) + log.Infof("testSelectTableMetrics, row: %v", row) + numRows := row.AsInt64("num_rows", 0) + sumUpdates := row.AsInt64("sum_updates", 0) + assert.NotZero(t, numRows) + assert.NotZero(t, sumUpdates) + assert.NotZero(t, writeMetrics[tableName].inserts) + assert.NotZero(t, writeMetrics[tableName].deletes) + assert.NotZero(t, writeMetrics[tableName].updates) + assert.Equal(t, writeMetrics[tableName].inserts-writeMetrics[tableName].deletes, numRows) + assert.Equal(t, writeMetrics[tableName].updates-writeMetrics[tableName].deletes, sumUpdates) // because we DELETE WHERE updates=1 + + return numRows +} + +// testFKIntegrity validates that foreign key consistency is maintained on the given tablet. We cross-reference all +// parent-child relationships. +// There are two test types: +// 1. Do a JOIN on parent-child associated rows, expecting a non-empty result +// 2. Check that there are no orphaned child rows. Notes: +// - This applies to NO ACTION and CASCADE, but not to SET NULL, because SET NULL by design creates orphaned rows. +// - On the primary database, this test trivially passes because of course MySQL maintains this integrity. But remember +// that we remove the foreign key constraints on the replica. Also remember that cascaded writes are not written to +// the binary log. And so, if VTGate does not do a proper job, then a parent and child will drift apart in CASCADE writes. 
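+// Concretely, test type 1 runs the selectMatchingRows* JOIN queries defined above and expects a nonzero row count, and +// test type 2 runs the selectOrphanedRows* LEFT JOIN ... IS NULL queries and expects zero rows. 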
+func testFKIntegrity(t *testing.T, tablet *cluster.Vttablet, onDeleteAction sqlparser.ReferenceAction) { + testName := tabletTestName(t, tablet) + t.Run(testName, func(t *testing.T) { + t.Run("matching parent-child rows", func(t *testing.T) { + rs := queryTablet(t, tablet, selectMatchingRowsChild, "") + assert.NotZero(t, len(rs.Rows)) + t.Logf("===== matching rows: %v", len(rs.Rows)) + }) + t.Run("matching parent-child2 rows", func(t *testing.T) { + rs := queryTablet(t, tablet, selectMatchingRowsChild2, "") + assert.NotZero(t, len(rs.Rows)) + t.Logf("===== matching rows: %v", len(rs.Rows)) + }) + t.Run("matching child-grandchild rows", func(t *testing.T) { + rs := queryTablet(t, tablet, selectMatchingRowsGrandchild, "") + assert.NotZero(t, len(rs.Rows)) + t.Logf("===== matching rows: %v", len(rs.Rows)) + }) + if onDeleteAction != sqlparser.SetNull { + // Because with SET NULL there _are_ orphaned rows + t.Run("parent-child orphaned rows", func(t *testing.T) { + rs := queryTablet(t, tablet, selectOrphanedRowsChild, "") + assert.Zero(t, len(rs.Rows)) + }) + t.Run("parent-child2 orphaned rows", func(t *testing.T) { + rs := queryTablet(t, tablet, selectOrphanedRowsChild2, "") + assert.Zero(t, len(rs.Rows)) + }) + t.Run("child-grandchild orphaned rows", func(t *testing.T) { + rs := queryTablet(t, tablet, selectOrphanedRowsGrandchild, "") + assert.Zero(t, len(rs.Rows)) + }) + } + }) +} diff --git a/test/ci_workflow_gen.go b/test/ci_workflow_gen.go index f61c9af472f..d90c693c7cd 100644 --- a/test/ci_workflow_gen.go +++ b/test/ci_workflow_gen.go @@ -111,6 +111,7 @@ var ( "vtgate_vschema", "vtgate_queries", "vtgate_schema_tracker", + "vtgate_foreignkey_stress", "vtorc", "xb_recovery", "mysql80", diff --git a/test/config.json b/test/config.json index b46152462c4..68e971a2897 100644 --- a/test/config.json +++ b/test/config.json @@ -851,6 +851,15 @@ "RetryMax": 2, "Tags": [] }, + "vtgate_foreignkey_stress": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/foreignkey/stress"], + "Command": [], + "Manual": false, + "Shard": "vtgate_foreignkey_stress", + "RetryMax": 1, + "Tags": [] + }, "vtgate_gen4": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/gen4"], From ea1120ccb6c1ec09441d2459ef04941363ecada8 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 16 Aug 2023 15:12:27 +0300 Subject: [PATCH 02/28] better broken replication message Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index d34486397ba..df1784420dd 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -358,10 +358,10 @@ func validateReplicationIsHealthy(t *testing.T, tablet *cluster.Vttablet) bool { ioRunning := row.AsString("Replica_IO_Running", "") require.NotEmpty(t, ioRunning) - ioHealthy := assert.Equalf(t, "Yes", ioRunning, "row: %v", row) + ioHealthy := assert.Equalf(t, "Yes", ioRunning, "Replication is broken. 
Replication status: %v", row) sqlRunning := row.AsString("Replica_SQL_Running", "") require.NotEmpty(t, sqlRunning) - sqlHealthy := assert.Equalf(t, "Yes", sqlRunning, "row: %v", row) + sqlHealthy := assert.Equalf(t, "Yes", sqlRunning, "Replication is broken. Replication status: %v", row) return ioHealthy && sqlHealthy } @@ -388,7 +388,7 @@ func waitForReplicaCatchup(t *testing.T) { return } if !validateReplicationIsHealthy(t, replica) { - assert.FailNow(t, "replica is unhealthy; not waiting for catchup") + assert.FailNow(t, "replication is broken; not waiting for catchup") return } select { From 8c7e05614956c9b759df4c17ab147e93cda482dc Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 17 Aug 2023 08:31:54 +0300 Subject: [PATCH 03/28] generalize reference actions Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vtgate/foreignkey/stress/fk_stress_test.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index df1784420dd..0b54857e135 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -140,12 +140,12 @@ var ( tableNames = []string{parentTableName, childTableName, child2TableName, grandchildTableName} reverseTableNames []string - onDeleteActionsMap = map[sqlparser.ReferenceAction]string{ + referenceActionMap = map[sqlparser.ReferenceAction]string{ sqlparser.NoAction: "NO ACTION", sqlparser.Cascade: "CASCADE", sqlparser.SetNull: "SET NULL", } - onDeleteActions = []sqlparser.ReferenceAction{sqlparser.NoAction, sqlparser.Cascade, sqlparser.SetNull} + referenceActions = []sqlparser.ReferenceAction{sqlparser.NoAction, sqlparser.Cascade, sqlparser.SetNull} createStatements = []string{ ` CREATE TABLE stress_parent ( @@ -218,6 +218,9 @@ var ( updateRowStatement = ` UPDATE %s SET rand_val=left(md5(rand()), 8), updates=updates+1 WHERE id=%d ` + updateRowIdStatement = ` + UPDATE %s SET id=%v, rand_val=left(md5(rand()), 8), updates=updates+1 WHERE id=%d + ` deleteRowStatement = ` DELETE FROM %s WHERE id=%d AND updates=1 ` @@ -452,8 +455,8 @@ func TestStressFK(t *testing.T) { } t.Run("static data", func(t *testing.T) { - for _, onDeleteAction := range onDeleteActions { - t.Run(onDeleteActionsMap[onDeleteAction], func(t *testing.T) { + for _, onDeleteAction := range referenceActions { + t.Run(referenceActionMap[onDeleteAction], func(t *testing.T) { t.Run("create schema", func(t *testing.T) { createInitialSchema(t, onDeleteAction) }) @@ -473,8 +476,8 @@ func TestStressFK(t *testing.T) { } }) t.Run("stress", func(t *testing.T) { - for _, onDeleteAction := range onDeleteActions { - t.Run(onDeleteActionsMap[onDeleteAction], func(t *testing.T) { + for _, onDeleteAction := range referenceActions { + t.Run(referenceActionMap[onDeleteAction], func(t *testing.T) { // This tests running a workload on the table, then comparing expected metrics with // actual table metrics. All this without any ALTER TABLE: this is to validate // that our testing/metrics logic is sound in the first place. 
@@ -533,7 +536,7 @@ func createInitialSchema(t *testing.T, onDeleteAction sqlparser.ReferenceAction) // Create the stress tables var b strings.Builder for _, sql := range createStatements { - b.WriteString(fmt.Sprintf(sql, onDeleteActionsMap[onDeleteAction])) + b.WriteString(fmt.Sprintf(sql, referenceActionMap[onDeleteAction])) b.WriteString(";") } err := clusterInstance.VtctlclientProcess.ApplySchema(keyspaceName, b.String()) From a80a8735c7862748778197d62720bb302ece89c8 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 17 Aug 2023 08:32:17 +0300 Subject: [PATCH 04/28] Refactor validateMetrics() Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../foreignkey/stress/fk_stress_test.go | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 0b54857e135..8fb9673c8b3 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -404,6 +404,24 @@ func waitForReplicaCatchup(t *testing.T) { } } +func validateMetrics(t *testing.T, onDeleteAction sqlparser.ReferenceAction) { + for _, workloadTable := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} { + tname := fmt.Sprintf("validate metrics: %s", workloadTable) + t.Run(tname, func(t *testing.T) { + var primaryRows, replicaRows int64 + t.Run(tabletTestName(t, primary), func(t *testing.T) { + primaryRows = testSelectTableMetrics(t, primary, workloadTable, onDeleteAction) + }) + t.Run(tabletTestName(t, replica), func(t *testing.T) { + replicaRows = testSelectTableMetrics(t, replica, workloadTable, onDeleteAction) + }) + t.Run("compare primary and replica", func(t *testing.T) { + assert.Equal(t, primaryRows, replicaRows) + }) + }) + } +} + func TestStressFK(t *testing.T) { defer cluster.PanicHandler(t) @@ -436,24 +454,6 @@ func TestStressFK(t *testing.T) { validateReplicationIsHealthy(t, replica) }) - validateMetrics := func(t *testing.T, onDeleteAction sqlparser.ReferenceAction) { - for _, workloadTable := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} { - tname := fmt.Sprintf("validate metrics: %s", workloadTable) - t.Run(tname, func(t *testing.T) { - var primaryRows, replicaRows int64 - t.Run(tabletTestName(t, primary), func(t *testing.T) { - primaryRows = testSelectTableMetrics(t, primary, workloadTable, onDeleteAction) - }) - t.Run(tabletTestName(t, replica), func(t *testing.T) { - replicaRows = testSelectTableMetrics(t, replica, workloadTable, onDeleteAction) - }) - t.Run("compare primary and replica", func(t *testing.T) { - assert.Equal(t, primaryRows, replicaRows) - }) - }) - } - } - t.Run("static data", func(t *testing.T) { for _, onDeleteAction := range referenceActions { t.Run(referenceActionMap[onDeleteAction], func(t *testing.T) { From 235eb1cf7ddc7aa4cecf996dc3eaefdd037c44cc Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 17 Aug 2023 08:34:11 +0300 Subject: [PATCH 05/28] refactored TestInitialSetup() Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../foreignkey/stress/fk_stress_test.go | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go 
b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 8fb9673c8b3..c655d577261 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -422,16 +422,7 @@ func validateMetrics(t *testing.T, onDeleteAction sqlparser.ReferenceAction) { } } -func TestStressFK(t *testing.T) { - defer cluster.PanicHandler(t) - - if val, present := os.LookupEnv("GITHUB_ACTIONS"); present && val != "" { - // This is the place to fine tune the stress parameters if GitHub actions are too slow - maxConcurrency = maxConcurrency * 1 - singleConnectionSleepInterval = singleConnectionSleepInterval * 1 - } - t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval) - +func TestInitialSetup(t *testing.T) { shards = clusterInstance.Keyspaces[0].Shards require.Equal(t, 1, len(shards)) require.Equal(t, 2, len(shards[0].Vttablets)) @@ -449,6 +440,17 @@ func TestStressFK(t *testing.T) { for _, tableName := range tableNames { writeMetrics[tableName] = &WriteMetrics{} } +} + +func TestStressFK(t *testing.T) { + defer cluster.PanicHandler(t) + + if val, present := os.LookupEnv("GITHUB_ACTIONS"); present && val != "" { + // This is the place to fine tune the stress parameters if GitHub actions are too slow + maxConcurrency = maxConcurrency * 1 + singleConnectionSleepInterval = singleConnectionSleepInterval * 1 + } + t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval) t.Run("validate replication health", func(t *testing.T) { validateReplicationIsHealthy(t, replica) From 928d2e25d192114f2d952c546e9c99bf9626de14 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 17 Aug 2023 08:52:25 +0300 Subject: [PATCH 06/28] refactored executeFKTest() Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../foreignkey/stress/fk_stress_test.go | 131 ++++++++++-------- 1 file changed, 70 insertions(+), 61 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index c655d577261..1cc6340675e 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -442,6 +442,61 @@ func TestInitialSetup(t *testing.T) { } } +type testCase struct { + onDeleteAction sqlparser.ReferenceAction + onUpdateAction sqlparser.ReferenceAction + workload bool + onlineDDLTable string +} + +func executeFKTest(t *testing.T, tcase *testCase) { + workloadName := "static data" + if tcase.workload { + workloadName = "workload" + } + testName := fmt.Sprintf("%s/del=%s/upd=%s", workloadName, referenceActionMap[tcase.onDeleteAction], referenceActionMap[tcase.onUpdateAction]) + t.Run(testName, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + t.Run("create schema", func(t *testing.T) { + createInitialSchema(t, tcase.onDeleteAction) + }) + t.Run("init tables", func(t *testing.T) { + populateTables(t) + }) + if tcase.workload { + t.Run("workload", func(t *testing.T) { + var wg sync.WaitGroup + for _, workloadTable := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} { + wg.Add(1) + go func(tbl string) { + defer wg.Done() + runMultipleConnections(ctx, t, tbl) + }(workloadTable) + } + time.Sleep(5 * time.Second) + cancel() 
// will cause runMultipleConnections() to terminate + wg.Wait() + }) + } + t.Run("wait for replica", func(t *testing.T) { + waitForReplicaCatchup(t) + }) + t.Run("validate metrics", func(t *testing.T) { + validateMetrics(t, tcase.onDeleteAction) + }) + t.Run("validate replication health", func(t *testing.T) { + validateReplicationIsHealthy(t, replica) + }) + if tcase.workload { + t.Run("validate fk", func(t *testing.T) { + testFKIntegrity(t, primary, tcase.onDeleteAction) + testFKIntegrity(t, replica, tcase.onDeleteAction) + }) + } + }) +} func TestStressFK(t *testing.T) { defer cluster.PanicHandler(t) @@ -456,69 +511,23 @@ func TestStressFK(t *testing.T) { validateReplicationIsHealthy(t, replica) }) - t.Run("static data", func(t *testing.T) { - for _, onDeleteAction := range referenceActions { - t.Run(referenceActionMap[onDeleteAction], func(t *testing.T) { - t.Run("create schema", func(t *testing.T) { - createInitialSchema(t, onDeleteAction) - }) - t.Run("init tables", func(t *testing.T) { - populateTables(t) - }) - t.Run("wait for replica", func(t *testing.T) { - waitForReplicaCatchup(t) - }) - t.Run("validate metrics", func(t *testing.T) { - validateMetrics(t, onDeleteAction) - }) - t.Run("validate replication health", func(t *testing.T) { - validateReplicationIsHealthy(t, replica) - }) - }) + for _, onDeleteAction := range referenceActions { + tcase := &testCase{ + workload: false, + onDeleteAction: onDeleteAction, + onUpdateAction: sqlparser.NoAction, } - }) - t.Run("stress", func(t *testing.T) { - for _, onDeleteAction := range referenceActions { - t.Run(referenceActionMap[onDeleteAction], func(t *testing.T) { - // This tests running a workload on the table, then comparing expected metrics with - // actual table metrics. All this without any ALTER TABLE: this is to validate - // that our testing/metrics logic is sound in the first place. - t.Run("Workload", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Run("create schema", func(t *testing.T) { - createInitialSchema(t, onDeleteAction) - }) - t.Run("init tables", func(t *testing.T) { - populateTables(t) - }) - t.Run("workload", func(t *testing.T) { - var wg sync.WaitGroup - for _, workloadTable := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} { - wg.Add(1) - go func(tbl string) { - defer wg.Done() - runMultipleConnections(ctx, t, tbl) - }(workloadTable) - } - time.Sleep(5 * time.Second) - cancel() // will cause runMultipleConnections() to terminate - wg.Wait() - }) - t.Run("wait for replica", func(t *testing.T) { - waitForReplicaCatchup(t) - }) - validateMetrics(t, onDeleteAction) - t.Run("validate replication health", func(t *testing.T) { - validateReplicationIsHealthy(t, replica) - }) - t.Run("validate fk", func(t *testing.T) { - testFKIntegrity(t, primary, onDeleteAction) - testFKIntegrity(t, replica, onDeleteAction) - }) - }) - }) + executeFKTest(t, tcase) + } + + for _, onDeleteAction := range referenceActions { + tcase := &testCase{ + workload: true, + onDeleteAction: onDeleteAction, + onUpdateAction: sqlparser.NoAction, } - }) + executeFKTest(t, tcase) + } } // createInitialSchema creates the tables from scratch, and drops the foreign key constraints on the replica. 
From d0ca43b568dc86b9fa4d606f685ee0f254016f1d Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 17 Aug 2023 08:53:22 +0300 Subject: [PATCH 07/28] only one time GITHUB setup Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vtgate/foreignkey/stress/fk_stress_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 1cc6340675e..b3d01e359ac 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -440,6 +440,13 @@ func TestInitialSetup(t *testing.T) { for _, tableName := range tableNames { writeMetrics[tableName] = &WriteMetrics{} } + + if val, present := os.LookupEnv("GITHUB_ACTIONS"); present && val != "" { + // This is the place to fine tune the stress parameters if GitHub actions are too slow + maxConcurrency = maxConcurrency * 1 + singleConnectionSleepInterval = singleConnectionSleepInterval * 1 + } + t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval) } type testCase struct { @@ -500,13 +507,6 @@ func executeFKTest(t *testing.T, tcase *testCase) { func TestStressFK(t *testing.T) { defer cluster.PanicHandler(t) - if val, present := os.LookupEnv("GITHUB_ACTIONS"); present && val != "" { - // This is the place to fine tune the stress parameters if GitHub actions are too slow - maxConcurrency = maxConcurrency * 1 - singleConnectionSleepInterval = singleConnectionSleepInterval * 1 - } - t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval) - t.Run("validate replication health", func(t *testing.T) { validateReplicationIsHealthy(t, replica) }) From 2b15119f0684b13f8a51cc92c8531664c7769aec Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 17 Aug 2023 08:56:52 +0300 Subject: [PATCH 08/28] ExecuteFKTest is public Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vtgate/foreignkey/stress/fk_stress_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index b3d01e359ac..8403497167a 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -456,7 +456,12 @@ type testCase struct { onlineDDLTable string } -func executeFKTest(t *testing.T, tcase *testCase) { +// ExecuteFKTest runs a single test case, which can be: +// - With or without a concurrent workload +// - Any one of the ON DELETE reference actions +// - Any one of the ON UPDATE reference actions +// - Potentially running an Online DDL on an indicated table (this will not work in vanilla MySQL, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/) +func ExecuteFKTest(t *testing.T, tcase *testCase) { workloadName := "static data" if tcase.workload { workloadName = "workload" @@ -517,7 +523,7 @@ func TestStressFK(t *testing.T) { onDeleteAction: onDeleteAction, onUpdateAction: sqlparser.NoAction, } - executeFKTest(t, tcase) + ExecuteFKTest(t, tcase) } 
for _, onDeleteAction := range referenceActions { @@ -526,7 +532,7 @@ func TestStressFK(t *testing.T) { onDeleteAction: onDeleteAction, onUpdateAction: sqlparser.NoAction, } - executeFKTest(t, tcase) + ExecuteFKTest(t, tcase) } } From afa936477d5b82d5336a21e46aacc8604fbab145 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 17 Aug 2023 09:00:24 +0300 Subject: [PATCH 09/28] prepare for onUpdateAction Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../foreignkey/stress/fk_stress_test.go | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 8403497167a..1d5ba546f8d 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -404,7 +404,7 @@ func waitForReplicaCatchup(t *testing.T) { } } -func validateMetrics(t *testing.T, onDeleteAction sqlparser.ReferenceAction) { +func validateMetrics(t *testing.T, onDeleteAction sqlparser.ReferenceAction, onUpdateAction sqlparser.ReferenceAction) { for _, workloadTable := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} { tname := fmt.Sprintf("validate metrics: %s", workloadTable) t.Run(tname, func(t *testing.T) { @@ -472,7 +472,7 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { defer cancel() t.Run("create schema", func(t *testing.T) { - createInitialSchema(t, tcase.onDeleteAction) + createInitialSchema(t, tcase.onDeleteAction, tcase.onUpdateAction) }) t.Run("init tables", func(t *testing.T) { populateTables(t) @@ -496,17 +496,15 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { waitForReplicaCatchup(t) }) t.Run("validate metrics", func(t *testing.T) { - validateMetrics(t, tcase.onDeleteAction) + validateMetrics(t, tcase.onDeleteAction, tcase.onUpdateAction) }) t.Run("validate replication health", func(t *testing.T) { validateReplicationIsHealthy(t, replica) }) - if tcase.workload { - t.Run("validate fk", func(t *testing.T) { - testFKIntegrity(t, primary, tcase.onDeleteAction) - testFKIntegrity(t, replica, tcase.onDeleteAction) - }) - } + t.Run("validate fk", func(t *testing.T) { + testFKIntegrity(t, primary, tcase.onDeleteAction, tcase.onUpdateAction) + testFKIntegrity(t, replica, tcase.onDeleteAction, tcase.onUpdateAction) + }) }) } @@ -537,7 +535,7 @@ func TestStressFK(t *testing.T) { } // createInitialSchema creates the tables from scratch, and drops the foreign key constraints on the replica. -func createInitialSchema(t *testing.T, onDeleteAction sqlparser.ReferenceAction) { +func createInitialSchema(t *testing.T, onDeleteAction sqlparser.ReferenceAction, onUpdateAction sqlparser.ReferenceAction) { ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) require.Nil(t, err) @@ -865,7 +863,7 @@ func testSelectTableMetrics(t *testing.T, tablet *cluster.Vttablet, tableName st // - On the primary database, this test trivially passes because of course MySQL maintains this integrity. But remember // that we remove the foreign key constraints on the replica. Also remember that cascaded writes are not written to // the binary log. And so, if VTGate does not do a proper job, then a parent and child will drift apart in CASCADE writes. 
-func testFKIntegrity(t *testing.T, tablet *cluster.Vttablet, onDeleteAction sqlparser.ReferenceAction) { +func testFKIntegrity(t *testing.T, tablet *cluster.Vttablet, onDeleteAction sqlparser.ReferenceAction, onUpdateAction sqlparser.ReferenceAction) { testName := tabletTestName(t, tablet) t.Run(testName, func(t *testing.T) { t.Run("matching parent-child rows", func(t *testing.T) { From 762ae446bb98b06ab6bec30c955584dbe26d9a23 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 17 Aug 2023 09:19:54 +0300 Subject: [PATCH 10/28] running variety of ON UPDATE actions Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../foreignkey/stress/fk_stress_test.go | 50 ++++++++++++------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 1d5ba546f8d..0a44004a1d5 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -173,7 +173,7 @@ var ( key parent_id_idx(parent_id), key created_idx(created_timestamp), key updates_idx(updates), - CONSTRAINT child_parent_fk FOREIGN KEY (parent_id) REFERENCES stress_parent (id) ON DELETE %s + CONSTRAINT child_parent_fk FOREIGN KEY (parent_id) REFERENCES stress_parent (id) ON DELETE %s ON UPDATE %s ) ENGINE=InnoDB `, ` @@ -188,7 +188,7 @@ var ( key parent_id_idx(parent_id), key created_idx(created_timestamp), key updates_idx(updates), - CONSTRAINT child2_parent_fk FOREIGN KEY (parent_id) REFERENCES stress_parent (id) ON DELETE %s + CONSTRAINT child2_parent_fk FOREIGN KEY (parent_id) REFERENCES stress_parent (id) ON DELETE %s ON UPDATE %s ) ENGINE=InnoDB `, ` @@ -410,10 +410,10 @@ func validateMetrics(t *testing.T, onDeleteAction sqlparser.ReferenceAction, onU t.Run(tname, func(t *testing.T) { var primaryRows, replicaRows int64 t.Run(tabletTestName(t, primary), func(t *testing.T) { - primaryRows = testSelectTableMetrics(t, primary, workloadTable, onDeleteAction) + primaryRows = testSelectTableMetrics(t, primary, workloadTable, onDeleteAction, onUpdateAction) }) t.Run(tabletTestName(t, replica), func(t *testing.T) { - replicaRows = testSelectTableMetrics(t, replica, workloadTable, onDeleteAction) + replicaRows = testSelectTableMetrics(t, replica, workloadTable, onDeleteAction, onUpdateAction) }) t.Run("compare primary and replica", func(t *testing.T) { assert.Equal(t, primaryRows, replicaRows) @@ -515,22 +515,26 @@ func TestStressFK(t *testing.T) { validateReplicationIsHealthy(t, replica) }) - for _, onDeleteAction := range referenceActions { - tcase := &testCase{ - workload: false, - onDeleteAction: onDeleteAction, - onUpdateAction: sqlparser.NoAction, + for _, onUpdateAction := range referenceActions { + for _, onDeleteAction := range referenceActions { + tcase := &testCase{ + workload: false, + onDeleteAction: onDeleteAction, + onUpdateAction: onUpdateAction, + } + ExecuteFKTest(t, tcase) } - ExecuteFKTest(t, tcase) } - for _, onDeleteAction := range referenceActions { - tcase := &testCase{ - workload: true, - onDeleteAction: onDeleteAction, - onUpdateAction: sqlparser.NoAction, + for _, onUpdateAction := range referenceActions { + for _, onDeleteAction := range referenceActions { + tcase := &testCase{ + workload: true, + onDeleteAction: onDeleteAction, + onUpdateAction: onUpdateAction, + } + ExecuteFKTest(t, tcase) } - ExecuteFKTest(t, tcase) } } @@ -551,7 +555,7 
@@ func createInitialSchema(t *testing.T, onDeleteAction sqlparser.ReferenceAction,
 		// Create the stress tables
 		var b strings.Builder
 		for _, sql := range createStatements {
-			b.WriteString(fmt.Sprintf(sql, referenceActionMap[onDeleteAction]))
+			b.WriteString(fmt.Sprintf(sql, referenceActionMap[onDeleteAction], referenceActionMap[onUpdateAction]))
 			b.WriteString(";")
 		}
 		err := clusterInstance.VtctlclientProcess.ApplySchema(keyspaceName, b.String())
@@ -823,7 +827,13 @@ func populateTables(t *testing.T) {
 // actual number of rows and with the row values on those tables.
 // With CASCADE/SET NULL rules we can't do the comparison, because child tables are implicitly affected by the cascading rules,
 // and the values do not match what is reported to us when we UPDATE/DELETE on the parent tables.
-func testSelectTableMetrics(t *testing.T, tablet *cluster.Vttablet, tableName string, onDeleteAction sqlparser.ReferenceAction) int64 {
+func testSelectTableMetrics(
+	t *testing.T,
+	tablet *cluster.Vttablet,
+	tableName string,
+	onDeleteAction sqlparser.ReferenceAction,
+	onUpdateAction sqlparser.ReferenceAction,
+) int64 {
 	switch onDeleteAction {
 	case sqlparser.Cascade, sqlparser.SetNull:
 		if tableName != parentTableName {
@@ -831,6 +841,8 @@ func testSelectTableMetrics(t *testing.T, tablet *cluster.Vttablet, tableName st
 			return 0
 		}
 	}
+	// metrics are unaffected by value of onUpdateAction.
+
 	writeMetrics[tableName].mu.Lock()
 	defer writeMetrics[tableName].mu.Unlock()
 
@@ -881,7 +893,7 @@ func testFKIntegrity(t *testing.T, tablet *cluster.Vttablet, onDeleteAction sqlp
 		assert.NotZero(t, len(rs.Rows))
 		t.Logf("===== matching rows: %v", len(rs.Rows))
 	})
-	if onDeleteAction != sqlparser.SetNull {
+	if onDeleteAction != sqlparser.SetNull && onUpdateAction != sqlparser.SetNull {
 		// Because with SET NULL there _are_ orphaned rows
 		t.Run("parent-child orphaned rows", func(t *testing.T) {
 			rs := queryTablet(t, tablet, selectOrphanedRowsChild, "")

From 4206ef3589f35f159e10141292f42335daa28f41 Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Thu, 17 Aug 2023 11:59:19 +0300
Subject: [PATCH 11/28] Only seed tables once, then re-use a cloned image of
 that seed

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
---
 .../foreignkey/stress/fk_stress_test.go       | 109 ++++++++++++++----
 1 file changed, 88 insertions(+), 21 deletions(-)

diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
index 0a44004a1d5..246d9da3d94 100644
--- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
+++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
@@ -140,6 +140,8 @@ var (
 	tableNames        = []string{parentTableName, childTableName, child2TableName, grandchildTableName}
 	reverseTableNames []string
 
+	seedOnce sync.Once
+
 	referenceActionMap = map[sqlparser.ReferenceAction]string{
 		sqlparser.NoAction: "NO ACTION",
 		sqlparser.Cascade:  "CASCADE",
@@ -203,7 +205,7 @@ var (
 			key parent_id_idx(parent_id),
 			key created_idx(created_timestamp),
 			key updates_idx(updates),
-			CONSTRAINT grandchild_child_fk FOREIGN KEY (parent_id) REFERENCES stress_child (id) ON DELETE %s
+			CONSTRAINT grandchild_child_fk FOREIGN KEY (parent_id) REFERENCES stress_child (id) ON DELETE %s ON UPDATE %s
 		) ENGINE=InnoDB
 	`,
 	}
@@ -406,8 +408,7 @@ func validateMetrics(t *testing.T, onDeleteAction sqlparser.ReferenceAction, onU
 	for _,
workloadTable := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} { - tname := fmt.Sprintf("validate metrics: %s", workloadTable) - t.Run(tname, func(t *testing.T) { + t.Run(workloadTable, func(t *testing.T) { var primaryRows, replicaRows int64 t.Run(tabletTestName(t, primary), func(t *testing.T) { primaryRows = testSelectTableMetrics(t, primary, workloadTable, onDeleteAction, onUpdateAction) @@ -554,17 +555,17 @@ func createInitialSchema(t *testing.T, onDeleteAction sqlparser.ReferenceAction, t.Run("creating tables", func(t *testing.T) { // Create the stress tables var b strings.Builder - for _, sql := range createStatements { - b.WriteString(fmt.Sprintf(sql, referenceActionMap[onDeleteAction], referenceActionMap[onUpdateAction])) + for i, sql := range createStatements { + if i == 0 { + // parent table, no foreign keys + b.WriteString(sql) + } else { + b.WriteString(fmt.Sprintf(sql, referenceActionMap[onDeleteAction], referenceActionMap[onUpdateAction])) + } b.WriteString(";") } err := clusterInstance.VtctlclientProcess.ApplySchema(keyspaceName, b.String()) require.NoError(t, err) - - rs, err := conn.ExecuteFetch("show full tables", 1000, true) - require.NoError(t, err) - require.Equal(t, 4, len(rs.Rows)) - t.Logf("===== init: %d tables created", len(rs.Rows)) }) t.Run("wait for replica", func(t *testing.T) { waitForReplicaCatchup(t) @@ -808,19 +809,85 @@ func populateTables(t *testing.T) { _, err = conn.ExecuteFetch(deleteQuery, 1000, true) require.Nil(t, err) } - t.Logf("===== populating tables") - for _, tableName := range tableNames { - // populate parent, then child, child2, then grandchild - for i := 0; i < maxTableRows/2; i++ { - generateInsert(t, tableName, conn) - } - for i := 0; i < maxTableRows/4; i++ { - generateUpdate(t, tableName, conn) - } - for i := 0; i < maxTableRows/4; i++ { - generateDelete(t, tableName, conn) + // In an ideal world we would randomly re-seed the tables in each and every instance of the test. + // In reality, that takes a lot of time, and while the seeding is important, it's not the heart of + // the test. To that effect, the seeding works as follows: + // - First ever time, we randomly seed the tables (running thousands of queries). We then create *_seed + // tables and clone the data in those seed tables. + // - 2nd test and forward: we just copy over the rows from the *_seed tables. + tablesSeeded := false + seedOnce.Do(func() { + for _, tableName := range tableNames { + t.Run(tableName, func(t *testing.T) { + t.Run("populating", func(t *testing.T) { + // populate parent, then child, child2, then grandchild + for i := 0; i < maxTableRows/2; i++ { + generateInsert(t, tableName, conn) + } + for i := 0; i < maxTableRows/4; i++ { + generateUpdate(t, tableName, conn) + } + for i := 0; i < maxTableRows/4; i++ { + generateDelete(t, tableName, conn) + } + }) + t.Run("creating seed", func(t *testing.T) { + // We create the seed table in the likeness of stress_parent, because that's the only table + // that doesn't have FK constraints. 
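+					// (A working assumption behind the clone: all four stress tables share one
+					// column layout, so a seed table cut from the parent's definition can hold
+					// any of their rows, and having no constraints makes copy order irrelevant.)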
+ { + createSeedQuery := fmt.Sprintf("create table %s_seed like %s", tableName, parentTableName) + _, err := conn.ExecuteFetch(createSeedQuery, 1000, true) + require.NoError(t, err) + } + { + seedQuery := fmt.Sprintf("insert into %s_seed select * from %s", tableName, tableName) + _, err := conn.ExecuteFetch(seedQuery, 1000, true) + require.NoError(t, err) + } + { + validationQuery := fmt.Sprintf("select count(*) as c from %s_seed", tableName) + rs, err := conn.ExecuteFetch(validationQuery, 1000, true) + require.NoError(t, err) + row := rs.Named().Row() + require.NotNil(t, row) + require.NotZero(t, row.AsInt64("c", 0)) + } + }) + }) } + tablesSeeded = true + }) + if !tablesSeeded { + t.Run("reseeding", func(t *testing.T) { + for _, tableName := range tableNames { + seedQuery := fmt.Sprintf("insert into %s select * from %s_seed", tableName, tableName) + _, err := conn.ExecuteFetch(seedQuery, 1000, true) + require.NoError(t, err) + } + }) } + + t.Run("validating table rows", func(t *testing.T) { + for _, tableName := range tableNames { + validationQuery := fmt.Sprintf(selectCountRowsStatement, tableName) + rs, err := conn.ExecuteFetch(validationQuery, 1000, true) + require.NoError(t, err) + row := rs.Named().Row() + require.NotNil(t, row) + numRows := row.AsInt64("num_rows", 0) + sumUpdates := row.AsInt64("sum_updates", 0) + require.NotZero(t, numRows) + if !tablesSeeded { + // We cloned the data from *_seed tables. This means we didn't populate writeMetrics. Now, + // this function only takes care of the base seed. We will later on run a stress workload on + // these tables, at the end of which we will examine the writeMetrics. We thus have to have those + // metrics consistent with the cloned data. It's a bit ugly, but we inject fake writeMetrics. + writeMetrics[tableName].deletes = 1 + writeMetrics[tableName].inserts = numRows + writeMetrics[tableName].deletes + writeMetrics[tableName].updates = sumUpdates + writeMetrics[tableName].deletes + } + } + }) } // testSelectTableMetrics cross references the known metrics (number of successful insert/delete/updates) on each table, with the From 33393cc526bda0331a98da37e1968ec1b6775c2b Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 17 Aug 2023 12:08:47 +0300 Subject: [PATCH 12/28] UPDATEs can modify 'id' column, affecting ON UPDATE SET NULL, ON UPDATE CASCADE Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../endtoend/vtgate/foreignkey/stress/fk_stress_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 246d9da3d94..d469b0dc1b4 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -695,8 +695,16 @@ func generateInsert(t *testing.T, tableName string, conn *mysql.Conn) error { } func generateUpdate(t *testing.T, tableName string, conn *mysql.Conn) error { + // Most of the UPDATEs we run are "normal" updates, but the minority will actually change the + // `id` column itself, which is the FOREIGN KEY parent column for some of the tables. 
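+	// (Only an id change on a referenced row exercises the ON UPDATE action; updates
+	// that leave id untouched never trigger CASCADE or SET NULL on the children.)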
id := rand.Int31n(int32(maxTableRows)) query := fmt.Sprintf(updateRowStatement, tableName, id) + if tableName == parentTableName || tableName == childTableName { + if rand.Intn(4) == 0 { + updatedId := rand.Int31n(int32(maxTableRows)) + query = fmt.Sprintf(updateRowIdStatement, tableName, updatedId, id) + } + } qr, err := conn.ExecuteFetch(query, 1000, true) func() { From ccedd35d12972e6e9059177ed4b3592d35fc743b Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 17 Aug 2023 13:24:34 +0300 Subject: [PATCH 13/28] provisional support for OnlineDDL, though not really using it Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../foreignkey/stress/fk_stress_test.go | 90 +++++++++++++++++-- 1 file changed, 83 insertions(+), 7 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index d469b0dc1b4..3f7e2864b19 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -37,7 +37,10 @@ import ( "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/onlineddl" + "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" ) @@ -129,6 +132,7 @@ var ( replica *cluster.Vttablet vtParams mysql.ConnParams + onlineDDLStrategy = "vitess --unsafe-allow-foreign-keys --cut-over-threshold=15s" hostname = "localhost" keyspaceName = "ks" cell = "zone1" @@ -214,6 +218,9 @@ var ( `ALTER TABLE stress_child2 DROP CONSTRAINT child2_parent_fk`, `ALTER TABLE stress_grandchild DROP CONSTRAINT grandchild_child_fk`, } + alterHintStatement = ` + ALTER TABLE %s modify hint_col varchar(64) not null default '%s' + ` insertRowStatement = ` INSERT IGNORE INTO %s (id, parent_id, rand_val) VALUES (%d, %d, left(md5(rand()), 8)) ` @@ -255,7 +262,9 @@ var ( ) const ( - maxTableRows = 4096 + maxTableRows = 4096 + workloadDuration = 5 * time.Second + migrationWaitTimeout = 60 * time.Second ) // The following variables are fit for a local, strong developer box. @@ -292,6 +301,7 @@ func TestMain(m *testing.M) { "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", "5s", "--watch_replication_stream", + "--vreplication_tablet_type", "primary", } clusterInstance.VtGateExtraArgs = []string{} @@ -488,7 +498,19 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { runMultipleConnections(ctx, t, tbl) }(workloadTable) } - time.Sleep(5 * time.Second) + timer := time.NewTimer(workloadDuration) + + if tcase.onlineDDLTable != "" { + t.Run(fmt.Sprintf("online ddl: %s", tcase.onlineDDLTable), func(t *testing.T) { + // This cannot work with Vanilla MySQL. We put the code for testing, but we're not actually going to use it + // for now. The test cases all have empty tcase.onlineDDLTable + hint := "hint-alter" + uuid := testOnlineDDLStatement(t, fmt.Sprintf(alterHintStatement, tcase.onlineDDLTable, hint), onlineDDLStrategy, "vtgate", hint) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + }) + } + + <-timer.C cancel() // will cause runMultipleConnections() to terminate wg.Wait() }) @@ -572,10 +594,10 @@ func createInitialSchema(t *testing.T, onDeleteAction sqlparser.ReferenceAction, }) t.Run("validating tables: vttablet", func(t *testing.T) { // Check if table is created. 
Checked on tablets. - checkTable(t, parentTableName) - checkTable(t, childTableName) - checkTable(t, child2TableName) - checkTable(t, grandchildTableName) + checkTable(t, parentTableName, "hint_col") + checkTable(t, childTableName, "hint_col") + checkTable(t, child2TableName, "hint_col") + checkTable(t, grandchildTableName, "hint_col") }) t.Run("validating tables: vtgate", func(t *testing.T) { // Wait for tables to appear on VTGate @@ -607,6 +629,56 @@ func createInitialSchema(t *testing.T, onDeleteAction sqlparser.ReferenceAction, }) } +// testOnlineDDLStatement runs an online DDL, ALTER statement +func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectHint string) (uuid string) { + if executeStrategy == "vtgate" { + row := onlineddl.VtgateExecDDL(t, &vtParams, ddlStrategy, alterStatement, "").Named().Row() + if row != nil { + uuid = row.AsString("uuid", "") + } + } else { + var err error + uuid, err = clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, alterStatement, cluster.VtctlClientParams{DDLStrategy: ddlStrategy}) + assert.NoError(t, err) + } + uuid = strings.TrimSpace(uuid) + fmt.Println("# Generated UUID (for debug purposes):") + fmt.Printf("<%s>\n", uuid) + + strategySetting, err := schema.ParseDDLStrategy(ddlStrategy) + assert.NoError(t, err) + + if !strategySetting.Strategy.IsDirect() { + t.Logf("===== waiting for migration %v to conclude", uuid) + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, migrationWaitTimeout, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + } + + if expectHint != "" { + stmt, err := sqlparser.Parse(alterStatement) + require.NoError(t, err) + ddlStmt, ok := stmt.(sqlparser.DDLStatement) + require.True(t, ok) + tableName := ddlStmt.GetTable().Name.String() + checkTable(t, tableName, expectHint) + } + + if !strategySetting.Strategy.IsDirect() { + // let's see what FK tables have been renamed to + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + row := rs.Named().Row() + require.NotNil(t, row) + + artifacts := textutil.SplitDelimitedList(row.AsString("artifacts", "")) + for _, artifact := range artifacts { + checkTable(t, artifact, "") + } + } + + return uuid +} + // waitForTable waits until table is seen in VTGate func waitForTable(t *testing.T, tableName string, conn *mysql.Conn) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) @@ -629,9 +701,13 @@ func waitForTable(t *testing.T, tableName string, conn *mysql.Conn) { } // checkTable checks that the given table exists on all tablets -func checkTable(t *testing.T, showTableName string) { +func checkTable(t *testing.T, showTableName string, expectHint string) { for _, tablet := range shards[0].Vttablets { checkTablesCount(t, tablet, showTableName, 1) + if expectHint != "" { + createStatement := getCreateTableStatement(t, tablet, showTableName) + assert.Contains(t, createStatement, expectHint) + } } } From 0d78ad3b8011d87afd810d32c814f76cefef2de4 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 23 Aug 2023 17:41:01 +0300 Subject: [PATCH 14/28] creating a vschema with 'foreignKeyMode: FK_MANAGED', using WaitForColumn Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vtgate/foreignkey/stress/fk_stress_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git 
a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 3f7e2864b19..d5ea01c7e3d 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -38,6 +38,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" + "vitess.io/vitess/go/test/endtoend/utils" "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" @@ -312,6 +313,10 @@ func TestMain(m *testing.M) { // Start keyspace keyspace := &cluster.Keyspace{ Name: keyspaceName, + VSchema: `{ + "sharded": false, + "foreignKeyMode": "FK_MANAGED" + }`, } // We will use a replica to confirm that vtgate's cascading works correctly. @@ -606,6 +611,11 @@ func createInitialSchema(t *testing.T, onDeleteAction sqlparser.ReferenceAction, waitForTable(t, child2TableName, conn) waitForTable(t, grandchildTableName, conn) }) + for _, tableName := range []string{childTableName, child2TableName, grandchildTableName} { + err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, keyspaceName, tableName, "id") + require.NoError(t, err) + } + t.Run("dropping foreign keys on replica", func(t *testing.T) { for _, statement := range dropConstraintsStatements { _ = queryTablet(t, replica, "set global super_read_only=0", "") From a63faba5526bf9c5743f76c152f5d4365f3ef19e Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 23 Aug 2023 17:44:26 +0300 Subject: [PATCH 15/28] include parent table, name the test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vtgate/foreignkey/stress/fk_stress_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index d5ea01c7e3d..f8c1b20d4e0 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -611,10 +611,12 @@ func createInitialSchema(t *testing.T, onDeleteAction sqlparser.ReferenceAction, waitForTable(t, child2TableName, conn) waitForTable(t, grandchildTableName, conn) }) - for _, tableName := range []string{childTableName, child2TableName, grandchildTableName} { - err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, keyspaceName, tableName, "id") - require.NoError(t, err) - } + t.Run("waiting for vschema definition to apply", func(t *testing.T) { + for _, tableName := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} { + err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, keyspaceName, tableName, "id") + require.NoError(t, err) + } + }) t.Run("dropping foreign keys on replica", func(t *testing.T) { for _, statement := range dropConstraintsStatements { From 7323624a93a098f75474bf47944fbb8f13be73d7 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 27 Aug 2023 12:16:22 +0300 Subject: [PATCH 16/28] validate no errors for UPDATEs with ON UPDATE CASCADE. 
Validate no errors for DELETEs with ON DELETE CASCADE Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../foreignkey/stress/fk_stress_test.go | 83 +++++++++++++++---- 1 file changed, 66 insertions(+), 17 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index f8c1b20d4e0..3185a0916bb 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" @@ -90,6 +91,8 @@ type WriteMetrics struct { insertsAttempts, insertsFailures, insertsNoops, inserts int64 updatesAttempts, updatesFailures, updatesNoops, updates int64 deletesAttempts, deletesFailures, deletesNoops, deletes int64 + + insertsFKErrors, updatesFKErrors, deletesFKErrors int64 } func (w *WriteMetrics) Clear() { @@ -421,15 +424,16 @@ func waitForReplicaCatchup(t *testing.T) { } } -func validateMetrics(t *testing.T, onDeleteAction sqlparser.ReferenceAction, onUpdateAction sqlparser.ReferenceAction) { +func validateMetrics(t *testing.T, tcase *testCase) { for _, workloadTable := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} { t.Run(workloadTable, func(t *testing.T) { var primaryRows, replicaRows int64 t.Run(tabletTestName(t, primary), func(t *testing.T) { - primaryRows = testSelectTableMetrics(t, primary, workloadTable, onDeleteAction, onUpdateAction) + primaryRows = testSelectTableMetrics(t, primary, workloadTable, tcase) + testSelectTableFKErrors(t, workloadTable, tcase) }) t.Run(tabletTestName(t, replica), func(t *testing.T) { - replicaRows = testSelectTableMetrics(t, replica, workloadTable, onDeleteAction, onUpdateAction) + replicaRows = testSelectTableMetrics(t, replica, workloadTable, tcase) }) t.Run("compare primary and replica", func(t *testing.T) { assert.Equal(t, primaryRows, replicaRows) @@ -488,7 +492,7 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { defer cancel() t.Run("create schema", func(t *testing.T) { - createInitialSchema(t, tcase.onDeleteAction, tcase.onUpdateAction) + createInitialSchema(t, tcase) }) t.Run("init tables", func(t *testing.T) { populateTables(t) @@ -524,14 +528,14 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { waitForReplicaCatchup(t) }) t.Run("validate metrics", func(t *testing.T) { - validateMetrics(t, tcase.onDeleteAction, tcase.onUpdateAction) + validateMetrics(t, tcase) }) t.Run("validate replication health", func(t *testing.T) { validateReplicationIsHealthy(t, replica) }) t.Run("validate fk", func(t *testing.T) { - testFKIntegrity(t, primary, tcase.onDeleteAction, tcase.onUpdateAction) - testFKIntegrity(t, replica, tcase.onDeleteAction, tcase.onUpdateAction) + testFKIntegrity(t, primary, tcase) + testFKIntegrity(t, replica, tcase) }) }) } @@ -567,7 +571,7 @@ func TestStressFK(t *testing.T) { } // createInitialSchema creates the tables from scratch, and drops the foreign key constraints on the replica. 
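 // Leaving the replica without FK enforcement is deliberate: the replica stays
 // consistent only if VTGate explicitly applied and replicated every cascaded write,
 // which is exactly what the integrity checks at the end of each test case verify.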
-func createInitialSchema(t *testing.T, onDeleteAction sqlparser.ReferenceAction, onUpdateAction sqlparser.ReferenceAction) {
+func createInitialSchema(t *testing.T, tcase *testCase) {
 	ctx := context.Background()
 	conn, err := mysql.Connect(ctx, &vtParams)
 	require.Nil(t, err)
@@ -587,7 +591,7 @@ func createInitialSchema(t *testing.T, onDeleteAction sqlparser.ReferenceAction,
 				// parent table, no foreign keys
 				b.WriteString(sql)
 			} else {
-				b.WriteString(fmt.Sprintf(sql, referenceActionMap[onDeleteAction], referenceActionMap[onUpdateAction]))
+				b.WriteString(fmt.Sprintf(sql, referenceActionMap[tcase.onDeleteAction], referenceActionMap[tcase.onUpdateAction]))
 			}
 			b.WriteString(";")
 		}
@@ -757,6 +761,25 @@ func getCreateTableStatement(t *testing.T, tablet *cluster.Vttablet, tableName s
 	return statement
 }
 
+func isFKError(err error) bool {
+	if err == nil {
+		return false
+	}
+	sqlErr, ok := err.(*sqlerror.SQLError)
+	if !ok {
+		return false
+	}
+
+	switch sqlErr.Number() {
+	case sqlerror.ERNoReferencedRow,
+		sqlerror.ERRowIsReferenced,
+		sqlerror.ERRowIsReferenced2,
+		sqlerror.ErNoReferencedRow2:
+		return true
+	}
+	return false
+}
+
 func generateInsert(t *testing.T, tableName string, conn *mysql.Conn) error {
 	id := rand.Int31n(int32(maxTableRows))
 	parentId := rand.Int31n(int32(maxTableRows))
@@ -770,6 +793,9 @@ func generateInsert(t *testing.T, tableName string, conn *mysql.Conn) error {
 		writeMetrics[tableName].insertsAttempts++
 		if err != nil {
 			writeMetrics[tableName].insertsFailures++
+			if isFKError(err) {
+				writeMetrics[tableName].insertsFKErrors++
+			}
 			return
 		}
 		assert.Less(t, qr.RowsAffected, uint64(2))
@@ -802,6 +828,9 @@ func generateUpdate(t *testing.T, tableName string, conn *mysql.Conn) error {
 		writeMetrics[tableName].updatesAttempts++
 		if err != nil {
 			writeMetrics[tableName].updatesFailures++
+			if isFKError(err) {
+				writeMetrics[tableName].updatesFKErrors++
+			}
 			return
 		}
 		assert.Less(t, qr.RowsAffected, uint64(2))
@@ -826,6 +855,9 @@ func generateDelete(t *testing.T, tableName string, conn *mysql.Conn) error {
 		writeMetrics[tableName].deletesAttempts++
 		if err != nil {
 			writeMetrics[tableName].deletesFailures++
+			if isFKError(err) {
+				writeMetrics[tableName].deletesFKErrors++
+			}
 			return
 		}
 		assert.Less(t, qr.RowsAffected, uint64(2))
@@ -994,10 +1026,9 @@ func testSelectTableMetrics(
 	t *testing.T,
 	tablet *cluster.Vttablet,
 	tableName string,
-	onDeleteAction sqlparser.ReferenceAction,
-	onUpdateAction sqlparser.ReferenceAction,
+	tcase *testCase,
 ) int64 {
-	switch onDeleteAction {
+	switch tcase.onDeleteAction {
 	case sqlparser.Cascade, sqlparser.SetNull:
 		if tableName != parentTableName {
 			// We can't validate those tables because they will have been affected by cascading rules.
@@ -1029,6 +1060,23 @@ func testSelectTableMetrics(
 	return numRows
 }
 
+// testSelectTableFKErrors asserts that no unexpected FK errors accumulated for the given table.
+func testSelectTableFKErrors(
+	t *testing.T,
+	tableName string,
+	tcase *testCase,
+) {
+	writeMetrics[tableName].mu.Lock()
+	defer writeMetrics[tableName].mu.Unlock()
+
+	if tcase.onDeleteAction == sqlparser.Cascade {
+		assert.Zerof(t, writeMetrics[tableName].deletesFKErrors, "unexpected foreign key errors for DELETEs in ON DELETE CASCADE")
+	}
+	if tcase.onUpdateAction == sqlparser.Cascade {
+		assert.Zerof(t, writeMetrics[tableName].updatesFKErrors, "unexpected foreign key errors for UPDATEs in ON UPDATE CASCADE")
+	}
+}
+
 // testFKIntegrity validates that foreign key consistency is maintained on the given tablet. We cross reference all
 // parent-child relationships.
 // There are two test types:
@@ -1038,25 +1086,26 @@ func testSelectTableMetrics(
 // - On the primary database, this test trivially passes because of course MySQL maintains this integrity. But remember
 //   that we remove the foreign key constraints on the replica. Also remember that cascaded writes are not written to
 //   the binary log. And so, if VTGate does not do a proper job, then a parent and child will drift apart in CASCADE writes.
-func testFKIntegrity(t *testing.T, tablet *cluster.Vttablet, onDeleteAction sqlparser.ReferenceAction, onUpdateAction sqlparser.ReferenceAction) {
+func testFKIntegrity(
+	t *testing.T,
+	tablet *cluster.Vttablet,
+	tcase *testCase,
+) {
 	testName := tabletTestName(t, tablet)
 	t.Run(testName, func(t *testing.T) {
 		t.Run("matching parent-child rows", func(t *testing.T) {
 			rs := queryTablet(t, tablet, selectMatchingRowsChild, "")
 			assert.NotZero(t, len(rs.Rows))
-			t.Logf("===== matching rows: %v", len(rs.Rows))
 		})
 		t.Run("matching parent-child2 rows", func(t *testing.T) {
 			rs := queryTablet(t, tablet, selectMatchingRowsChild2, "")
 			assert.NotZero(t, len(rs.Rows))
-			t.Logf("===== matching rows: %v", len(rs.Rows))
 		})
 		t.Run("matching child-grandchild rows", func(t *testing.T) {
 			rs := queryTablet(t, tablet, selectMatchingRowsGrandchild, "")
 			assert.NotZero(t, len(rs.Rows))
-			t.Logf("===== matching rows: %v", len(rs.Rows))
 		})
-		if onDeleteAction != sqlparser.SetNull && onUpdateAction != sqlparser.SetNull {
+		if tcase.onDeleteAction != sqlparser.SetNull && tcase.onUpdateAction != sqlparser.SetNull {
 			// Because with SET NULL there _are_ orphaned rows
 			t.Run("parent-child orphaned rows", func(t *testing.T) {
 				rs := queryTablet(t, tablet, selectOrphanedRowsChild, "")

From 9c8650a5ad0691b197cebf7a4931b179e08cafa9 Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Sun, 27 Aug 2023 12:32:51 +0300
Subject: [PATCH 17/28] Refactor test

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
---
 go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
index 3185a0916bb..b2c6d12994b 100644
--- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
+++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
@@ -427,10 +427,12 @@ func waitForReplicaCatchup(t *testing.T) {
 func validateMetrics(t *testing.T, tcase *testCase) {
 	for _, workloadTable := range []string{parentTableName, childTableName, child2TableName, grandchildTableName} {
 		t.Run(workloadTable, func(t *testing.T) {
+			t.Run("fk errors", func(t *testing.T) {
+				testSelectTableFKErrors(t, workloadTable, tcase)
+			})
 			var primaryRows, replicaRows int64
 			t.Run(tabletTestName(t, primary), func(t *testing.T) {
 				primaryRows = testSelectTableMetrics(t, primary, workloadTable, tcase)
-				testSelectTableFKErrors(t, workloadTable, tcase)
 			})
 			t.Run(tabletTestName(t, replica), func(t *testing.T) {
 				replicaRows = testSelectTableMetrics(t, replica, workloadTable, tcase)

From adca64addfe8e105f0f96201222ec691477b0acd Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Sun, 27 Aug 2023 13:05:29 +0300
Subject: [PATCH 18/28] clear fk failure metrics

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
---
 go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git
a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index b2c6d12994b..0f45ddeb85c 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -114,6 +114,10 @@ func (w *WriteMetrics) Clear() { w.deletesAttempts = 0 w.deletesFailures = 0 w.deletesNoops = 0 + + w.insertsFKErrors = 0 + w.updatesFKErrors = 0 + w.deletesFKErrors = 0 } func (w *WriteMetrics) String() string { From d0449d66c8278b728e9a2f582470077905e43c87 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 29 Aug 2023 11:20:09 +0300 Subject: [PATCH 19/28] isFKError: reverse filtering logic. Assume everything is a FK error unless one of known errors (namely a duplicate key error) Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vtgate/foreignkey/stress/fk_stress_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 0f45ddeb85c..1ea416e7d68 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -776,14 +776,22 @@ func isFKError(err error) bool { return false } + // Let's try and account for all known errors: switch sqlErr.Number() { + case sqlerror.ERDupEntry: + return false case sqlerror.ERNoReferencedRow, sqlerror.ERRowIsReferenced, sqlerror.ERRowIsReferenced2, sqlerror.ErNoReferencedRow2: return true + case sqlerror.ERNotSupportedYet: + return true } - return false + // Unknown error + fmt.Printf("Unexpected error detected in isFKError: %v\n", err) + // Treat it as if it's a FK error + return true } func generateInsert(t *testing.T, tableName string, conn *mysql.Conn) error { From b0dfdca18bc6c49959dd5b91e00b4f2ccfe011c1 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 30 Aug 2023 08:06:55 +0300 Subject: [PATCH 20/28] support ERTooManyUserConnections Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 1ea416e7d68..a2b1f1c107f 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -778,7 +778,9 @@ func isFKError(err error) bool { // Let's try and account for all known errors: switch sqlErr.Number() { - case sqlerror.ERDupEntry: + case sqlerror.ERDupEntry: // happens since we hammer the tables randomly + return false + case sqlerror.ERTooManyUserConnections: // can happen in Online DDL cut-over return false case sqlerror.ERNoReferencedRow, sqlerror.ERRowIsReferenced, From d527cf0a1c897d9710aa4627be51db926f73c63c Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 30 Aug 2023 12:54:03 +0300 Subject: [PATCH 21/28] go mod tidy Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go.mod | 17 +++++++++-------- go.sum | 33 ++++++++++++++++++--------------- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index 078b4db8922..697ebad1feb 
100644 --- a/go.mod +++ b/go.mod @@ -72,15 +72,15 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.5.8 go.etcd.io/etcd/client/v3 v3.5.8 go.uber.org/mock v0.2.0 - golang.org/x/crypto v0.8.0 // indirect - golang.org/x/mod v0.11.0 // indirect - golang.org/x/net v0.9.0 + golang.org/x/crypto v0.12.0 // indirect + golang.org/x/mod v0.12.0 // indirect + golang.org/x/net v0.14.0 golang.org/x/oauth2 v0.7.0 - golang.org/x/sys v0.8.0 // indirect - golang.org/x/term v0.8.0 - golang.org/x/text v0.9.0 + golang.org/x/sys v0.11.0 // indirect + golang.org/x/term v0.11.0 + golang.org/x/text v0.12.0 golang.org/x/time v0.3.0 - golang.org/x/tools v0.8.0 + golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 google.golang.org/api v0.121.0 google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect google.golang.org/grpc v1.55.0-dev @@ -107,7 +107,8 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 github.com/xlab/treeprint v1.2.0 go.uber.org/goleak v1.2.1 - golang.org/x/sync v0.1.0 + golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 + golang.org/x/sync v0.3.0 modernc.org/sqlite v1.20.3 ) diff --git a/go.sum b/go.sum index 5a3fa32201a..9249c4456fb 100644 --- a/go.sum +++ b/go.sum @@ -667,8 +667,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ= -golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= +golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -679,6 +679,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 h1:m64FZMko/V45gv0bNmrNYoDEq8U5YUhetc9cBWKS1TQ= +golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63/go.mod h1:0v4NqG35kSWCMzLaMeX+IQrlSnVE/bqGSyC2cz/9Le8= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -705,8 +707,8 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod 
v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= -golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= +golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -759,8 +761,8 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= -golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= -golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -784,8 +786,9 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -856,14 +859,14 @@ golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term 
v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= -golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols= -golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0= +golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -875,8 +878,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -936,8 +939,8 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ= -golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y= -golang.org/x/tools v0.8.0/go.mod h1:JxBZ99ISMI5ViVkT1tr6tdNmXeTrcpVSD3vZ1RsRdN4= +golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 h1:Vve/L0v7CXXuxUmaMGIEK/dEeq7uiqb5qBgQrZzIE7E= +golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= From 725ca88037e0d0000e3457324c1d1f53d3f1ef9c Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 31 Aug 2023 13:55:12 +0300 Subject: [PATCH 22/28] general preparation for Online DDL (though not enabled). 
Support more error codes Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/test/endtoend/onlineddl/vtgate_util.go | 4 +- .../foreignkey/stress/fk_stress_test.go | 44 +++++++++++++++++-- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/go/test/endtoend/onlineddl/vtgate_util.go b/go/test/endtoend/onlineddl/vtgate_util.go index c3b1bfa8864..a77c6226120 100644 --- a/go/test/endtoend/onlineddl/vtgate_util.go +++ b/go/test/endtoend/onlineddl/vtgate_util.go @@ -202,7 +202,7 @@ func CheckLaunchAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectCo } // CheckMigrationStatus verifies that the migration indicated by given UUID has the given expected status -func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectStatuses ...schema.OnlineDDLStatus) { +func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectStatuses ...schema.OnlineDDLStatus) bool { query, err := sqlparser.ParseAndBind("show vitess_migrations like %a", sqltypes.StringBindVariable(uuid), ) @@ -224,7 +224,7 @@ func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []clu } } } - assert.Equal(t, len(shards), count) + return assert.Equal(t, len(shards), count) } // WaitForMigrationStatus waits for a migration to reach either provided statuses (returns immediately), or eventually time out diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index a2b1f1c107f..80166debd68 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -159,7 +159,7 @@ var ( sqlparser.Cascade: "CASCADE", sqlparser.SetNull: "SET NULL", } - referenceActions = []sqlparser.ReferenceAction{sqlparser.NoAction, sqlparser.Cascade, sqlparser.SetNull} + referenceActions = []sqlparser.ReferenceAction{sqlparser.NoAction, sqlparser.SetNull, sqlparser.Cascade} createStatements = []string{ ` CREATE TABLE stress_parent ( @@ -493,6 +493,9 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { workloadName = "workload" } testName := fmt.Sprintf("%s/del=%s/upd=%s", workloadName, referenceActionMap[tcase.onDeleteAction], referenceActionMap[tcase.onUpdateAction]) + if tcase.onlineDDLTable != "" { + testName = fmt.Sprintf("%s/ddl=%s", testName, tcase.onlineDDLTable) + } t.Run(testName, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -516,12 +519,27 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { timer := time.NewTimer(workloadDuration) if tcase.onlineDDLTable != "" { - t.Run(fmt.Sprintf("online ddl: %s", tcase.onlineDDLTable), func(t *testing.T) { + t.Run("migrating", func(t *testing.T) { // This cannot work with Vanilla MySQL. We put the code for testing, but we're not actually going to use it // for now. 
The test cases all have empty tcase.onlineDDLTable hint := "hint-alter" uuid := testOnlineDDLStatement(t, fmt.Sprintf(alterHintStatement, tcase.onlineDDLTable, hint), onlineDDLStrategy, "vtgate", hint) - onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + ok := onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + require.True(t, ok) // or else don't attempt to cleanup artifacts + t.Run("cleanup artifacts", func(t *testing.T) { + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + row := rs.Named().Row() + require.NotNil(t, row) + + artifacts := textutil.SplitDelimitedList(row.AsString("artifacts", "")) + for _, artifact := range artifacts { + t.Run(artifact, func(t *testing.T) { + err := clusterInstance.VtctlclientProcess.ApplySchema(keyspaceName, "drop table if exists "+artifact) + require.NoError(t, err) + }) + } + }) }) } @@ -553,6 +571,8 @@ func TestStressFK(t *testing.T) { validateReplicationIsHealthy(t, replica) }) + runOnlineDDL := false + for _, onUpdateAction := range referenceActions { for _, onDeleteAction := range referenceActions { tcase := &testCase{ @@ -574,6 +594,20 @@ func TestStressFK(t *testing.T) { ExecuteFKTest(t, tcase) } } + + if runOnlineDDL { + for _, action := range referenceActions { + for _, table := range tableNames { + tcase := &testCase{ + workload: true, + onDeleteAction: action, + onUpdateAction: action, + onlineDDLTable: table, + } + ExecuteFKTest(t, tcase) + } + } + } } // createInitialSchema creates the tables from scratch, and drops the foreign key constraints on the replica. @@ -782,6 +816,10 @@ func isFKError(err error) bool { return false case sqlerror.ERTooManyUserConnections: // can happen in Online DDL cut-over return false + case sqlerror.ERUnknownError: // happens when query buffering times out + return false + case sqlerror.ERQueryInterrupted: // cancelled due to context expiration + return false case sqlerror.ERNoReferencedRow, sqlerror.ERRowIsReferenced, sqlerror.ERRowIsReferenced2, From 319e2bc839fb6b51c5070e0461204283e1cb31be Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 11 Sep 2023 14:29:09 +0300 Subject: [PATCH 23/28] update workflow file Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- ...ster_endtoend_vtgate_foreignkey_stress.yml | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/.github/workflows/cluster_endtoend_vtgate_foreignkey_stress.yml b/.github/workflows/cluster_endtoend_vtgate_foreignkey_stress.yml index b382140cc2f..d44389d1497 100644 --- a/.github/workflows/cluster_endtoend_vtgate_foreignkey_stress.yml +++ b/.github/workflows/cluster_endtoend_vtgate_foreignkey_stress.yml @@ -16,7 +16,7 @@ env: jobs: build: name: Run endtoend tests on Cluster (vtgate_foreignkey_stress) - runs-on: ubuntu-22.04 + runs-on: gh-hosted-runners-4cores-1 steps: - name: Skip CI @@ -36,6 +36,13 @@ jobs: echo Skip ${skip} echo "skip-workflow=${skip}" >> $GITHUB_OUTPUT + PR_DATA=$(curl \ + -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ + -H "Accept: application/vnd.github.v3+json" \ + "https://api.github.com/repos/${{ github.repository }}/pulls/${{ github.event.pull_request.number }}") + draft=$(echo "$PR_DATA" | jq .draft -r) + echo "is_draft=${draft}" >> $GITHUB_OUTPUT + - name: Check out code if: steps.skip-workflow.outputs.skip-workflow == 'false' uses: actions/checkout@v3 @@ -64,7 +71,7 @@ jobs: if: 
steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' uses: actions/setup-go@v4 with: - go-version: 1.20.5 + go-version: 1.21.0 - name: Set up python if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' @@ -104,7 +111,7 @@ jobs: go install github.com/vitessio/go-junit-report@HEAD - name: Setup launchable dependencies - if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main' + if: steps.skip-workflow.outputs.is_draft == 'false' && steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main' run: | # Get Launchable CLI installed. If you can, make it a part of the builder image to speed things up pip3 install --user launchable~=1.0 > /dev/null @@ -129,11 +136,13 @@ jobs: # run the tests however you normally do, then produce a JUnit XML file eatmydata -- go run test.go -docker=false -follow -shard vtgate_foreignkey_stress | tee -a output.txt | go-junit-report -set-exit-code > report.xml - - name: Print test output and Record test result in launchable + - name: Print test output and Record test result in launchable if PR is not a draft if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always() run: | - # send recorded tests to launchable - launchable record tests --build "$GITHUB_RUN_ID" go-test . || true + if [[ "${{steps.skip-workflow.outputs.is_draft}}" == "false" ]]; then + # send recorded tests to launchable + launchable record tests --build "$GITHUB_RUN_ID" go-test . || true + fi # print test output cat output.txt From f5637fe0d49d331960421111e4e93811c1fe3b3f Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 12 Sep 2023 09:59:56 +0300 Subject: [PATCH 24/28] collect sample errors; reduce number of tests Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../foreignkey/stress/fk_stress_test.go | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 80166debd68..d0cab7b9086 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -92,7 +92,8 @@ type WriteMetrics struct { updatesAttempts, updatesFailures, updatesNoops, updates int64 deletesAttempts, deletesFailures, deletesNoops, deletes int64 - insertsFKErrors, updatesFKErrors, deletesFKErrors int64 + insertsFKErrors, updatesFKErrors, deletesFKErrors int64 + sampleInsertFKError, sampleUpdateFKError, sampleDeleteFKError error } func (w *WriteMetrics) Clear() { @@ -573,26 +574,21 @@ func TestStressFK(t *testing.T) { runOnlineDDL := false - for _, onUpdateAction := range referenceActions { - for _, onDeleteAction := range referenceActions { - tcase := &testCase{ - workload: false, - onDeleteAction: onDeleteAction, - onUpdateAction: onUpdateAction, - } - ExecuteFKTest(t, tcase) + for _, action := range referenceActions { + tcase := &testCase{ + workload: false, + onDeleteAction: action, + onUpdateAction: action, } + ExecuteFKTest(t, tcase) } - - for _, onUpdateAction := range referenceActions { - for _, onDeleteAction := range referenceActions { - tcase := &testCase{ - workload: true, - onDeleteAction: onDeleteAction, - 
From 3e8d61eaf4246b3b5d4ba8f7001ae8ae3f971f29 Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Mon, 18 Sep 2023 08:06:37 +0300
Subject: [PATCH 25/28] allow deadlock error

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
---
 go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
index d0cab7b9086..26ed1e6f2f3 100644
--- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
+++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
@@ -816,6 +816,8 @@ func isFKError(err error) bool {
 		return false
 	case sqlerror.ERQueryInterrupted: // cancelled due to context expiration
 		return false
+	case sqlerror.ERLockDeadlock:
+		return false // bummer, but deadlocks can happen, it's a legit error
 	case sqlerror.ERNoReferencedRow,
 		sqlerror.ERRowIsReferenced,
 		sqlerror.ERRowIsReferenced2,
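Patch 25 extends isFKError so that deadlocks under concurrent load are not misread as foreign key violations. A self-contained sketch of the same classification, using raw MySQL error numbers in place of the sqlerror constants; the mysqlError type is a hypothetical stand-in for the driver's error type, and the case list is deliberately shorter than the real function's:

package main

import (
	"errors"
	"fmt"
)

// mysqlError is a stand-in for a driver error carrying a MySQL errno.
type mysqlError struct {
	num int
	msg string
}

func (e *mysqlError) Error() string { return fmt.Sprintf("ERROR %d: %s", e.num, e.msg) }

// isFKError reports whether err is a genuine foreign key violation, as
// opposed to load-induced noise (deadlocks, lock waits) that a stress
// test tolerates. Error numbers per the MySQL manual.
func isFKError(err error) bool {
	var me *mysqlError
	if !errors.As(err, &me) {
		return false
	}
	switch me.num {
	case 1213, 1205: // ER_LOCK_DEADLOCK, ER_LOCK_WAIT_TIMEOUT: expected under load
		return false
	case 1216, 1217, 1451, 1452: // no referenced row / row is referenced
		return true
	}
	return false
}

func main() {
	fmt.Println(isFKError(&mysqlError{1213, "Deadlock found"}))       // false
	fmt.Println(isFKError(&mysqlError{1452, "Cannot add child row"})) // true
}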
From 3b863807d1fc71a7efa11a3ae1dea380164c1aee Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Mon, 18 Sep 2023 08:49:12 +0300
Subject: [PATCH 26/28] restore full ON DELETE <-> ON UPDATE combination types

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
---
 .../foreignkey/stress/fk_stress_test.go | 29 ++++++++++---------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
index 26ed1e6f2f3..35465857ecb 100644
--- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
+++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
@@ -574,24 +574,25 @@ func TestStressFK(t *testing.T) {
 
 	runOnlineDDL := false
 
-	for _, action := range referenceActions {
-		tcase := &testCase{
-			workload:       false,
-			onDeleteAction: action,
-			onUpdateAction: action,
-		}
-		ExecuteFKTest(t, tcase)
-	}
-	for _, action := range referenceActions {
-		tcase := &testCase{
-			workload:       true,
-			onDeleteAction: action,
-			onUpdateAction: action,
+	// Without workload; then with workload
+	for _, workload := range []bool{false, true} {
+		// For any type of ON DELETE action
+		for _, actionDelete := range referenceActions {
+			// For any type of ON UPDATE action
+			for _, actionUpdate := range referenceActions {
+				tcase := &testCase{
+					workload:       workload,
+					onDeleteAction: actionDelete,
+					onUpdateAction: actionUpdate,
+				}
+				ExecuteFKTest(t, tcase)
+			}
 		}
-		ExecuteFKTest(t, tcase)
 	}
 
 	if runOnlineDDL {
+		// Running Online DDL on all test tables. We don't use all of the combinations
+		// presented above; we run with workload, and settle for identical ON DELETE and ON UPDATE actions.
 		for _, action := range referenceActions {
 			for _, table := range tableNames {
 				tcase := &testCase{
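Patch 26 restores the full ON DELETE × ON UPDATE matrix: with the workload toggle, the test now executes 2×N² combinations for N reference actions, rather than the 2×N same-action diagonal left by patch 24. A standalone sketch of the enumeration and its case count (the action names below are illustrative stand-ins; the real values live in the test's referenceActions slice):

package main

import "fmt"

func main() {
	// Hypothetical stand-ins for the test's referenceActions values.
	actions := []string{"RESTRICT", "CASCADE", "SET NULL"}

	cases := 0
	for _, workload := range []bool{false, true} {
		for _, onDelete := range actions {
			for _, onUpdate := range actions {
				fmt.Printf("workload=%-5v ON DELETE %-8s ON UPDATE %s\n", workload, onDelete, onUpdate)
				cases++
			}
		}
	}
	// 2 * len(actions)^2, i.e. 18 cases for three actions.
	fmt.Println("total cases:", cases)
}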
 		for _, action := range referenceActions {
 			for _, table := range tableNames {
 				tcase := &testCase{

From 61de371fd6754aaa19ff01f99562ae1838420db5 Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Mon, 18 Sep 2023 10:29:55 +0300
Subject: [PATCH 27/28] normalizing WaitForReplicationPos

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
---
 go/test/endtoend/cluster/cluster_util.go | 25 ++++++++++---------
 .../buffer/reparent/failover_buffer_test.go | 2 +-
 2 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/go/test/endtoend/cluster/cluster_util.go b/go/test/endtoend/cluster/cluster_util.go
index 0e3cc2d0c95..cc1087b89ce 100644
--- a/go/test/endtoend/cluster/cluster_util.go
+++ b/go/test/endtoend/cluster/cluster_util.go
@@ -224,25 +224,26 @@ func filterResultWhenRunsForCoverage(input string) string {
 }
 
 // WaitForReplicationPos will wait for the replication position to catch up
-func WaitForReplicationPos(t *testing.T, tabletA *Vttablet, tabletB *Vttablet, hostname string, timeout float64) {
+func WaitForReplicationPos(t *testing.T, tabletA *Vttablet, tabletB *Vttablet, hostname string, timeout time.Duration) {
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+	ticker := time.NewTicker(10 * time.Millisecond)
+	defer ticker.Stop()
+
 	replicationPosA, _ := GetPrimaryPosition(t, *tabletA, hostname)
 	for {
 		replicationPosB, _ := GetPrimaryPosition(t, *tabletB, hostname)
 		if positionAtLeast(t, tabletA, replicationPosB, replicationPosA) {
-			break
+			return
 		}
 		msg := fmt.Sprintf("%s's replication position to catch up to %s's;currently at: %s, waiting to catch up to: %s", tabletB.Alias, tabletA.Alias, replicationPosB, replicationPosA)
-		waitStep(t, msg, timeout, 0.01)
-	}
-}
-
-func waitStep(t *testing.T, msg string, timeout float64, sleepTime float64) float64 {
-	timeout = timeout - sleepTime
-	if timeout < 0.0 {
-		t.Errorf("timeout waiting for condition '%s'", msg)
+		select {
+		case <-ctx.Done():
+			t.Errorf("timeout waiting for condition '%s'", msg)
+			return
+		case <-ticker.C:
+		}
 	}
-	time.Sleep(time.Duration(sleepTime) * time.Second)
-	return timeout
 }
 
 func positionAtLeast(t *testing.T, tablet *Vttablet, a string, b string) bool {
diff --git a/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go b/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go
index ace652fc1d2..1ecd4140c4c 100644
--- a/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go
+++ b/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go
@@ -51,7 +51,7 @@ func failoverExternalReparenting(t *testing.T, clusterInstance *cluster.LocalPro
 	primary.VttabletProcess.QueryTablet(demoteQuery, keyspaceUnshardedName, true)
 
 	// Wait for replica to catch up to primary.
-	cluster.WaitForReplicationPos(t, primary, replica, "localhost", 60.0)
+	cluster.WaitForReplicationPos(t, primary, replica, "localhost", time.Minute)
 
 	duration := time.Since(start)
 	minUnavailabilityInS := 1.0
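Patch 27 replaces waitStep's fractional-second countdown with a context deadline plus a ticker, polling every 10 milliseconds. The shape generalizes to any wait-for-condition helper; here is a minimal sketch (waitFor is a hypothetical name, not part of the patch):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitFor polls cond on every tick until it returns true or the timeout
// elapses, the same context-plus-ticker shape the patch gives
// WaitForReplicationPos instead of hand-counting fractional seconds.
func waitFor(cond func() bool, interval, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		// Check the condition first, so an already-true condition
		// returns without waiting a full interval.
		if cond() {
			return nil
		}
		select {
		case <-ctx.Done():
			return errors.New("timeout waiting for condition")
		case <-ticker.C:
		}
	}
}

func main() {
	start := time.Now()
	err := waitFor(func() bool { return time.Since(start) > 50*time.Millisecond },
		10*time.Millisecond, time.Second)
	fmt.Println("err:", err) // nil: the condition became true well before the deadline
}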
From a33b746dda70fb88a69ad6e695b892d32f54da35 Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Mon, 18 Sep 2023 11:26:29 +0300
Subject: [PATCH 28/28] WaitForReplicationPos: support 'validateReplication'

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
---
 go/test/endtoend/cluster/cluster_util.go | 30 +++++++++-
 .../buffer/reparent/failover_buffer_test.go | 2 +-
 .../foreignkey/stress/fk_stress_test.go | 55 +------------------
 3 files changed, 32 insertions(+), 55 deletions(-)

diff --git a/go/test/endtoend/cluster/cluster_util.go b/go/test/endtoend/cluster/cluster_util.go
index cc1087b89ce..1af9504389b 100644
--- a/go/test/endtoend/cluster/cluster_util.go
+++ b/go/test/endtoend/cluster/cluster_util.go
@@ -223,8 +223,26 @@ func filterResultWhenRunsForCoverage(input string) string {
 	return result
 }
 
+func ValidateReplicationIsHealthy(t *testing.T, tablet *Vttablet) bool {
+	query := "show replica status"
+	rs, err := tablet.VttabletProcess.QueryTablet(query, "", true)
+	assert.NoError(t, err)
+	row := rs.Named().Row()
+	require.NotNil(t, row)
+
+	ioRunning := row.AsString("Replica_IO_Running", "")
+	require.NotEmpty(t, ioRunning)
+	ioHealthy := assert.Equalf(t, "Yes", ioRunning, "Replication is broken. Replication status: %v", row)
+	sqlRunning := row.AsString("Replica_SQL_Running", "")
+	require.NotEmpty(t, sqlRunning)
+	sqlHealthy := assert.Equalf(t, "Yes", sqlRunning, "Replication is broken. Replication status: %v", row)
+
+	return ioHealthy && sqlHealthy
+}
+
 // WaitForReplicationPos will wait for the replication position to catch up
-func WaitForReplicationPos(t *testing.T, tabletA *Vttablet, tabletB *Vttablet, hostname string, timeout time.Duration) {
+func WaitForReplicationPos(t *testing.T, tabletA *Vttablet, tabletB *Vttablet, validateReplication bool, timeout time.Duration) {
+	hostname := "localhost"
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 	ticker := time.NewTicker(10 * time.Millisecond)
@@ -232,6 +250,14 @@ func WaitForReplicationPos(t *testing.T, tabletA *Vttablet, tabletB *Vttablet, h
 
 	replicationPosA, _ := GetPrimaryPosition(t, *tabletA, hostname)
 	for {
+		if validateReplication {
+			if !ValidateReplicationIsHealthy(t, tabletB) {
+				assert.FailNowf(t, "Replication broken on tablet %v. Will not wait for position", tabletB.Alias)
+			}
+			if t.Failed() {
+				return
+			}
+		}
 		replicationPosB, _ := GetPrimaryPosition(t, *tabletB, hostname)
 		if positionAtLeast(t, tabletA, replicationPosB, replicationPosA) {
 			return
@@ -239,7 +265,7 @@ func WaitForReplicationPos(t *testing.T, tabletA *Vttablet, tabletB *Vttablet, h
 		msg := fmt.Sprintf("%s's replication position to catch up to %s's;currently at: %s, waiting to catch up to: %s", tabletB.Alias, tabletA.Alias, replicationPosB, replicationPosA)
 		select {
 		case <-ctx.Done():
-			t.Errorf("timeout waiting for condition '%s'", msg)
+			assert.FailNowf(t, "Timeout waiting for condition '%s'", msg)
 			return
 		case <-ticker.C:
 		}
 	}
diff --git a/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go b/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go
index 1ecd4140c4c..d3828eb8166 100644
--- a/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go
+++ b/go/test/endtoend/tabletgateway/buffer/reparent/failover_buffer_test.go
@@ -51,7 +51,7 @@ func failoverExternalReparenting(t *testing.T, clusterInstance *cluster.LocalPro
 	primary.VttabletProcess.QueryTablet(demoteQuery, keyspaceUnshardedName, true)
 
 	// Wait for replica to catch up to primary.
-	cluster.WaitForReplicationPos(t, primary, replica, "localhost", time.Minute)
+	cluster.WaitForReplicationPos(t, primary, replica, false, time.Minute)
 
 	duration := time.Since(start)
 	minUnavailabilityInS := 1.0
diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
index 35465857ecb..600961e6f0c 100644
--- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
+++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
@@ -34,7 +34,6 @@ import (
 	"golang.org/x/exp/slices"
 
 	"vitess.io/vitess/go/mysql"
-	"vitess.io/vitess/go/mysql/replication"
 	"vitess.io/vitess/go/mysql/sqlerror"
 	"vitess.io/vitess/go/sqltypes"
 	"vitess.io/vitess/go/test/endtoend/cluster"
@@ -377,56 +376,8 @@ func tabletTestName(t *testing.T, tablet *cluster.Vttablet) string {
 	return ""
 }
 
-func validateReplicationIsHealthy(t *testing.T, tablet *cluster.Vttablet) bool {
-	query := "show replica status"
-	rs, err := tablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
-	assert.NoError(t, err)
-	row := rs.Named().Row()
-	require.NotNil(t, row)
-
-	ioRunning := row.AsString("Replica_IO_Running", "")
-	require.NotEmpty(t, ioRunning)
-	ioHealthy := assert.Equalf(t, "Yes", ioRunning, "Replication is broken. Replication status: %v", row)
-	sqlRunning := row.AsString("Replica_SQL_Running", "")
-	require.NotEmpty(t, sqlRunning)
-	sqlHealthy := assert.Equalf(t, "Yes", sqlRunning, "Replication is broken. Replication status: %v", row)
-
-	return ioHealthy && sqlHealthy
-}
-
-func getTabletPosition(t *testing.T, tablet *cluster.Vttablet) replication.Position {
-	rs := queryTablet(t, tablet, "select @@gtid_executed as gtid_executed", "")
-	row := rs.Named().Row()
-	require.NotNil(t, row)
-	gtidExecuted := row.AsString("gtid_executed", "")
-	require.NotEmpty(t, gtidExecuted)
-	pos, err := replication.DecodePositionDefaultFlavor(gtidExecuted, replication.Mysql56FlavorID)
-	assert.NoError(t, err)
-	return pos
-}
-
 func waitForReplicaCatchup(t *testing.T) {
-	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
-	defer cancel()
-	primaryPos := getTabletPosition(t, primary)
-	for {
-		replicaPos := getTabletPosition(t, replica)
-		if replicaPos.GTIDSet.Contains(primaryPos.GTIDSet) {
-			// success
-			return
-		}
-		if !validateReplicationIsHealthy(t, replica) {
-			assert.FailNow(t, "replication is broken; not waiting for catchup")
-			return
-		}
-		select {
-		case <-ctx.Done():
-			assert.FailNow(t, "timeout waiting for replica to catch up")
-			return
-		case <-time.After(time.Second):
-			//
-		}
-	}
+	cluster.WaitForReplicationPos(t, primary, replica, true, time.Minute)
 }
 
 func validateMetrics(t *testing.T, tcase *testCase) {
@@ -556,7 +507,7 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) {
 		validateMetrics(t, tcase)
 	})
 	t.Run("validate replication health", func(t *testing.T) {
-		validateReplicationIsHealthy(t, replica)
+		cluster.ValidateReplicationIsHealthy(t, replica)
 	})
 	t.Run("validate fk", func(t *testing.T) {
 		testFKIntegrity(t, primary, tcase)
 	})
@@ -569,7 +520,7 @@ func TestStressFK(t *testing.T) {
 	defer cluster.PanicHandler(t)
 
 	t.Run("validate replication health", func(t *testing.T) {
-		validateReplicationIsHealthy(t, replica)
+		cluster.ValidateReplicationIsHealthy(t, replica)
 	})
 
 	runOnlineDDL := false
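With patch 28 applied, both the replication health check and the position wait live in the shared cluster package. A hypothetical call site, assuming the package-level primary and replica *cluster.Vttablet variables that fk_stress_test.go sets up (the test name and file placement are illustration only):

package stress

import (
	"testing"
	"time"

	"vitess.io/vitess/go/test/endtoend/cluster"
)

// TestReplicaCatchup sketches how a test would use the consolidated helpers.
func TestReplicaCatchup(t *testing.T) {
	// Fail fast if replication is already broken.
	cluster.ValidateReplicationIsHealthy(t, replica)

	// Wait up to a minute for the replica to reach the primary's position,
	// re-validating replication health on every poll (validateReplication=true),
	// so a broken replica aborts immediately instead of burning the full timeout.
	cluster.WaitForReplicationPos(t, primary, replica, true, time.Minute)
}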