Skip to content

Commit

Permalink
sql: start index drop job for primary key changes immediately
Browse files Browse the repository at this point in the history
Previously, the job to drop old indexes after a primary key change was
created at the end of the schema change and left for the job registry to
adopt at some later time. This PR switches the job creation over to
`CreateStartableJobWithTxn`, which allows for starting the job
immediately after the transaction which creates the job is committed.

This eliminates one source of waiting time before other schema changes,
which are queued behind the mutation to drop the indexes, can run. Note,
however, that successive schema changes don't start running immediately
after the mutations ahead of them in the queue are all cleared; if
they've ever hit a retry error due to not being first in line, then they
will have to wait to be re-adopted by the job registry. This is why the
existing primary key change tests still need to have the adopt interval
set to a low value in order to finish quickly.

Release note (performance improvement): The cleanup job which runs after
a primary key change to remove old indexes, which blocks other schema
changes from running, now starts immediately after the primary key swap
is complete. This reduces the amount of waiting time before subsequent
schema changes can run.
  • Loading branch information
lucy-zhang committed Apr 18, 2020
1 parent 67f44db commit fc96d9d
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 29 deletions.
14 changes: 0 additions & 14 deletions pkg/sql/logictest/testdata/logic_test/alter_primary_key
Original file line number Diff line number Diff line change
Expand Up @@ -735,20 +735,6 @@ ALTER TABLE t ALTER PRIMARY KEY USING COLUMNS (x)
statement ok
ROLLBACK

# Ensure that we cannot cancel the index cleanup jobs spawned by
# a primary key change.
statement ok
DROP TABLE IF EXISTS t;
CREATE TABLE t (x INT PRIMARY KEY, y INT NOT NULL);
ALTER TABLE t ALTER PRIMARY KEY USING COLUMNS (y)

statement error pq: job [0-9]*: not cancelable
CANCEL JOB (
SELECT job_id FROM [SHOW JOBS] WHERE
description = 'CLEANUP JOB for ''ALTER TABLE test.public.t ALTER PRIMARY KEY USING COLUMNS (y)''' AND
status = 'running'
)

# Ensure that starting a primary key change that does not
# enqueue any mutations doesn't start a job.
# TODO (rohany): This test might become obsolete when #44923 is fixed.
Expand Down
23 changes: 17 additions & 6 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,10 +778,12 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
}

var indexGCJobs []*jobs.StartableJob
var pkChangeIndexDropJob *jobs.StartableJob
update := func(txn *kv.Txn, descs map[sqlbase.ID]*sqlbase.MutableTableDescriptor) error {
// Reset vars here because update function can be called multiple times in a retry.
isRollback = false
indexGCJobs = nil
pkChangeIndexDropJob = nil

i := 0
scDesc, ok := descs[sc.tableID]
Expand Down Expand Up @@ -910,15 +912,12 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
Progress: jobspb.SchemaChangeProgress{},
NonCancelable: true,
}
// TODO (lucy): We should be able to create a StartableJob and start
// it after calling PublishMultiple() to finalize the mutations,
// as with the indexGCJob. That will allow us to start the job
// immediately without having to wait for the registry to adopt it.
job, err := sc.jobRegistry.CreateJobWithTxn(ctx, jobRecord, txn)
job, err := sc.jobRegistry.CreateStartableJobWithTxn(ctx, jobRecord, txn, nil /* resultsCh */)
if err != nil {
return err
}
log.VEventf(ctx, 2, "created job %d to drop previous indexes", *job.ID())
pkChangeIndexDropJob = job
scDesc.MutationJobs = append(scDesc.MutationJobs, sqlbase.TableDescriptor_MutationJob{
MutationID: mutationID,
JobID: *job.ID(),
Expand Down Expand Up @@ -969,7 +968,12 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
if err != nil {
for _, job := range indexGCJobs {
if rollbackErr := job.CleanupOnRollback(ctx); rollbackErr != nil {
log.Warningf(ctx, "failed to cleanup job: %v", rollbackErr)
log.Warningf(ctx, "failed to clean up job: %v", rollbackErr)
}
}
if pkChangeIndexDropJob != nil {
if rollbackErr := pkChangeIndexDropJob.CleanupOnRollback(ctx); rollbackErr != nil {
log.Warningf(ctx, "failed to clean up job: %v", rollbackErr)
}
}
return nil, err
Expand All @@ -980,6 +984,13 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
}
log.VEventf(ctx, 2, "started GC job %d", *job.ID())
}
if pkChangeIndexDropJob != nil {
if _, err := pkChangeIndexDropJob.Start(ctx); err != nil {
log.Warningf(ctx, "starting primary key change index cleanup job %d failed with error: %v",
*pkChangeIndexDropJob.ID(), err)
}
log.VEventf(ctx, 2, "started primary key change index cleanup job %d", *pkChangeIndexDropJob.ID())
}
return descs[sc.tableID], nil
}

