Skip to content

Commit

Permalink
sql: add more detailed test for operations during a primary key change
Browse files Browse the repository at this point in the history
The existing operations test did not catch various bugs
uncovered in #45347, so this test expands upon the existing
schema change operations tests with a larger test case.

Release justification: non production code change

Release note: None
  • Loading branch information
rohany committed Mar 11, 2020
1 parent 2863173 commit 5c76da3
Showing 1 changed file with 164 additions and 0 deletions.
164 changes: 164 additions & 0 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2465,6 +2465,170 @@ CREATE TABLE t.test (k INT NOT NULL, v INT);
// be using k as the primary key, we disable the UPSERT statements in the test.
false,
)

// runSchemaChangeWithOperations only performs some simple
// operations on the kv table. We want to also run some
// more operations against a table with more columns.
// We separate the columns into multiple different families
// in order to test different cases of reads, writes and
// deletions operating on different sets of families.
if _, err := sqlDB.Exec(`
DROP TABLE t.test;
CREATE TABLE t.test (
x INT PRIMARY KEY, y INT NOT NULL, z INT, a INT, b INT,
c INT, d INT, FAMILY (x), FAMILY (y), FAMILY (z),
FAMILY (a, b), FAMILY (c), FAMILY (d)
);
`); err != nil {
t.Fatal(err)
}
// Insert into the table.
inserts := make([]string, maxValue+1)
for i := 0; i < maxValue+1; i++ {
inserts[i] = fmt.Sprintf(
"(%d, %d, %d, %d, %d, %d, %d)",
i, i, i, i, i, i, i,
)
}
if _, err := sqlDB.Exec(
fmt.Sprintf(`INSERT INTO t.test VALUES %s`, strings.Join(inserts, ","))); err != nil {
t.Fatal(err)
}

notification := initBackfillNotification()

var wg sync.WaitGroup
wg.Add(1)
go func() {
if _, err := sqlDB.Exec(`
ALTER TABLE t.test ALTER PRIMARY KEY USING COLUMNS (y)`); err != nil {
t.Error(err)
}
wg.Done()
}()

// Wait until the backfill starts.
<-notification

// Update some rows.
rowsUpdated := make(map[int]struct{})
for i := 0; i < 10; i++ {
// Update a row that hasn't been updated yet.
for {
k := rand.Intn(maxValue)
if _, ok := rowsUpdated[k]; !ok {
rowsUpdated[k] = struct{}{}
break
}
}
}
for k := range rowsUpdated {
if _, err := sqlDB.Exec(`
UPDATE t.test SET z = NULL, a = $1, b = NULL, c = NULL, d = $1 WHERE y = $2`, 2*k, k); err != nil {
t.Fatal(err)
}
}

// Delete some rows.
rowsDeleted := make(map[int]struct{})
for i := 0; i < 10; i++ {
// Delete a row that hasn't been updated.
for {
k := rand.Intn(maxValue)
_, updated := rowsUpdated[k]
_, deleted := rowsDeleted[k]
if !updated && !deleted {
rowsDeleted[k] = struct{}{}
break
}
}
}
for k := range rowsDeleted {
if _, err := sqlDB.Exec(`DELETE FROM t.test WHERE x = $1`, k); err != nil {
t.Fatal(err)
}
}

// Insert some rows.
inserts = make([]string, 10)
for i := 0; i < 10; i++ {
val := i + maxValue + 1
inserts[i] = fmt.Sprintf(
"(%d, %d, %d, %d, %d, %d, %d)",
val, val, val, val, val, val, val,
)
}
if _, err := sqlDB.Exec(
fmt.Sprintf(`INSERT INTO t.test VALUES %s`, strings.Join(inserts, ","))); err != nil {
t.Fatal(err)
}

// Wait for the pk change to complete.
wg.Wait()

// Ensure that the count of rows is correct along both indexes.
var count int
row := sqlDB.QueryRow(`SELECT count(*) FROM t.test@primary`)
if err := row.Scan(&count); err != nil {
t.Fatal(err)
}
if count != maxValue+1 {
t.Fatalf("expected %d rows, found %d", maxValue+1, count)
}
row = sqlDB.QueryRow(`SELECT count(x) FROM t.test@test_x_key`)
if err := row.Scan(&count); err != nil {
t.Fatal(err)
}
if count != maxValue+1 {
t.Fatalf("expected %d rows, found %d", maxValue+1, count)
}

// Verify that we cannot find our deleted rows.
for k := range rowsDeleted {
row := sqlDB.QueryRow(`SELECT count(*) FROM t.test WHERE x = $1`, k)
if err := row.Scan(&count); err != nil {
t.Fatal(err)
}
if count != 0 {
t.Fatalf("expected %d rows, found %d", 0, count)
}
}

// Verify that we can find our inserted rows.
for i := 0; i < 10; i++ {
val := i + maxValue + 1
row := sqlDB.QueryRow(`SELECT * FROM t.test WHERE y = $1`, val)
var x, y, z, a, b, c, d int
if err := row.Scan(&x, &y, &z, &a, &b, &c, &d); err != nil {
t.Fatal(err)
}
for i, v := range []int{x, y, z, a, b, c, d} {
if v != val {
t.Fatalf("expected to find %d for column %d, but found %d", val, i, v)
}
}
}

// Verify that our updated rows have indeed been updated.
for k := range rowsUpdated {
row := sqlDB.QueryRow(`SELECT * FROM t.test WHERE y = $1`, k)
var (
x, y, a, d int
z, b, c gosql.NullInt64
)
if err := row.Scan(&x, &y, &z, &a, &b, &c, &d); err != nil {
t.Fatal(err)
}
require.Equal(t, k, x)
require.Equal(t, k, y)
require.Equal(t, 2*k, a)
require.Equal(t, 2*k, d)
for _, v := range []gosql.NullInt64{z, b, c} {
if v.Valid {
t.Fatalf("expected NULL but found %d", v.Int64)
}
}
}
}

// TestPrimaryKeyChangeKVOps tests sequences of k/v operations
Expand Down

0 comments on commit 5c76da3

Please sign in to comment.