Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: deprecate TableDescriptor.GCMutations #75280

Merged
merged 1 commit into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 48 additions & 78 deletions pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,38 +93,16 @@ func writeMutation(
}
}

func writeGCMutation(
t *testing.T,
kvDB *kv.DB,
tableDesc *tabledesc.Mutable,
m descpb.TableDescriptor_GCDescriptorMutation,
) {
tableDesc.GCMutations = append(tableDesc.GCMutations, m)
tableDesc.Version++
if err := catalog.ValidateSelf(tableDesc); err != nil {
t.Fatal(err)
}
if err := kvDB.Put(
context.Background(),
catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.GetID()),
tableDesc.DescriptorProto(),
); err != nil {
t.Fatal(err)
}
}

type mutationOptions struct {
// Set if the desc should have any mutations of any sort.
hasMutation bool
// Set if the mutation being inserted is a GCMutation.
hasGCMutation bool
// Set if the desc should have a job that is dropping it.
hasDropJob bool
}

func (m mutationOptions) string() string {
return fmt.Sprintf("hasMutation=%s_hasGCMutation=%s_hasDropJob=%s",
strconv.FormatBool(m.hasMutation), strconv.FormatBool(m.hasGCMutation),
return fmt.Sprintf("hasMutation=%s_hasDropJob=%s",
strconv.FormatBool(m.hasMutation),
strconv.FormatBool(m.hasDropJob))
}

Expand Down Expand Up @@ -172,7 +150,7 @@ func TestRegistryGC(t *testing.T) {
writeJob := func(name string, created, finished time.Time, status Status, mutOptions mutationOptions) string {
tableName := constructTableName(name, mutOptions)
if _, err := sqlDB.Exec(fmt.Sprintf(`
CREATE DATABASE IF NOT EXISTS t;
CREATE DATABASE IF NOT EXISTS t;
CREATE TABLE t."%s" (k VARCHAR PRIMARY KEY DEFAULT 'default', v VARCHAR,i VARCHAR NOT NULL DEFAULT 'i');
INSERT INTO t."%s" VALUES('a', 'foo');
`, tableName, tableName)); err != nil {
Expand All @@ -187,10 +165,6 @@ INSERT INTO t."%s" VALUES('a', 'foo');
writeColumnMutation(t, kvDB, tableDesc, "i", descpb.DescriptorMutation{State: descpb.
DescriptorMutation_DELETE_AND_WRITE_ONLY, Direction: descpb.DescriptorMutation_DROP})
}
if mutOptions.hasGCMutation {
writeGCMutation(t, kvDB, tableDesc, descpb.TableDescriptor_GCDescriptorMutation{})
}

payload, err := protoutil.Marshal(&jobspb.Payload{
Description: name,
// register a mutation on the table so that jobs that reference
Expand Down Expand Up @@ -222,58 +196,54 @@ INSERT INTO t."%s" VALUES('a', 'foo');

// Test the descriptor when any of the following are set.
// 1. Mutations
// 2. GC Mutations
// 3. A drop job
// 2. A drop job
for _, hasMutation := range []bool{true, false} {
for _, hasGCMutation := range []bool{true, false} {
for _, hasDropJob := range []bool{true, false} {
if !hasMutation && !hasGCMutation && !hasDropJob {
continue
}
mutOptions := mutationOptions{
hasMutation: hasMutation,
hasGCMutation: hasGCMutation,
hasDropJob: hasDropJob,
}
oldRunningJob := writeJob("old_running", muchEarlier, time.Time{}, StatusRunning, mutOptions)
oldSucceededJob := writeJob("old_succeeded", muchEarlier, muchEarlier.Add(time.Minute), StatusSucceeded, mutOptions)
oldFailedJob := writeJob("old_failed", muchEarlier, muchEarlier.Add(time.Minute),
StatusFailed, mutOptions)
oldRevertFailedJob := writeJob("old_revert_failed", muchEarlier, muchEarlier.Add(time.Minute),
StatusRevertFailed, mutOptions)
oldCanceledJob := writeJob("old_canceled", muchEarlier, muchEarlier.Add(time.Minute),
StatusCanceled, mutOptions)
newRunningJob := writeJob("new_running", earlier, earlier.Add(time.Minute), StatusRunning,
mutOptions)
newSucceededJob := writeJob("new_succeeded", earlier, earlier.Add(time.Minute), StatusSucceeded, mutOptions)
newFailedJob := writeJob("new_failed", earlier, earlier.Add(time.Minute), StatusFailed, mutOptions)
newRevertFailedJob := writeJob("new_revert_failed", earlier, earlier.Add(time.Minute), StatusRevertFailed, mutOptions)
newCanceledJob := writeJob("new_canceled", earlier, earlier.Add(time.Minute),
StatusCanceled, mutOptions)

db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob},
{newRunningJob}, {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})

if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newSucceededJob},
{newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
for _, hasDropJob := range []bool{true, false} {
if !hasMutation && !hasDropJob {
continue
}
mutOptions := mutationOptions{
hasMutation: hasMutation,
hasDropJob: hasDropJob,
}
oldRunningJob := writeJob("old_running", muchEarlier, time.Time{}, StatusRunning, mutOptions)
oldSucceededJob := writeJob("old_succeeded", muchEarlier, muchEarlier.Add(time.Minute), StatusSucceeded, mutOptions)
oldFailedJob := writeJob("old_failed", muchEarlier, muchEarlier.Add(time.Minute),
StatusFailed, mutOptions)
oldRevertFailedJob := writeJob("old_revert_failed", muchEarlier, muchEarlier.Add(time.Minute),
StatusRevertFailed, mutOptions)
oldCanceledJob := writeJob("old_canceled", muchEarlier, muchEarlier.Add(time.Minute),
StatusCanceled, mutOptions)
newRunningJob := writeJob("new_running", earlier, earlier.Add(time.Minute), StatusRunning,
mutOptions)
newSucceededJob := writeJob("new_succeeded", earlier, earlier.Add(time.Minute), StatusSucceeded, mutOptions)
newFailedJob := writeJob("new_failed", earlier, earlier.Add(time.Minute), StatusFailed, mutOptions)
newRevertFailedJob := writeJob("new_revert_failed", earlier, earlier.Add(time.Minute), StatusRevertFailed, mutOptions)
newCanceledJob := writeJob("new_canceled", earlier, earlier.Add(time.Minute),
StatusCanceled, mutOptions)

db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob},
{newRunningJob}, {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})

if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newSucceededJob},
{newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})

if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})

// Delete the revert failed, and running jobs for the next run of the
// test.
_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1 OR id = $2 OR id = $3 OR id = $4`,
oldRevertFailedJob, newRevertFailedJob, oldRunningJob, newRunningJob)
require.NoError(t, err)
if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})

// Delete the revert failed, and running jobs for the next run of the
// test.
_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1 OR id = $2 OR id = $3 OR id = $4`,
oldRevertFailedJob, newRevertFailedJob, oldRunningJob, newRunningJob)
require.NoError(t, err)
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/catalog/descpb/structured.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1108,15 +1108,20 @@ message TableDescriptor {
(gogoproto.customname) = "JobID", deprecated = true];
}