Expand Down
81 changes: 72 additions & 9 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,12 @@ func runSchemaChangeWithOperations(
wg.Wait() // for schema change to complete.

// Verify the number of keys left behind in the table to validate schema
// change operations.
if err := checkTableKeyCount(ctx, kvDB, keyMultiple, maxValue+numInserts); err != nil {
t.Fatal(err)
}
// change operations. This is wrapped in SucceedsSoon to handle cases where
// dropped indexes are expected to be GC'ed immediately after the schema
// change completes.
testutils.SucceedsSoon(t, func() error {
return checkTableKeyCount(ctx, kvDB, keyMultiple, maxValue+numInserts)
})
if err := sqlutils.RunScrub(sqlDB, "t", "test"); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -2494,6 +2496,13 @@ CREATE TABLE t.test (k INT NOT NULL, v INT);
`); err != nil {
t.Fatal(err)
}
// GC the old indexes to be dropped after the PK change immediately.
defer disableGCTTLStrictEnforcement(t, sqlDB)()
tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test")
if _, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID); err != nil {
t.Fatal(err)
}

// Bulk insert.
if err := bulkInsertIntoTable(sqlDB, maxValue); err != nil {
t.Fatal(err)
Expand All @@ -2505,7 +2514,7 @@ CREATE TABLE t.test (k INT NOT NULL, v INT);
kvDB,
"ALTER TABLE t.test ALTER PRIMARY KEY USING COLUMNS (k)",
maxValue,
2,
1,
initBackfillNotification(),
// We don't let runSchemaChangeWithOperations use UPSERT statements, because the
// way in which runSchemaChangeWithOperations uses them assumes that k is already
Expand Down Expand Up @@ -2992,14 +3001,68 @@ CREATE TABLE t.test (k INT NOT NULL, v INT);
})
}

// TestPrimaryKeyDropIndexNotCancelable tests that the job to drop indexes after
// a primary key change is not cancelable.
//
// The first time the GC job testing knob fires, it issues CANCEL JOB for the
// job ID it was handed and passes the resulting error back to the test
// goroutine. The test asserts that the cancellation was rejected and then
// waits for the primary key change's cleanup job to be recorded as succeeded.
func TestPrimaryKeyDropIndexNotCancelable(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()
	var db *gosql.DB
	shouldAttemptCancel := true
	// cancelErrCh carries the outcome of the single CANCEL JOB attempt from the
	// testing knob to the test goroutine. It is buffered so that the knob never
	// blocks on the send.
	cancelErrCh := make(chan error, 1)
	params, _ := tests.CreateTestServerParams()
	params.Knobs = base.TestingKnobs{
		GCJob: &sql.GCJobTestingKnobs{
			RunBeforeResume: func(jobID int64) error {
				// Only the first invocation attempts the cancellation.
				if !shouldAttemptCancel {
					return nil
				}
				shouldAttemptCancel = false
				_, err := db.Exec(`CANCEL JOB ($1)`, jobID)
				// Do not assert here: a failed require calls t.FailNow, which
				// must only be called from the goroutine running the test.
				// Asserting in the knob would also skip the notification below
				// on failure and leave the test blocked forever. Hand the
				// error over and let the test goroutine check it.
				cancelErrCh <- err
				return nil
			},
		},
	}
	s, sqlDB, kvDB := serverutils.StartServer(t, params)
	// db is only read by the knob, which cannot fire before the schema change
	// below has been issued.
	db = sqlDB
	defer s.Stopper().Stop(ctx)

	_, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k INT NOT NULL, v INT);
`)
	require.NoError(t, err)

	_, err = sqlDB.Exec(`ALTER TABLE t.test ALTER PRIMARY KEY USING COLUMNS (k)`)
	require.NoError(t, err)

	// Wait until the testing knob has attempted to cancel the job, and verify
	// on the test goroutine that the attempt was rejected.
	require.Regexp(t, "not cancelable", <-cancelErrCh)

	sqlRun := sqlutils.MakeSQLRunner(sqlDB)
	tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test")
	// The cleanup job may still be running at this point, so poll until it is
	// reported as succeeded.
	testutils.SucceedsSoon(t, func() error {
		return jobutils.VerifySystemJob(t, sqlRun, 1, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{
			Description:   "CLEANUP JOB for 'ALTER TABLE t.public.test ALTER PRIMARY KEY USING COLUMNS (k)'",
			Username:      security.RootUser,
			DescriptorIDs: sqlbase.IDs{tableDesc.ID},
		})
	})
}

// TestMultiplePrimaryKeyChanges ensures that we can run many primary key
// changes back to back. We cannot run this in a logic test because we need the
// job that drops old indexes to finish so that the following primary key change
// occurs quickly.
// changes back to back. We cannot run this in a logic test because we need to
// set a low job registry adopt interval, so that each successive schema change
// can run immediately without waiting too long for a retry due to being second
// in line after the mutation to drop indexes for the previous primary key
// change.
func TestMultiplePrimaryKeyChanges(t *testing.T) {
defer leaktest.AfterTest(t)()

// Adopt the job to drop old indexes quickly.
// Decrease the adopt loop interval so that retries happen quickly.
defer setTestJobsAdoptInterval()()

ctx := context.Background()
Expand Down

0 comments on commit fc96d9d

Please sign in to comment.