Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Use CPut when writing to system.descriptor table #88844

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions pkg/sql/catalog/dbdesc/database_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type immutable struct {
// changed represents whether or not the descriptor was changed
// after RunPostDeserializationChanges.
changes catalog.PostDeserializationChanges

// This is the raw bytes (tag + data) of the database descriptor in storage.
rawBytesInStorage []byte
}

// Mutable wraps a database descriptor and provides methods
Expand Down Expand Up @@ -142,15 +145,19 @@ func (desc *immutable) ByteSize() int64 {

// NewBuilder implements the catalog.Descriptor interface.
func (desc *immutable) NewBuilder() catalog.DescriptorBuilder {
return newBuilder(desc.DatabaseDesc(), hlc.Timestamp{}, desc.isUncommittedVersion, desc.changes)
b := newBuilder(desc.DatabaseDesc(), hlc.Timestamp{}, desc.isUncommittedVersion, desc.changes)
b.SetRawBytesInStorage(desc.GetRawBytesInStorage())
return b
}

// NewBuilder implements the catalog.Descriptor interface.
//
// It overrides the wrapper's implementation to deal with the fact that
// mutable has overridden the definition of IsUncommittedVersion.
func (desc *Mutable) NewBuilder() catalog.DescriptorBuilder {
return newBuilder(desc.DatabaseDesc(), hlc.Timestamp{}, desc.IsUncommittedVersion(), desc.changes)
b := newBuilder(desc.DatabaseDesc(), hlc.Timestamp{}, desc.IsUncommittedVersion(), desc.changes)
b.SetRawBytesInStorage(desc.GetRawBytesInStorage())
return b
}

// IsMultiRegion implements the DatabaseDescriptor interface.
Expand Down Expand Up @@ -521,6 +528,16 @@ func (desc *immutable) SkipNamespace() bool {
return false
}

// GetRawBytesInStorage implements the catalog.Descriptor interface.
func (desc *immutable) GetRawBytesInStorage() []byte {
return desc.rawBytesInStorage
}

// GetRawBytesInStorage implements the catalog.Descriptor interface.
func (desc *Mutable) GetRawBytesInStorage() []byte {
return desc.rawBytesInStorage
}

// maybeRemoveDroppedSelfEntryFromSchemas removes an entry in the Schemas map corresponding to the
// database itself which was added due to a bug in prior versions when dropping any user-defined schema.
// The bug inserted an entry for the database rather than the schema being dropped. This function fixes the
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/catalog/dbdesc/database_desc_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type databaseDescriptorBuilder struct {
mvccTimestamp hlc.Timestamp
isUncommittedVersion bool
changes catalog.PostDeserializationChanges
// This is the raw bytes (tag + data) of the database descriptor in storage.
rawBytesInStorage []byte
}

var _ DatabaseDescriptorBuilder = &databaseDescriptorBuilder{}
Expand Down Expand Up @@ -144,6 +146,11 @@ func (ddb *databaseDescriptorBuilder) RunRestoreChanges(
return nil
}

// SetRawBytesInStorage implements the catalog.DescriptorBuilder interface.
func (ddb *databaseDescriptorBuilder) SetRawBytesInStorage(rawBytes []byte) {
ddb.rawBytesInStorage = append([]byte(nil), rawBytes...) // deep-copy
}

func maybeConvertIncompatibleDBPrivilegesToDefaultPrivileges(
privileges *catpb.PrivilegeDescriptor, defaultPrivileges *catpb.DefaultPrivilegeDescriptor,
) (hasChanged bool) {
Expand Down Expand Up @@ -199,6 +206,7 @@ func (ddb *databaseDescriptorBuilder) BuildImmutableDatabase() catalog.DatabaseD
DatabaseDescriptor: *desc,
isUncommittedVersion: ddb.isUncommittedVersion,
changes: ddb.changes,
rawBytesInStorage: append([]byte(nil), ddb.rawBytesInStorage...), // deep-copy
}
}