// Before 22.1:
// The schema elements that have been dropped and whose underlying
// data needs to be gc-ed. These schema elements have already transitioned
// through the drop state machine when they were in the above mutations
// list, and can be safely deleted. The names for these schema elements
// can be reused. This list is separate because mutations can
// lie in this list for a long time (gc deadline) and should not block
// the execution of other schema changes on the table.
//
// Since 22.1 this is field is deprecated and no longer maintained.
// The index GC job still removes mutations it finds in this list.
repeated GCDescriptorMutation gc_mutations = 33 [(gogoproto.nullable) = false,
(gogoproto.customname) = "GCMutations"];
(gogoproto.customname) = "GCMutations",
deprecated = true];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you could get away with outright removing this field (well, replacing it with a reserved 33 to prevent reuse of this tag number in this proto) and then remove updateDescriptorGCMutations outright. Sure, any descriptors might still have a populated GCMutations around, but:

  • it's a handful of bytes at worst,
  • if it's never read, it doesn't matter what's in it,
  • plus it will disappear the next time that the table descriptor is updated.

This shouldn't cause any problems in a mixed-version cluster either. Worst case the GC job gets picked up by a node with the old version, which then edits this slice when it's done. It kind of expects the GC'd index to be in the slice, but doesn't complain if it's not.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to go either way here. What would SQL prefer?


