Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: assign sequence ownership created when create/alter table #74840

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ func mysqlTableToCockroach(
privilegeDesc := descpb.NewBasePrivilegeDescriptor(owner)
seqDesc, err = sql.NewSequenceTableDesc(
ctx,
nil, /* planner */
seqName,
opts,
parentDB.GetID(),
Expand All @@ -438,7 +439,6 @@ func mysqlTableToCockroach(
time,
privilegeDesc,
tree.PersistencePermanent,
nil, /* params */
// If this is multi-region, this will get added by WriteDescriptors.
false, /* isMultiRegion */
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ func createPostgresSequences(
}
desc, err := sql.NewSequenceTableDesc(
ctx,
nil, /* planner */
schemaAndTableName.table,
seq.Options,
parentID,
Expand All @@ -329,7 +330,6 @@ func createPostgresSequences(
hlc.Timestamp{WallTime: walltime},
descpb.NewBasePrivilegeDescriptor(owner),
tree.PersistencePermanent,
nil, /* params */
// If this is multi-region, this will get added by WriteDescriptors.
false, /* isMultiRegion */
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func descForTable(
priv := descpb.NewBasePrivilegeDescriptor(security.AdminRoleName())
desc, err := sql.NewSequenceTableDesc(
ctx,
nil, /* planner */
name,
tree.SequenceOptions{},
parent,
Expand All @@ -68,7 +69,6 @@ func descForTable(
ts,
priv,
tree.PersistencePermanent,
nil, /* params */
false, /* isMultiRegion */
)
if err != nil {
Expand Down
74 changes: 43 additions & 31 deletions pkg/sql/add_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,24 @@ func (p *planner) addColumnImpl(
return err
}

var colOwnedSeqDesc *tabledesc.Mutable
newDef, seqPrefix, seqName, seqOpts, err := params.p.processSerialLikeInColumnDef(params.ctx, d, tn)
if err != nil {
return err
}
if seqName != nil {
if err := doCreateSequence(
params,
colOwnedSeqDesc, err = doCreateSequence(
params.ctx,
params.p,
params.SessionData(),
seqPrefix.Database,
seqPrefix.Schema,
seqName,
n.tableDesc.Persistence(),
seqOpts,
tree.AsStringWithFQNames(n.n, params.Ann()),
); err != nil {
)
if err != nil {
return err
}
}
Expand Down Expand Up @@ -103,27 +107,6 @@ func (p *planner) addColumnImpl(
}
}

// If the new column has a DEFAULT or an ON UPDATE expression that uses a
// sequence, add references between its descriptor and this column descriptor.
if err := cdd.ForEachTypedExpr(func(expr tree.TypedExpr) error {
changedSeqDescs, err := maybeAddSequenceDependencies(
params.ctx, params.ExecCfg().Settings, params.p, n.tableDesc, col, expr, nil,
)
if err != nil {
return err
}
for _, changedSeqDesc := range changedSeqDescs {
if err := params.p.writeSchemaChange(
params.ctx, changedSeqDesc, descpb.InvalidMutationID, tree.AsStringWithFQNames(n.n, params.Ann()),
); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}

// We're checking to see if a user is trying add a non-nullable column without a default to a
// non empty table by scanning the primary index span with a limit of 1 to see if any key exists.
if !col.Nullable && (col.DefaultExpr == nil && !col.IsComputed()) {
Expand Down Expand Up @@ -176,16 +159,45 @@ func (p *planner) addColumnImpl(
n.tableDesc.SetPrimaryIndex(primaryIndex)
}

// Zone configuration logic is only required for REGIONAL BY ROW tables
// with newly created indexes.
if n.tableDesc.IsLocalityRegionalByRow() && idx != nil {
// We need to allocate new IDs for the created columns and indexes
// in case we need to configure their zone partitioning.
// This must be done after every object is created.
if err := n.tableDesc.AllocateIDs(params.ctx); err != nil {
// We need to allocate new ID for the created column in order to correctly
// assign sequence ownership.
if err := n.tableDesc.AllocateIDs(params.ctx); err != nil {
return err
}

// If the new column has a DEFAULT or an ON UPDATE expression that uses a
// sequence, add references between its descriptor and this column descriptor.
if err := cdd.ForEachTypedExpr(func(expr tree.TypedExpr) error {
changedSeqDescs, err := maybeAddSequenceDependencies(
params.ctx, params.ExecCfg().Settings, params.p, n.tableDesc, col, expr, nil,
)
if err != nil {
return err
}
for _, changedSeqDesc := range changedSeqDescs {
// `colOwnedSeqDesc` and `changedSeqDesc` should refer to a same instance.
// But we still want to use the right copy to write a schema change for by
// using `changedSeqDesc` just in case the assumption became false in the
// future.
if colOwnedSeqDesc != nil && colOwnedSeqDesc.ID == changedSeqDesc.ID {
if err := setSequenceOwner(changedSeqDesc, d.Name, desc); err != nil {
return err
}
}
if err := params.p.writeSchemaChange(
params.ctx, changedSeqDesc, descpb.InvalidMutationID, tree.AsStringWithFQNames(n.n, params.Ann()),
); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}

// Zone configuration logic is only required for REGIONAL BY ROW tables
// with newly created indexes.
if n.tableDesc.IsLocalityRegionalByRow() && idx != nil {
// Configure zone configuration if required. This must happen after
// all the IDs have been allocated.
if err := p.configureZoneConfigForNewIndexPartitioning(
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/alter_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ func (n *alterSequenceNode) startExec(params runParams) error {
}
}
if err := assignSequenceOptions(
params.ctx,
params.p,
desc.SequenceOpts,
n.n.Options,
false, /* setDefaults */
&params,
desc.GetID(),
desc.ParentID,
existingType,
Expand Down
107 changes: 86 additions & 21 deletions pkg/sql/create_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package sql

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand Down Expand Up @@ -80,33 +82,37 @@ func (n *createSequenceNode) startExec(params runParams) error {
return err
}

return doCreateSequence(
params, n.dbDesc, schemaDesc, &n.n.Name, n.n.Persistence, n.n.Options,
_, err = doCreateSequence(
params.ctx, params.p, params.SessionData(), n.dbDesc, schemaDesc, &n.n.Name, n.n.Persistence, n.n.Options,
tree.AsStringWithFQNames(n.n, params.Ann()),
)

return err
}

// doCreateSequence performs the creation of a sequence in KV. The
// context argument is a string to use in the event log.
func doCreateSequence(
params runParams,
ctx context.Context,
p *planner,
sessionData *sessiondata.SessionData,
dbDesc catalog.DatabaseDescriptor,
scDesc catalog.SchemaDescriptor,
name *tree.TableName,
persistence tree.Persistence,
opts tree.SequenceOptions,
jobDesc string,
) error {
id, err := descidgen.GenerateUniqueDescID(params.ctx, params.p.ExecCfg().DB, params.p.ExecCfg().Codec)
) (*tabledesc.Mutable, error) {
id, err := descidgen.GenerateUniqueDescID(ctx, p.ExecCfg().DB, p.ExecCfg().Codec)
if err != nil {
return err
return nil, err
}

privs := catprivilege.CreatePrivilegesFromDefaultPrivileges(
dbDesc.GetDefaultPrivilegeDescriptor(),
scDesc.GetDefaultPrivilegeDescriptor(),
dbDesc.GetID(),
params.SessionData().User(),
sessionData.User(),
tree.Sequences,
dbDesc.GetPrivileges(),
)
Expand All @@ -122,7 +128,8 @@ func doCreateSequence(
// currently relied on in import and restore code and tests.
var creationTime hlc.Timestamp
desc, err := NewSequenceTableDesc(
params.ctx,
ctx,
p,
name.Object(),
opts,
dbDesc.GetID(),
Expand All @@ -131,49 +138,107 @@ func doCreateSequence(
creationTime,
privs,
persistence,
&params,
dbDesc.IsMultiRegion(),
)
if err != nil {
return err
return nil, err
}

// makeSequenceTableDesc already validates the table. No call to
// desc.ValidateSelf() needed here.

key := catalogkeys.MakeObjectNameKey(params.ExecCfg().Codec, dbDesc.GetID(), scDesc.GetID(), name.Object())
if err = params.p.createDescriptorWithID(params.ctx, key, id, desc, jobDesc); err != nil {
return err
key := catalogkeys.MakeObjectNameKey(p.ExecCfg().Codec, dbDesc.GetID(), scDesc.GetID(), name.Object())
if err = p.createDescriptorWithID(ctx, key, id, desc, jobDesc); err != nil {
return nil, err
}

// Initialize the sequence value.
seqValueKey := params.ExecCfg().Codec.SequenceKey(uint32(id))
seqValueKey := p.ExecCfg().Codec.SequenceKey(uint32(id))
b := &kv.Batch{}
b.Inc(seqValueKey, desc.SequenceOpts.Start-desc.SequenceOpts.Increment)
if err := params.p.txn.Run(params.ctx, b); err != nil {
return err
if err := p.txn.Run(ctx, b); err != nil {
return nil, err
}

if err := validateDescriptor(params.ctx, params.p, desc); err != nil {
return err
if err := validateDescriptor(ctx, p, desc); err != nil {
return nil, err
}

// Log Create Sequence event. This is an auditable log event and is
// recorded in the same transaction as the table descriptor update.
return params.p.logEvent(params.ctx,
return desc, p.logEvent(ctx,
desc.ID,
&eventpb.CreateSequence{
SequenceName: name.FQString(),
})
}

func createSequencesForSerialColumns(
ctx context.Context,
p *planner,
sessionData *sessiondata.SessionData,
db catalog.DatabaseDescriptor,
sc catalog.SchemaDescriptor,
n *tree.CreateTable,
) (map[tree.Name]*tabledesc.Mutable, error) {
colNameToSeqDesc := make(map[tree.Name]*tabledesc.Mutable)
createStmt := n
ensureCopy := func() {
if createStmt == n {
newCreateStmt := *n
n.Defs = append(tree.TableDefs(nil), n.Defs...)
createStmt = &newCreateStmt
}
}

tn := tree.MakeTableNameFromPrefix(catalog.ResolvedObjectPrefix{
Database: db,
Schema: sc,
}.NamePrefix(), tree.Name(n.Table.Table()))
for i, def := range n.Defs {
d, ok := def.(*tree.ColumnTableDef)
if !ok {
continue
}
newDef, prefix, seqName, seqOpts, err := p.processSerialLikeInColumnDef(ctx, d, &tn)
if err != nil {
return nil, err
}
// TODO (lucy): Have more consistent/informative names for dependent jobs.
if seqName != nil {
seqDesc, err := doCreateSequence(
ctx,
p,
sessionData,
prefix.Database,
prefix.Schema,
seqName,
n.Persistence,
seqOpts,
fmt.Sprintf("creating sequence %s for new table %s", seqName, n.Table.Table()),
)
if err != nil {
return nil, err
}
colNameToSeqDesc[d.Name] = seqDesc
}
if d != newDef {
ensureCopy()
n.Defs[i] = newDef
}
}

return colNameToSeqDesc, nil
}

func (*createSequenceNode) Next(runParams) (bool, error) { return false, nil }
func (*createSequenceNode) Values() tree.Datums { return tree.Datums{} }
func (*createSequenceNode) Close(context.Context) {}

// NewSequenceTableDesc creates a sequence descriptor.
func NewSequenceTableDesc(
ctx context.Context,
p *planner,
sequenceName string,
sequenceOptions tree.SequenceOptions,
parentID descpb.ID,
Expand All @@ -182,7 +247,6 @@ func NewSequenceTableDesc(
creationTime hlc.Timestamp,
privileges *descpb.PrivilegeDescriptor,
persistence tree.Persistence,
params *runParams,
isMultiRegion bool,
) (*tabledesc.Mutable, error) {
desc := tabledesc.InitTableDescriptor(
Expand Down Expand Up @@ -227,10 +291,11 @@ func NewSequenceTableDesc(
Increment: 1,
}
if err := assignSequenceOptions(
ctx,
p,
opts,
sequenceOptions,
true, /* setDefaults */
params,
id,
parentID,
nil, /* existingType */
Expand Down
Loading