Skip to content

Commit

Permalink
upgrades: add weaker column schema exists funcs for use with migrations
Browse files Browse the repository at this point in the history
This patch adds two schema exists functions for use with migrations
that involve multiple schema changes on the same column(s) in order
to preserve the idempotence of the migration(s). They are weaker
in the sense that they do not check that the stored and final
expected descriptor match.

Release note: None
  • Loading branch information
andyyang890 committed Nov 14, 2022
1 parent a00fbe9 commit 61cc346
Show file tree
Hide file tree
Showing 3 changed files with 253 additions and 59 deletions.
61 changes: 61 additions & 0 deletions pkg/upgrade/upgrades/schema_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,67 @@ func hasColumn(storedTable, expectedTable catalog.TableDescriptor, colName strin
return true, nil
}

// columnExists returns true if storedTable contains a column with the given
// colName. Unlike hasColumn, it does not check that the column descriptor in
// storedTable and expectedTable match.
//
// This weaker check should be used when a migration/multiple migrations
// alter(s) the same column multiple times in order to ensure the migration(s)
// remain(s) idempotent. Consider the following series of (sub)migrations:
// 1. column C is first added to a table as nullable (NULL)
// 2. column C is backfilled with non-NULL values
// 3. column C is altered to be not nullable (NOT NULL)
//
// When we are deciding whether (sub)migration 1 should run, we can either
// (a) compare it to the expected descriptor after the column has been added
// but before it has been altered to be NOT NULL or (b) compare it to the
// expected descriptor after the column has been altered to be NOT NULL.
//
// If we choose to do (a) and for some reason after (sub)migration 3 is
// completed, we need to restart and run (sub)migration 1 again, hasColumn
// would now return an error because the column exists and is NOT NULL but the
// expected descriptor we have has it as NULL.
//
// If we choose to do (b) and for some reason after (sub)migration 1 is
// completed but before (sub)migration 3 runs, we restart and try
// (sub)migration 1 again, hasColumn would also now return an error because the
// column exists and is NULL when the expected descriptor has it as NOT NULL.
//
// In either case, the cluster would enter an unrecoverable state where it will
// repeatedly attempt to perform (sub)migration 1 and fail, preventing the
// migration and any further migrations from running.
func columnExists(
storedTable, expectedTable catalog.TableDescriptor, colName string,
) (bool, error) {
_, err := storedTable.FindColumnWithName(tree.Name(colName))
if err != nil {
if strings.Contains(err.Error(), "does not exist") {
return false, nil
}
return false, err
}
return true, nil
}

// columnExistsAndIsNotNull returns true if storedTable contains a non-nullable
// (NOT NULL) column with the given colName. Like columnExists, it does not
// check that the column descriptor in storedTable and expectedTable match and
// it should be used when a migration/multiple migrations alter(s) the same
// column multiple times. See the comment for columnExists for the reasoning
// behind this.
func columnExistsAndIsNotNull(
storedTable, expectedTable catalog.TableDescriptor, colName string,
) (bool, error) {
storedCol, err := storedTable.FindColumnWithName(tree.Name(colName))
if err != nil {
if strings.Contains(err.Error(), "does not exist") {
return false, nil
}
return false, err
}
return !storedCol.IsNullable(), nil
}

// hasIndex returns true if storedTable already has the given index, comparing
// with expectedTable.
// storedTable descriptor must be read from system storage as compared to reading
Expand Down
192 changes: 133 additions & 59 deletions pkg/upgrade/upgrades/schema_changes_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ import (
"github.com/stretchr/testify/require"
)

type schemaChangeTestCase struct {
// Test identifier.
name string
// Job status when the job is intercepted while transitioning to the intercepted status.
query string
// Whether the schema-change job should wait for the migration to restart
// after failure before proceeding.
waitForMigrationRestart bool
// Cancel the intercepted schema-change to inject a failure during migration.
cancelSchemaJob bool
// Expected number of schema-changes that are skipped during migration.
expectedSkipped int
}