optional string create_query = 34 [(gogoproto.nullable) = false];

Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/catalog/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,6 @@ type TableDescriptor interface {
MakePublic() TableDescriptor
// AllMutations returns all of the table descriptor's mutations.
AllMutations() []Mutation
// GetGCMutations returns the table descriptor's GC mutations.
GetGCMutations() []descpb.TableDescriptor_GCDescriptorMutation
// GetMutationJobs returns the table descriptor's mutation jobs.
GetMutationJobs() []descpb.TableDescriptor_MutationJob

Expand Down
7 changes: 1 addition & 6 deletions pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,14 +1118,9 @@ INSERT INTO t.kv VALUES ('a', 'b');
}

testutils.SucceedsSoon(t, func() error {
if tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "kv"); len(tableDesc.GetGCMutations()) != 0 {
return errors.Errorf("%d gc mutations remaining", len(tableDesc.GetGCMutations()))
}
return nil
return tests.CheckKeyCountE(t, kvDB, tableSpan, 2)
})

tests.CheckKeyCount(t, kvDB, tableSpan, 2)

// TODO(erik, vivek): Transactions using old descriptors should fail and
// rollback when the index keys have been removed by ClearRange
// and the consistency issue is resolved. See #31563.
Expand Down
5 changes: 0 additions & 5 deletions pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ var ErrMissingColumns = errors.New("table must contain at least 1 column")
// ErrMissingPrimaryKey indicates a table with no primary key.
var ErrMissingPrimaryKey = errors.New("table must contain a primary key")

// ErrIndexGCMutationsList is returned by FindIndexWithID to signal that the
// index with the given ID does not have a descriptor and is in the garbage
// collected mutations list.
var ErrIndexGCMutationsList = errors.New("index in GC mutations list")

// PostDeserializationTableDescriptorChanges are a set of booleans to indicate
// which types of upgrades or fixes occurred when filling in the descriptor
// after deserialization.
Expand Down
5 changes: 0 additions & 5 deletions pkg/sql/catalog/tabledesc/table_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,6 @@ func (desc *wrapper) FindIndexWithID(id descpb.IndexID) (catalog.Index, error) {
}); idx != nil {
return idx, nil
}
for _, m := range desc.GCMutations {
if m.IndexID == id {
return nil, ErrIndexGCMutationsList
}
}
return nil, errors.Errorf("index-id \"%d\" does not exist", id)
}