Expand All @@ -218,6 +226,7 @@ func (ddb *databaseDescriptorBuilder) BuildExistingMutableDatabase() *Mutable {
DatabaseDescriptor: *ddb.maybeModified,
changes: ddb.changes,
isUncommittedVersion: ddb.isUncommittedVersion,
rawBytesInStorage: append([]byte(nil), ddb.rawBytesInStorage...), // deep-copy
},
ClusterVersion: &immutable{DatabaseDescriptor: *ddb.original},
}
Expand All @@ -240,6 +249,7 @@ func (ddb *databaseDescriptorBuilder) BuildCreatedMutableDatabase() *Mutable {
DatabaseDescriptor: *desc,
changes: ddb.changes,
isUncommittedVersion: ddb.isUncommittedVersion,
rawBytesInStorage: append([]byte(nil), ddb.rawBytesInStorage...), // deep-copy
},
}
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/catalog/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type DescriptorBuilder interface {
// upgrade migrations
RunRestoreChanges(descLookupFn func(id descpb.ID) Descriptor) error

// SetRawBytesInStorage sets `rawBytesInStorage` field by deep-copying `rawBytes`.
SetRawBytesInStorage(rawBytes []byte)

// BuildImmutable returns an immutable Descriptor.
BuildImmutable() Descriptor

Expand Down Expand Up @@ -225,6 +228,10 @@ type Descriptor interface {

// SkipNamespace is true when a descriptor should not have a namespace record.
SkipNamespace() bool

// GetRawBytesInStorage returns the raw bytes (tag + data) of the descriptor in storage.
// It is exclusively used in the CPut when persisting an updated descriptor to storage.
GetRawBytesInStorage() []byte
}

// HydratableDescriptor represent a Descriptor which needs user-define type
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/descs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
Expand Down
16 changes: 15 additions & 1 deletion pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,20 @@ func (tc *Collection) WriteDescToBatch(
return err
}
}
// Retrieve the expected bytes of `desc` in storage.
// If this is the first time we write to `desc` in the transaction, its
// expected bytes will be retrieved when we read it into this desc.Collection,
// and carried over in it.
// If, however, this is not the first time we write to `desc` in the transaction,
// which means it has existed in `tc.uncommitted`, we will retrieve the expected
// bytes from there.
var expected []byte
if exist := tc.uncommitted.getUncommittedByID(desc.GetID()); exist != nil {
expected = exist.GetRawBytesInStorage()
} else {
expected = desc.GetRawBytesInStorage()
}

if err := tc.AddUncommittedDescriptor(ctx, desc); err != nil {
return err
}
Expand All @@ -277,7 +291,7 @@ func (tc *Collection) WriteDescToBatch(
if kvTrace {
log.VEventf(ctx, 2, "Put %s -> %s", descKey, proto)
}
b.Put(descKey, proto)
b.CPut(descKey, proto, expected)
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/catalog/descs/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func TestAddUncommittedDescriptorAndMutableResolution(t *testing.T) {
immByNameAfter, err := descriptors.GetImmutableDatabaseByName(ctx, txn, "new_name", flags)
require.NoError(t, err)
require.Equal(t, db.GetVersion(), immByNameAfter.GetVersion())
require.Equal(t, mut.ImmutableCopy(), immByNameAfter)
require.Equal(t, mut.ImmutableCopy().DescriptorProto(), immByNameAfter.DescriptorProto())

_, immByIDAfter, err := descriptors.GetImmutableDatabaseByID(ctx, txn, db.GetID(), flags)
require.NoError(t, err)
Expand Down Expand Up @@ -733,7 +733,7 @@ func TestDescriptorCache(t *testing.T) {
}
found := cat.LookupDescriptorEntry(mut.ID)
require.NotEmpty(t, found)
require.Equal(t, mut.ImmutableCopy(), found)
require.Equal(t, mut.ImmutableCopy().DescriptorProto(), found.DescriptorProto())
return nil
}))
})
Expand Down Expand Up @@ -768,7 +768,7 @@ func TestDescriptorCache(t *testing.T) {
return err
}
require.Len(t, dbDescs, 4)
require.Equal(t, mut.ImmutableCopy(), dbDescs[0])
require.Equal(t, mut.ImmutableCopy().DescriptorProto(), dbDescs[0].DescriptorProto())
return nil
}))
})
Expand Down
16 changes: 15 additions & 1 deletion pkg/sql/catalog/descs/uncommitted_descriptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
Expand Down Expand Up @@ -186,7 +187,20 @@ func (ud *uncommittedDescriptors) upsert(
newBytes += mut.ByteSize()
}
ud.mutable.Upsert(mut)
uncommitted := mut.ImmutableCopy()
// Upserting a descriptor into the "uncommitted" set implies
// this descriptor is going to be written to storage very soon. We
// compute what the raw bytes of this descriptor in storage is going to
// look like when that write happens, so that any further update to this
// descriptor, which will be retrieved from the "uncommitted" set, will
// carry the correct raw bytes in storage with it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with this, it's rather elegant.