// TestMigrationWithFailures tests modification of a table during
// migration with different failures. It tests the system behavior with failure
// combinations of the migration job and schema-change jobs at different stages
Expand All @@ -54,49 +68,6 @@ import (
// exponential backoff to the system.jobs table, but was retrofitted to prevent
// regressions.
func TestMigrationWithFailures(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t, "very slow")

// We're going to be migrating from startCV to endCV.
startCV := clusterversion.ClusterVersion{Version: roachpb.Version{Major: 2041}}
endCV := clusterversion.ClusterVersion{Version: roachpb.Version{Major: 2042}}

// The tests follows the following procedure.
//
// Inject the old table descriptor and ensure that the system is using the
// deprecated jobs-table.
//
// Start migration, which initiates two schema-change jobs one by one. Test
// the system for each schema-change job separately. Later on, we inject
// failure in this migration, causing it to fail.
//
// Depending on the test setting, intercept the target schema-change job,
// preventing the job from progressing. We may cancel this schema-change or
// let it succeed to test different scenarios.
//
// Cancel the migration, causing the migration to revert and fail.
//
// Wait for the canceled migration-job to finish, expecting its failure. The
// schema-change job is still not progressing to control what the restarted
// migration will observe.
//
// Restart the migration, expecting it to succeed. Depending on the test setting,
// the intercepted schema-change job may wail for the migration job to resume.
// If it does, the migration job is expected to observe the ongoing schema-change.
// The ongoing schema-change is canceled or not, depending on the test case.
// In either case, we expect the correct number of mutations to be skipped
// during the migration.
//
// If we canceled the schema-job, expect it to rerun
// as part of the migration. Otherwise, expect the schema-change to be ignored
// during the migration.
//
// Finally, we validate that the schema changes are in effect by reading the new
// columns and the index, and by running a job that is failed and retried to
// practice exponential-backoff machinery.

const createTableBefore = `
CREATE TABLE test.test_table (
id INT8 DEFAULT unique_rowid() PRIMARY KEY,
Expand Down Expand Up @@ -142,19 +113,7 @@ CREATE TABLE test.test_table (
);
`

for _, test := range []struct {
// Test identifier.
name string
// Job status when the job is intercepted while transitioning to the intercepted status.
query string
// Whether the schema-change job should wait for the migration to restart
// after failure before proceeding.
waitForMigrationRestart bool
// Cancel the intercepted schema-change to inject a failure during migration.
cancelSchemaJob bool
// Expected number of schema-changes that are skipped during migration.
expectedSkipped int
}{
testCases := []schemaChangeTestCase{
{
name: "adding columns",
query: upgrades.TestingAddColsQuery,
Expand Down Expand Up @@ -204,7 +163,123 @@ CREATE TABLE test.test_table (
cancelSchemaJob: false, // To fail adding index and skip adding column.
expectedSkipped: 2, // Both columns and index must not be added again.
},
} {
}

testMigrationWithFailures(t, createTableBefore, createTableAfter, upgrades.MakeFakeMigrationForTestMigrationWithFailures, testCases)
}

// TestMigrationWithFailuresMultipleAltersOnSameColumn tests a migration that
// alters a column in a table multiple times with failures at different stages
// of the migration.
func TestMigrationWithFailuresMultipleAltersOnSameColumn(t *testing.T) {
const createTableBefore = `
CREATE TABLE test.test_table (
username STRING NOT NULL
);
`

const createTableAfter = `
CREATE TABLE test.test_table (
username STRING NOT NULL,
user_id OID NOT NULL
);
`

testCases := []schemaChangeTestCase{
{
name: "add column",
query: upgrades.TestingAddNewColStmt,
waitForMigrationRestart: false,
cancelSchemaJob: false,
expectedSkipped: 0,
},
{
name: "alter column",
query: upgrades.TestingAlterNewColStmt,
waitForMigrationRestart: false,
cancelSchemaJob: false,
expectedSkipped: 0,
},
{
name: "skip none",
query: upgrades.TestingAddNewColStmt,
waitForMigrationRestart: true,
cancelSchemaJob: true,
expectedSkipped: 0,
},
{
name: "skip adding column",
query: upgrades.TestingAlterNewColStmt,
waitForMigrationRestart: true,
cancelSchemaJob: true,
expectedSkipped: 1,
},
{
name: "skip adding column and altering column",
query: upgrades.TestingAlterNewColStmt,
waitForMigrationRestart: true,
cancelSchemaJob: false,
expectedSkipped: 2,
},
}

testMigrationWithFailures(t, createTableBefore, createTableAfter, upgrades.MakeFakeMigrationForTestMigrationWithFailuresMultipleAltersOnSameColumn, testCases)
}

// testMigrationWithFailures tests a migration that alters the schema of a
// table with failures injected at multiple points within the migration.
// The table should be named test.test_table.
func testMigrationWithFailures(
t *testing.T,
createTableBefore string,
createTableAfter string,
testMigrationFunc upgrades.SchemaChangeTestMigrationFunc,
testCases []schemaChangeTestCase,
) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t, "very slow")

// We're going to be migrating from startCV to endCV.
startCV := clusterversion.ClusterVersion{Version: roachpb.Version{Major: 2041}}
endCV := clusterversion.ClusterVersion{Version: roachpb.Version{Major: 2042}}

// The tests follows the following procedure.
//
// Inject the old table descriptor and ensure that the system is using the
// deprecated jobs-table.
//
// Start migration, which initiates two schema-change jobs one by one. Test
// the system for each schema-change job separately. Later on, we inject
// failure in this migration, causing it to fail.
//
// Depending on the test setting, intercept the target schema-change job,
// preventing the job from progressing. We may cancel this schema-change or
// let it succeed to test different scenarios.
//
// Cancel the migration, causing the migration to revert and fail.
//
// Wait for the canceled migration-job to finish, expecting its failure. The
// schema-change job is still not progressing to control what the restarted
// migration will observe.
//
// Restart the migration, expecting it to succeed. Depending on the test setting,
// the intercepted schema-change job may wail for the migration job to resume.
// If it does, the migration job is expected to observe the ongoing schema-change.
// The ongoing schema-change is canceled or not, depending on the test case.
// In either case, we expect the correct number of mutations to be skipped
// during the migration.
//
// If we canceled the schema-job, expect it to rerun
// as part of the migration. Otherwise, expect the schema-change to be ignored
// during the migration.
//
// Finally, we validate that the schema changes are in effect by reading the new
// columns and the index, and by running a job that is failed and retried to
// practice exponential-backoff machinery.

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
scope := log.Scope(t)
defer scope.Close(t)
Expand Down Expand Up @@ -253,8 +328,7 @@ CREATE TABLE test.test_table (
))
jobsKnobs := jobs.NewTestingKnobsWithShortIntervals()
jobsKnobs.BeforeUpdate = beforeUpdate
migrationFunc, expectedDescriptor := upgrades.
MakeFakeMigrationForTestMigrationWithFailures()
migrationFunc, expectedDescriptor := testMigrationFunc()
clusterArgs := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Settings: settings,
Expand Down
59 changes: 59 additions & 0 deletions pkg/upgrade/upgrades/schema_changes_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/upgrade"
)

