diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index e70d43e9a312..720383ed8400 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -2465,6 +2465,170 @@ CREATE TABLE t.test (k INT NOT NULL, v INT); // be using k as the primary key, we disable the UPSERT statements in the test. false, ) + + // runSchemaChangeWithOperations only performs some simple + // operations on the kv table. We want to also run some + // more operations against a table with more columns. + // We separate the columns into multiple different families + // in order to test different cases of reads, writes and + // deletions operating on different sets of families. + if _, err := sqlDB.Exec(` +DROP TABLE t.test; +CREATE TABLE t.test ( + x INT PRIMARY KEY, y INT NOT NULL, z INT, a INT, b INT, + c INT, d INT, FAMILY (x), FAMILY (y), FAMILY (z), + FAMILY (a, b), FAMILY (c), FAMILY (d) +); +`); err != nil { + t.Fatal(err) + } + // Insert into the table. + inserts := make([]string, maxValue+1) + for i := 0; i < maxValue+1; i++ { + inserts[i] = fmt.Sprintf( + "(%d, %d, %d, %d, %d, %d, %d)", + i, i, i, i, i, i, i, + ) + } + if _, err := sqlDB.Exec( + fmt.Sprintf(`INSERT INTO t.test VALUES %s`, strings.Join(inserts, ","))); err != nil { + t.Fatal(err) + } + + notification := initBackfillNotification() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + if _, err := sqlDB.Exec(` +ALTER TABLE t.test ALTER PRIMARY KEY USING COLUMNS (y)`); err != nil { + t.Error(err) + } + wg.Done() + }() + + // Wait until the backfill starts. + <-notification + + // Update some rows. + rowsUpdated := make(map[int]struct{}) + for i := 0; i < 10; i++ { + // Update a row that hasn't been updated yet. + for { + k := rand.Intn(maxValue) + if _, ok := rowsUpdated[k]; !ok { + rowsUpdated[k] = struct{}{} + break + } + } + } + for k := range rowsUpdated { + if _, err := sqlDB.Exec(` +UPDATE t.test SET z = NULL, a = $1, b = NULL, c = NULL, d = $1 WHERE y = $2`, 2*k, k); err != nil { + t.Fatal(err) + } + } + + // Delete some rows. + rowsDeleted := make(map[int]struct{}) + for i := 0; i < 10; i++ { + // Delete a row that hasn't been updated. + for { + k := rand.Intn(maxValue) + _, updated := rowsUpdated[k] + _, deleted := rowsDeleted[k] + if !updated && !deleted { + rowsDeleted[k] = struct{}{} + break + } + } + } + for k := range rowsDeleted { + if _, err := sqlDB.Exec(`DELETE FROM t.test WHERE x = $1`, k); err != nil { + t.Fatal(err) + } + } + + // Insert some rows. + inserts = make([]string, 10) + for i := 0; i < 10; i++ { + val := i + maxValue + 1 + inserts[i] = fmt.Sprintf( + "(%d, %d, %d, %d, %d, %d, %d)", + val, val, val, val, val, val, val, + ) + } + if _, err := sqlDB.Exec( + fmt.Sprintf(`INSERT INTO t.test VALUES %s`, strings.Join(inserts, ","))); err != nil { + t.Fatal(err) + } + + // Wait for the pk change to complete. + wg.Wait() + + // Ensure that the count of rows is correct along both indexes. + var count int + row := sqlDB.QueryRow(`SELECT count(*) FROM t.test@primary`) + if err := row.Scan(&count); err != nil { + t.Fatal(err) + } + if count != maxValue+1 { + t.Fatalf("expected %d rows, found %d", maxValue+1, count) + } + row = sqlDB.QueryRow(`SELECT count(x) FROM t.test@test_x_key`) + if err := row.Scan(&count); err != nil { + t.Fatal(err) + } + if count != maxValue+1 { + t.Fatalf("expected %d rows, found %d", maxValue+1, count) + } + + // Verify that we cannot find our deleted rows. + for k := range rowsDeleted { + row := sqlDB.QueryRow(`SELECT count(*) FROM t.test WHERE x = $1`, k) + if err := row.Scan(&count); err != nil { + t.Fatal(err) + } + if count != 0 { + t.Fatalf("expected %d rows, found %d", 0, count) + } + } + + // Verify that we can find our inserted rows. + for i := 0; i < 10; i++ { + val := i + maxValue + 1 + row := sqlDB.QueryRow(`SELECT * FROM t.test WHERE y = $1`, val) + var x, y, z, a, b, c, d int + if err := row.Scan(&x, &y, &z, &a, &b, &c, &d); err != nil { + t.Fatal(err) + } + for i, v := range []int{x, y, z, a, b, c, d} { + if v != val { + t.Fatalf("expected to find %d for column %d, but found %d", val, i, v) + } + } + } + + // Verify that our updated rows have indeed been updated. + for k := range rowsUpdated { + row := sqlDB.QueryRow(`SELECT * FROM t.test WHERE y = $1`, k) + var ( + x, y, a, d int + z, b, c gosql.NullInt64 + ) + if err := row.Scan(&x, &y, &z, &a, &b, &c, &d); err != nil { + t.Fatal(err) + } + require.Equal(t, k, x) + require.Equal(t, k, y) + require.Equal(t, 2*k, a) + require.Equal(t, 2*k, d) + for _, v := range []gosql.NullInt64{z, b, c} { + if v.Valid { + t.Fatalf("expected NULL but found %d", v.Int64) + } + } + } } // TestPrimaryKeyChangeKVOps tests sequences of k/v operations