Skip to content

Commit

Permalink
sql: asynchronously drop non-interleaved indexes
Browse files Browse the repository at this point in the history
This change drops non-interleaved indexes asynchronously by performing
the deletion of data using an asynchronous schema changer. This is in
preparation to eventually remove index data using `ClearRange` after the
GC TTL period has passed. The initial schema changer runs through the
state machine but does not perform the deletion of index data. Instead
the mutation is moved to a separate list and has a timestamp attached.
The created asynchronous schema changer uses the timestamp and index's
configured GC TTL value to determine when it should begin execution and
actually truncate the index.

When the async schema changer deletes the index data two things occur:
the job is marked as succeeded and the index zone config is removed.

The job can immediately be marked as succeeded because currently a
separate job is created for each index that is dropped.

Interleaved indexes are unaffected and have their data deleted
immediately.

Related to #20696

Release note: none
  • Loading branch information
Erik Trinh committed Oct 8, 2018
1 parent 867d9fd commit 98a1328
Show file tree
Hide file tree
Showing 15 changed files with 988 additions and 355 deletions.
83 changes: 71 additions & 12 deletions pkg/ccl/partitionccl/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,46 @@ import (
"context"
"testing"

"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
)

func subzoneExists(cfg *config.ZoneConfig, index uint32, partition string) bool {
for _, s := range cfg.Subzones {
if s.IndexID == index && s.PartitionName == partition {
return true
}
}
return false
}