Expand Down
22 changes: 15 additions & 7 deletions pkg/sql/gcjob/descriptor_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,42 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// updateDescriptorGCMutations removes the GCMutation for the given
// index ID. We no longer populate this field, but we still search it
// to remove existing entries.
func updateDescriptorGCMutations(
ctx context.Context,
execCfg *sql.ExecutorConfig,
tableID descpb.ID,
garbageCollectedIndexID descpb.IndexID,
) error {
log.Infof(ctx, "updating GCMutations for table %d after removing index %d",
tableID, garbageCollectedIndexID)
// Remove the mutation from the table descriptor.
return sql.DescsTxn(ctx, execCfg, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
) error {
tbl, err := descsCol.GetMutableTableVersionByID(ctx, tableID, txn)
if err != nil {
return err
}
found := false
for i := 0; i < len(tbl.GCMutations); i++ {
other := tbl.GCMutations[i]
if other.IndexID == garbageCollectedIndexID {
tbl.GCMutations = append(tbl.GCMutations[:i], tbl.GCMutations[i+1:]...)
found = true
break
}
}
b := txn.NewBatch()
if err := descsCol.WriteDescToBatch(ctx, false /* kvTrace */, tbl, b); err != nil {
return err
if found {
log.Infof(ctx, "updating GCMutations for table %d after removing index %d",
tableID, garbageCollectedIndexID)
// Remove the mutation from the table descriptor.
b := txn.NewBatch()
if err := descsCol.WriteDescToBatch(ctx, false /* kvTrace */, tbl, b); err != nil {
return err
}
return txn.Run(ctx, b)
}
return txn.Run(ctx, b)
return nil
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ WHERE message NOT LIKE '%Z/%' AND message NOT LIKE 'querying next range at%'
AND operation != 'dist sender send'
----
batch flow coordinator Del /NamespaceTable/30/1/56/57/"kv"/4/1
batch flow coordinator Put /Table/3/1/58/2/1 -> table:<name:"kv" id:58 version:8 modification_time:<> parent_id:56 unexposed_parent_schema_id:57 columns:<name:"k" id:1 type:<family: IntFamily width: 64 precision: 0 locale: "" visible_type: 0 oid: 20 time_precision_is_set: false > nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns:<name:"v" id:2 type:<family: IntFamily width: 64 precision: 0 locale: "" visible_type: 0 oid: 20 time_precision_is_set: false > nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families:<name:"primary" id:0 column_names:"k" column_names:"v" column_ids:1 column_ids:2 default_column_id:2 > next_family_id:1 primary_index:<name:"kv_pkey" id:1 unique:true version:4 key_column_names:"k" key_column_directions:ASC store_column_names:"v" key_column_ids:1 store_column_ids:2 foreign_key:<table:0 index:0 name:"" validity:Validated shared_prefix_len:0 on_delete:NO_ACTION on_update:NO_ACTION match:SIMPLE > interleave:<> partitioning:<num_columns:0 num_implicit_columns:0 > type:FORWARD created_explicitly:false encoding_type:1 sharded:<is_sharded:false name:"" shard_buckets:0 > disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false > next_index_id:3 privileges:<users:<user_proto:"admin" privileges:2 with_grant_option:0 > users:<user_proto:"public" privileges:0 with_grant_option:0 > users:<user_proto:"root" privileges:2 with_grant_option:0 > owner_proto:"root" version:2 > next_mutation_id:3 format_version:3 state:DROP offline_reason:"" view_query:"" is_materialized_view:false new_schema_change_job_id:0 drop_time:... replacement_of:<id:0 time:<> > audit_mode:DISABLED drop_job_id:0 gc_mutations:<index_id:2 drop_time:... job_id:0 > create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false >
batch flow coordinator Put /Table/3/1/58/2/1 -> table:<name:"kv" id:58 version:8 modification_time:<> parent_id:56 unexposed_parent_schema_id:57 columns:<name:"k" id:1 type:<family: IntFamily width: 64 precision: 0 locale: "" visible_type: 0 oid: 20 time_precision_is_set: false > nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns:<name:"v" id:2 type:<family: IntFamily width: 64 precision: 0 locale: "" visible_type: 0 oid: 20 time_precision_is_set: false > nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families:<name:"primary" id:0 column_names:"k" column_names:"v" column_ids:1 column_ids:2 default_column_id:2 > next_family_id:1 primary_index:<name:"kv_pkey" id:1 unique:true version:4 key_column_names:"k" key_column_directions:ASC store_column_names:"v" key_column_ids:1 store_column_ids:2 foreign_key:<table:0 index:0 name:"" validity:Validated shared_prefix_len:0 on_delete:NO_ACTION on_update:NO_ACTION match:SIMPLE > interleave:<> partitioning:<num_columns:0 num_implicit_columns:0 > type:FORWARD created_explicitly:false encoding_type:1 sharded:<is_sharded:false name:"" shard_buckets:0 > disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false > next_index_id:3 privileges:<users:<user_proto:"admin" privileges:2 with_grant_option:0 > users:<user_proto:"public" privileges:0 with_grant_option:0 > users:<user_proto:"root" privileges:2 with_grant_option:0 > owner_proto:"root" version:2 > next_mutation_id:3 format_version:3 state:DROP offline_reason:"" view_query:"" is_materialized_view:false new_schema_change_job_id:0 drop_time:... replacement_of:<id:0 time:<> > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false >
exec stmt rows affected: 0

# Check that session tracing does not inhibit the fast path for inserts &
Expand Down
10 changes: 0 additions & 10 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,15 +1090,6 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
}
isRollback = m.IsRollback()
if idx := m.AsIndex(); m.Dropped() && idx != nil {
// how we keep track of dropped index names (for, e.g., zone config
// lookups), even though in the absence of a GC job there's nothing to
// clean them up.
scTable.GCMutations = append(
scTable.GCMutations,
descpb.TableDescriptor_GCDescriptorMutation{
IndexID: idx.GetID(),
})

description := sc.job.Payload().Description
if isRollback {
description = "ROLLBACK of " + description
Expand All @@ -1107,7 +1098,6 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
if err := sc.createIndexGCJob(ctx, idx.GetID(), txn, description); err != nil {
return err
}

}
if constraint := m.AsConstraint(); constraint != nil && constraint.Adding() {
if constraint.IsForeignKey() && constraint.ForeignKey().Validity == descpb.ConstraintValidity_Unvalidated {
Expand Down
Loading