var val roachpb.Value
if err = val.SetProto(mut.DescriptorProto()); err != nil {
return err
}
rawBytesInStorageAfterPendingWrite := val.TagAndDataBytes()
uncommittedBuilder := mut.NewBuilder()
uncommittedBuilder.SetRawBytesInStorage(rawBytesInStorageAfterPendingWrite)
uncommitted := uncommittedBuilder.BuildImmutable()
newBytes += uncommitted.ByteSize()
ud.uncommitted.Upsert(uncommitted, uncommitted.SkipNamespace())
if err = ud.memAcc.Grow(ctx, newBytes); err != nil {
Expand Down
21 changes: 19 additions & 2 deletions pkg/sql/catalog/funcdesc/func_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type immutable struct {
isUncommittedVersion bool

changes catalog.PostDeserializationChanges

// This is the raw bytes (tag + data) of the function descriptor in storage.
rawBytesInStorage []byte
}

// Mutable represents a mutable function descriptor.
Expand Down Expand Up @@ -147,12 +150,16 @@ func (desc *immutable) GetDeclarativeSchemaChangerState() *scpb.DescriptorState

// NewBuilder implements the catalog.Descriptor interface.
func (desc *Mutable) NewBuilder() catalog.DescriptorBuilder {
return newBuilder(&desc.FunctionDescriptor, hlc.Timestamp{}, desc.IsUncommittedVersion(), desc.changes)
b := newBuilder(&desc.FunctionDescriptor, hlc.Timestamp{}, desc.IsUncommittedVersion(), desc.changes)
b.SetRawBytesInStorage(desc.GetRawBytesInStorage())
return b
}

// NewBuilder implements the catalog.Descriptor interface.
func (desc *immutable) NewBuilder() catalog.DescriptorBuilder {
return newBuilder(&desc.FunctionDescriptor, hlc.Timestamp{}, desc.IsUncommittedVersion(), desc.changes)
b := newBuilder(&desc.FunctionDescriptor, hlc.Timestamp{}, desc.IsUncommittedVersion(), desc.changes)
b.SetRawBytesInStorage(desc.GetRawBytesInStorage())
return b
}

// GetReferencedDescIDs implements the catalog.Descriptor interface.
Expand Down Expand Up @@ -363,6 +370,16 @@ func (desc *immutable) SkipNamespace() bool {
return true
}

// GetRawBytesInStorage implements the catalog.Descriptor interface.
func (desc *immutable) GetRawBytesInStorage() []byte {
return desc.rawBytesInStorage
}

// GetRawBytesInStorage implements the catalog.Descriptor interface.
func (desc *Mutable) GetRawBytesInStorage() []byte {
return desc.rawBytesInStorage
}

// IsUncommittedVersion implements the catalog.LeasableDescriptor interface.
func (desc *Mutable) IsUncommittedVersion() bool {
return desc.IsNew() || desc.clusterVersion.GetVersion() != desc.GetVersion()
Expand Down
28 changes: 19 additions & 9 deletions pkg/sql/catalog/funcdesc/func_desc_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,18 @@ type functionDescriptorBuilder struct {
mvccTimestamp hlc.Timestamp
isUncommittedVersion bool
changes catalog.PostDeserializationChanges
// This is the raw bytes (tag + data) of the function descriptor in storage.
rawBytesInStorage []byte
}

// DescriptorType implements the catalog.DescriptorBuilder interface.
func (fdb functionDescriptorBuilder) DescriptorType() catalog.DescriptorType {
func (fdb *functionDescriptorBuilder) DescriptorType() catalog.DescriptorType {
return catalog.Function
}

// RunPostDeserializationChanges implements the catalog.DescriptorBuilder
// interface.
func (fdb functionDescriptorBuilder) RunPostDeserializationChanges() (err error) {
func (fdb *functionDescriptorBuilder) RunPostDeserializationChanges() (err error) {
defer func() {
err = errors.Wrapf(err, "function %q (%d)", fdb.original.Name, fdb.original.ID)
}()
Expand All @@ -101,29 +103,34 @@ func (fdb functionDescriptorBuilder) RunPostDeserializationChanges() (err error)
}

// RunRestoreChanges implements the catalog.DescriptorBuilder interface.
func (fdb functionDescriptorBuilder) RunRestoreChanges(
func (fdb *functionDescriptorBuilder) RunRestoreChanges(
descLookupFn func(id descpb.ID) catalog.Descriptor,
) error {
return nil
}

// SetRawBytesInStorage implements the catalog.DescriptorBuilder interface.
func (fdb *functionDescriptorBuilder) SetRawBytesInStorage(rawBytes []byte) {
fdb.rawBytesInStorage = append([]byte(nil), rawBytes...) // deep-copy
}

// BuildImmutable implements the catalog.DescriptorBuilder interface.
func (fdb functionDescriptorBuilder) BuildImmutable() catalog.Descriptor {
func (fdb *functionDescriptorBuilder) BuildImmutable() catalog.Descriptor {
return fdb.BuildImmutableFunction()
}

// BuildExistingMutable implements the catalog.DescriptorBuilder interface.
func (fdb functionDescriptorBuilder) BuildExistingMutable() catalog.MutableDescriptor {
func (fdb *functionDescriptorBuilder) BuildExistingMutable() catalog.MutableDescriptor {
return fdb.BuildExistingMutableFunction()
}

// BuildCreatedMutable implements the catalog.DescriptorBuilder interface.
func (fdb functionDescriptorBuilder) BuildCreatedMutable() catalog.MutableDescriptor {
func (fdb *functionDescriptorBuilder) BuildCreatedMutable() catalog.MutableDescriptor {
return fdb.BuildCreatedMutableFunction()
}

// BuildImmutableFunction implements the FunctionDescriptorBuilder interface.
func (fdb functionDescriptorBuilder) BuildImmutableFunction() catalog.FunctionDescriptor {
func (fdb *functionDescriptorBuilder) BuildImmutableFunction() catalog.FunctionDescriptor {
desc := fdb.maybeModified
if desc == nil {
desc = fdb.original
Expand All @@ -132,11 +139,12 @@ func (fdb functionDescriptorBuilder) BuildImmutableFunction() catalog.FunctionDe
FunctionDescriptor: *desc,
isUncommittedVersion: fdb.isUncommittedVersion,
changes: fdb.changes,
rawBytesInStorage: append([]byte(nil), fdb.rawBytesInStorage...), // deep-copy
}
}

// BuildExistingMutableFunction implements the FunctionDescriptorBuilder interface.
func (fdb functionDescriptorBuilder) BuildExistingMutableFunction() *Mutable {
func (fdb *functionDescriptorBuilder) BuildExistingMutableFunction() *Mutable {
if fdb.maybeModified == nil {
fdb.maybeModified = protoutil.Clone(fdb.original).(*descpb.FunctionDescriptor)
}
Expand All @@ -145,13 +153,14 @@ func (fdb functionDescriptorBuilder) BuildExistingMutableFunction() *Mutable {
FunctionDescriptor: *fdb.maybeModified,
isUncommittedVersion: fdb.isUncommittedVersion,
changes: fdb.changes,
rawBytesInStorage: append([]byte(nil), fdb.rawBytesInStorage...), // deep-copy
},
clusterVersion: &immutable{FunctionDescriptor: *fdb.original},
}
}

// BuildCreatedMutableFunction implements the FunctionDescriptorBuilder interface.
func (fdb functionDescriptorBuilder) BuildCreatedMutableFunction() *Mutable {
func (fdb *functionDescriptorBuilder) BuildCreatedMutableFunction() *Mutable {
desc := fdb.maybeModified
if desc == nil {
desc = fdb.original
Expand All @@ -161,6 +170,7 @@ func (fdb functionDescriptorBuilder) BuildCreatedMutableFunction() *Mutable {
FunctionDescriptor: *desc,
isUncommittedVersion: fdb.isUncommittedVersion,
changes: fdb.changes,
rawBytesInStorage: append([]byte(nil), fdb.rawBytesInStorage...), // deep-copy
},
}
}
1 change: 1 addition & 0 deletions pkg/sql/catalog/internal/catkv/catalog_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func build(
if err := b.RunPostDeserializationChanges(); err != nil {
return nil, errors.NewAssertionErrorWithWrappedErrf(err, "error during RunPostDeserializationChanges")
}
b.SetRawBytesInStorage(rowValue.TagAndDataBytes())
desc := b.BuildImmutable()
if id != desc.GetID() {
return nil, errors.AssertionFailedf("unexpected ID %d in descriptor", desc.GetID())
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/schemadesc/public_schema_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (p public) GetName() string { return tree.PublicSchema }
func (p public) GetPrivileges() *catpb.PrivilegeDescriptor {
return catpb.NewPublicSchemaPrivilegeDescriptor()
}
func (p public) GetRawBytesInStorage() []byte { return nil }

// GetPrivilegeDescriptor implements the PrivilegeObject interface.
func (p public) GetPrivilegeDescriptor(
Expand Down
Loading