func TestDropIndexWithZoneConfigCCL(t *testing.T) {
defer leaktest.AfterTest(t)()

const numRows = 100

asyncNotification := make(chan struct{})

params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
AsyncExecNotification: func() error {
<-asyncNotification
return nil
},
AsyncExecQuickly: true,
},
}
s, sqlDBRaw, kvDB := serverutils.StartServer(t, params)
sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw)
defer s.Stopper().Stop(context.Background())
Expand All @@ -47,30 +74,62 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) {

// Set zone configs on the primary index, secondary index, and one partition
// of the secondary index.
ttlYaml := "gc: {ttlseconds: 1}"
sqlutils.SetZoneConfig(t, sqlDB, "INDEX t.kv@primary", "")
sqlutils.SetZoneConfig(t, sqlDB, "INDEX t.kv@i", "")
sqlutils.SetZoneConfig(t, sqlDB, "PARTITION p2 OF TABLE t.kv", "")
for _, target := range []string{"t.kv@primary", "t.kv@i", "t.kv.p2"} {
if exists := sqlutils.ZoneConfigExists(t, sqlDB, target); !exists {
t.Fatalf(`zone config for %s does not exist`, target)
}
}
sqlutils.SetZoneConfig(t, sqlDB, "INDEX t.kv@i", ttlYaml)
sqlutils.SetZoneConfig(t, sqlDB, "PARTITION p2 OF TABLE t.kv", ttlYaml)

// Drop the index and verify that the zone config for the secondary index and
// its partition are removed but the zone config for the primary index
// remains.
sqlDB.Exec(t, `DROP INDEX t.kv@i`)
tests.CheckKeyCount(t, kvDB, indexSpan, 0)
// All zone configs should still exist.
var buf []byte
cfg := &config.ZoneConfig{}
sqlDB.QueryRow(t, "SELECT config FROM system.zones WHERE id = $1", tableDesc.ID).Scan(&buf)
if err := protoutil.Unmarshal(buf, cfg); err != nil {
t.Fatal(err)
}

subzones := []struct {
index uint32
partition string
}{
{1, ""},
{3, ""},
{3, "p2"},
}
for _, target := range subzones {
if exists := subzoneExists(cfg, target.index, target.partition); !exists {
t.Fatalf(`zone config for %v does not exist`, target)
}
}
tableDesc = sqlbase.GetTableDescriptor(kvDB, "t", "kv")
if _, _, err := tableDesc.FindIndexByName("i"); err == nil {
t.Fatalf("table descriptor still contains index after index is dropped")
}
if exists := sqlutils.ZoneConfigExists(t, sqlDB, "t.kv@primary"); !exists {
close(asyncNotification)

// Wait for index drop to complete so zone configs are updated.
testutils.SucceedsSoon(t, func() error {
if kvs, err := kvDB.Scan(context.TODO(), indexSpan.Key, indexSpan.EndKey, 0); err != nil {
return err
} else if l := 0; len(kvs) != l {
return errors.Errorf("expected %d key value pairs, but got %d", l, len(kvs))
}
return nil
})

sqlDB.QueryRow(t, "SELECT config FROM system.zones WHERE id = $1", tableDesc.ID).Scan(&buf)
if err := protoutil.Unmarshal(buf, cfg); err != nil {
t.Fatal(err)
}
if exists := subzoneExists(cfg, 1, ""); !exists {
t.Fatal("zone config for primary index removed after dropping secondary index")
}
for _, target := range []string{"t.kv@i", "t.kv.p2"} {
if exists := sqlutils.ZoneConfigExists(t, sqlDB, target); exists {
t.Fatalf(`zone config for %s still exists`, target)
for _, target := range subzones[1:] {
if exists := subzoneExists(cfg, target.index, target.partition); exists {
t.Fatalf(`zone config for %v still exists`, target)
}
}
}
36 changes: 16 additions & 20 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ func (sc *SchemaChanger) runBackfill(
// mutations. Collect the elements that are part of the mutation.
var droppedIndexDescs []sqlbase.IndexDescriptor
var addedIndexDescs []sqlbase.IndexDescriptor
// Indexes within the Mutations slice for checkpointing.
mutationSentinel := -1
var droppedIndexMutationIdx int

var tableDesc *sqlbase.TableDescriptor
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
Expand All @@ -136,7 +133,7 @@ func (sc *SchemaChanger) runBackfill(
tableDesc.Name, tableDesc.Version, sc.mutationID)

needColumnBackfill := false
for i, m := range tableDesc.Mutations {
for _, m := range tableDesc.Mutations {
if m.MutationID != sc.mutationID {
break
}
Expand All @@ -158,9 +155,8 @@ func (sc *SchemaChanger) runBackfill(
case *sqlbase.DescriptorMutation_Column:
needColumnBackfill = true
case *sqlbase.DescriptorMutation_Index:
droppedIndexDescs = append(droppedIndexDescs, *t.Index)
if droppedIndexMutationIdx == mutationSentinel {
droppedIndexMutationIdx = i
if !sc.canClearRangeForDrop(t.Index) {
droppedIndexDescs = append(droppedIndexDescs, *t.Index)
}
default:
return errors.Errorf("unsupported mutation: %+v", m)
Expand All @@ -170,18 +166,9 @@ func (sc *SchemaChanger) runBackfill(

// First drop indexes, then add/drop columns, and only then add indexes.

// Drop indexes.
// Drop indexes not to be removed by `ClearRange`.
if len(droppedIndexDescs) > 0 {
if err := sc.truncateIndexes(
ctx, lease, version, droppedIndexDescs, droppedIndexMutationIdx,
); err != nil {
return err
}

// Remove index zone configs.
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
return removeIndexZoneConfigs(ctx, txn, sc.execCfg, tableDesc.ID, droppedIndexDescs)
}); err != nil {
if err := sc.truncateIndexes(ctx, lease, version, droppedIndexDescs); err != nil {
return err
}
}
Expand Down Expand Up @@ -222,7 +209,6 @@ func (sc *SchemaChanger) truncateIndexes(
lease *sqlbase.TableDescriptor_SchemaChangeLease,
version sqlbase.DescriptorVersion,
dropped []sqlbase.IndexDescriptor,
mutationIdx int,
) error {
chunkSize := sc.getChunkSize(indexTruncateChunkSize)
if sc.testingKnobs.BackfillChunkSize > 0 {
Expand Down Expand Up @@ -270,14 +256,24 @@ func (sc *SchemaChanger) truncateIndexes(
return err
}
resume, err = td.deleteIndex(
ctx, &desc, resumeAt, chunkSize, noAutoCommit, false, /* traceKV */
ctx,
&desc,
resumeAt,
chunkSize,
noAutoCommit,
false, /* traceKV */
)
done = resume.Key == nil
return err
}); err != nil {
return err
}
}
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
return removeIndexZoneConfigs(ctx, txn, sc.execCfg, sc.tableID, dropped)
}); err != nil {
return err
}
}
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1748,6 +1748,9 @@ CREATE TABLE crdb_internal.zones (
for _, s := range subzones {
index, err := table.FindIndexByID(sqlbase.IndexID(s.IndexID))
if err != nil {
if err == sqlbase.ErrIndexGCMutationsList {
continue
}
return err
}
if zoneSpecifier != nil {
Expand Down
22 changes: 15 additions & 7 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ func (p *planner) initiateDropTable(
}

// Mark all jobs scheduled for schema changes as successful.
jobIDs := make(map[int64]struct{})
var id sqlbase.MutationID
for _, m := range tableDesc.Mutations {
if id != m.MutationID {
Expand All @@ -369,13 +370,20 @@ func (p *planner) initiateDropTable(
if err != nil {
return err
}
job, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.txn)
if err != nil {
return err
}
if err := job.WithTxn(p.txn).Succeeded(ctx, jobs.NoopFn); err != nil {
return errors.Wrapf(err, "failed to mark job %d as as successful", jobID)
}
jobIDs[jobID] = struct{}{}
}
}
for _, gcm := range tableDesc.GCMutations {
jobIDs[gcm.JobID] = struct{}{}
}
for jobID := range jobIDs {
job, err := p.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobID, p.txn)
if err != nil {
return err
}

if err := job.WithTxn(p.txn).Succeeded(ctx, jobs.NoopFn); err != nil {
return errors.Wrapf(err, "failed to mark job %d as as successful", jobID)
}
}

Expand Down
73 changes: 62 additions & 11 deletions pkg/sql/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,17 @@ func addImmediateGCZoneConfig(sqlDB *gosql.DB, id sqlbase.ID) (config.ZoneConfig
if err != nil {
return cfg, err
}
_, err = sqlDB.Exec(`INSERT INTO system.zones VALUES ($1, $2)`, id, buf)
_, err = sqlDB.Exec(`UPSERT INTO system.zones VALUES ($1, $2)`, id, buf)
return cfg, err
}

func addDefaultZoneConfig(sqlDB *gosql.DB, id sqlbase.ID) (config.ZoneConfig, error) {
cfg := config.DefaultZoneConfig()
buf, err := protoutil.Marshal(&cfg)
if err != nil {
return cfg, err
}
_, err = sqlDB.Exec(`UPSERT INTO system.zones VALUES ($1, $2)`, id, buf)
return cfg, err
}

Expand Down Expand Up @@ -434,33 +444,77 @@ func TestDropIndex(t *testing.T) {
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
BackfillChunkSize: chunkSize,
AsyncExecQuickly: true,
},
}
s, sqlDB, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.TODO())

numRows := 2*chunkSize + 1
if err := tests.CreateKVTable(sqlDB, "kv", numRows); err != nil {
t.Fatal(err)
}
tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "kv")

tests.CheckKeyCount(t, kvDB, tableDesc.TableSpan(), 3*numRows)
idx, _, err := tableDesc.FindIndexByName("foo")
if err != nil {
t.Fatal(err)
}
indexSpan := tableDesc.IndexSpan(idx.ID)

tests.CheckKeyCount(t, kvDB, indexSpan, numRows)
if _, err := sqlDB.Exec(`DROP INDEX t.kv@foo`); err != nil {
t.Fatal(err)
}
tests.CheckKeyCount(t, kvDB, indexSpan, 0)

tableDesc = sqlbase.GetTableDescriptor(kvDB, "t", "kv")
if _, _, err := tableDesc.FindIndexByName("foo"); err == nil {
t.Fatalf("table descriptor still contains index after index is dropped")
}
// Index data hasn't been deleted.
tests.CheckKeyCount(t, kvDB, indexSpan, numRows)
tests.CheckKeyCount(t, kvDB, tableDesc.TableSpan(), 3*numRows)

sqlRun := sqlutils.MakeSQLRunner(sqlDB)
if err := jobutils.VerifyRunningSystemJob(t, sqlRun, 1, jobspb.TypeSchemaChange, jobs.RunningStatusWaitingGC, jobs.Record{
Username: security.RootUser,
Description: `DROP INDEX t.public.kv@foo`,
DescriptorIDs: sqlbase.IDs{
tableDesc.ID,
},
}); err != nil {
t.Fatal(err)
}

if _, err := sqlDB.Exec(`CREATE INDEX foo on t.kv (v);`); err != nil {
t.Fatal(err)
}

tableDesc = sqlbase.GetTableDescriptor(kvDB, "t", "kv")
newIdx, _, err := tableDesc.FindIndexByName("foo")
if err != nil {
t.Fatal(err)
}
newIdxSpan := tableDesc.IndexSpan(newIdx.ID)
tests.CheckKeyCount(t, kvDB, newIdxSpan, numRows)
tests.CheckKeyCount(t, kvDB, tableDesc.TableSpan(), 4*numRows)

// Add a zone config for the table.
if _, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID); err != nil {
t.Fatal(err)
}

testutils.SucceedsSoon(t, func() error {
return jobutils.VerifySystemJob(t, sqlRun, 1, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{
Username: security.RootUser,
Description: `DROP INDEX t.public.kv@foo`,
DescriptorIDs: sqlbase.IDs{
tableDesc.ID,
},
})
})

tests.CheckKeyCount(t, kvDB, newIdxSpan, numRows)
tests.CheckKeyCount(t, kvDB, indexSpan, 0)
tests.CheckKeyCount(t, kvDB, tableDesc.TableSpan(), 3*numRows)
}

func TestDropIndexWithZoneConfigOSS(t *testing.T) {
Expand Down Expand Up @@ -531,7 +585,7 @@ func TestDropIndexWithZoneConfigOSS(t *testing.T) {
if exists := sqlutils.ZoneConfigExists(t, sqlDB, "t.kv@foo"); exists {
t.Fatal("zone config for index still exists after dropping index")
}
tests.CheckKeyCount(t, kvDB, indexSpan, 0)
tests.CheckKeyCount(t, kvDB, indexSpan, numRows)
tableDesc = sqlbase.GetTableDescriptor(kvDB, "t", "kv")
if _, _, err := tableDesc.FindIndexByName("foo"); err == nil {
t.Fatalf("table descriptor still contains index after index is dropped")
Expand All @@ -544,6 +598,7 @@ func TestDropIndexInterleaved(t *testing.T) {
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
AsyncExecQuickly: true,
BackfillChunkSize: chunkSize,
},
}
Expand Down Expand Up @@ -597,14 +652,10 @@ func TestDropTable(t *testing.T) {
}

// Add a zone config for the table.
cfg := config.DefaultZoneConfig()
buf, err := protoutil.Marshal(&cfg)
cfg, err := addDefaultZoneConfig(sqlDB, tableDesc.ID)
if err != nil {
t.Fatal(err)
}
if _, err := sqlDB.Exec(`INSERT INTO system.zones VALUES ($1, $2)`, tableDesc.ID, buf); err != nil {
t.Fatal(err)
}

if err := zoneExists(sqlDB, &cfg, tableDesc.ID); err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit 98a1328

Please sign in to comment.