type SchemaChangeTestMigrationFunc func() (m upgrade.TenantUpgradeFunc, expectedTableDescriptor *atomic.Value)

const (
// TestingAddColsQuery is used by TestMigrationWithFailures.
TestingAddColsQuery = `
Expand Down Expand Up @@ -76,3 +78,60 @@ func MakeFakeMigrationForTestMigrationWithFailures() (
return nil
}, expectedTableDescriptor
}

var _ SchemaChangeTestMigrationFunc = MakeFakeMigrationForTestMigrationWithFailures

const (
// TestingAddNewColStmt is used by TestMigrationWithFailuresMultipleAltersOnSameColumn.
TestingAddNewColStmt = `
ALTER TABLE test.test_table
ADD COLUMN user_id OID
`

// TestingAlterNewColStmt is used by TestMigrationWithFailuresMultipleAltersOnSameColumn.
TestingAlterNewColStmt = `
ALTER TABLE test.test_table
ALTER COLUMN user_id SET NOT NULL
`
)

// MakeFakeMigrationForTestMigrationWithFailuresMultipleAltersOnSameColumn makes the
// migration function used in TestMigrationWithFailuresMultipleAltersOnSameColumn.
func MakeFakeMigrationForTestMigrationWithFailuresMultipleAltersOnSameColumn() (
m upgrade.TenantUpgradeFunc,
expectedTableDescriptor *atomic.Value,
) {
expectedTableDescriptor = &atomic.Value{}
return func(
ctx context.Context, cs clusterversion.ClusterVersion, d upgrade.TenantDeps, _ *jobs.Job,
) error {
row, err := d.InternalExecutor.QueryRow(ctx, "look-up-id", nil, /* txn */
`select id from system.namespace where name = $1`, "test_table")
if err != nil {
return err
}
tableID := descpb.ID(tree.MustBeDInt(row[0]))
for _, op := range []operation{
{
name: "add-user-id-column",
schemaList: []string{"user_id"},
query: TestingAddNewColStmt,
schemaExistsFn: columnExists,
},
{
name: "alter-user-id-column",
schemaList: []string{"user_id"},
query: TestingAlterNewColStmt,
schemaExistsFn: columnExistsAndIsNotNull,
},
} {
expected := expectedTableDescriptor.Load().(catalog.TableDescriptor)
if err := migrateTable(ctx, cs, d, op, tableID, expected); err != nil {
return err
}
}
return nil
}, expectedTableDescriptor
}

var _ SchemaChangeTestMigrationFunc = MakeFakeMigrationForTestMigrationWithFailuresMultipleAltersOnSameColumn

0 comments on commit 61cc346

Please sign in to comment.