Merge #82489 #82768
82489: sql: disallow adding column as primary key r=jasonmchan a=jasonmchan

Previously, the behavior of ALTER TABLE ... ADD COLUMN ... PRIMARY KEY
was undefined. With the legacy schema changer, the statement would
appear to succeed, but it would break the new schema changer. This
commit makes the statement return an explicit error instead.

In the future, we should support this statement if the table's primary
key was originally the default rowid primary key (see #82735).

Fixes #81448

Release note: None

82768: changefeedccl: Do not inhibit node shutdown r=miretskiy a=miretskiy

See individual commits for details:
  * Ensure running changefeeds do not inhibit node shutdown (informs #82765).
  * Treat the node-unavailable error as retryable.

The test is not being merged as part of this PR -- see #82767.

Release note (bug fix): Ensure running changefeeds do not inhibit node shutdown.
Release note (bug fix): Treat the node-unavailable error as a retryable changefeed error.

Co-authored-by: Jason Chan <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
3 people committed Jun 14, 2022
3 parents 193442a + 149137b + 8565762 commit 4f77f0e
Showing 7 changed files with 42 additions and 8 deletions.
9 changes: 3 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -208,17 +208,14 @@ func (ca *changeAggregator) MustBeStreaming() bool {
 
 // Start is part of the RowSource interface.
 func (ca *changeAggregator) Start(ctx context.Context) {
+	// Derive a separate context so that we can shutdown the poller.
+	ctx, ca.cancel = ca.flowCtx.Stopper().WithCancelOnQuiesce(ctx)
+
 	if ca.spec.JobID != 0 {
 		ctx = logtags.AddTag(ctx, "job", ca.spec.JobID)
 	}
 	ctx = ca.StartInternal(ctx, changeAggregatorProcName)
 
-	// Derive a separate context so that we can shutdown the poller. Note that
-	// we need to update both ctx (used throughout this function) and
-	// ProcessorBase.Ctx (used in all other methods) to the new context.
-	ctx, ca.cancel = context.WithCancel(ctx)
-	ca.Ctx = ctx
-
 	spans, err := ca.setupSpansAndFrontier()
 
 	endTime := ca.spec.Feed.EndTime
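For readers unfamiliar with the pattern, here is a minimal standalone Go sketch of cancel-on-quiesce, using a hypothetical quiesce channel in place of CockroachDB's stop.Stopper. The derived context is canceled either explicitly or when shutdown begins, so work bound to it (such as the changefeed poller) winds down instead of blocking the drain. The diff above achieves the same effect by deriving the poller context from the flow's stopper instead of a bare context.WithCancel.

package main

import (
	"context"
	"fmt"
	"time"
)

// withCancelOnQuiesce derives a context that is canceled when either the
// returned cancel function is called or the quiesce channel is closed, so
// work tied to the context stops once shutdown begins.
func withCancelOnQuiesce(ctx context.Context, quiesce <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		select {
		case <-quiesce:
			cancel()
		case <-ctx.Done():
		}
	}()
	return ctx, cancel
}

func main() {
	quiesce := make(chan struct{})
	ctx, cancel := withCancelOnQuiesce(context.Background(), quiesce)
	defer cancel()

	// Simulate a node drain: closing quiesce cancels the derived context.
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(quiesce)
	}()

	<-ctx.Done()
	fmt.Println("poller context canceled:", ctx.Err())
}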
7 changes: 6 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -1030,7 +1030,12 @@ func (b *changefeedResumer) resumeWithRetries(
 			}
 		}
 
-		if !changefeedbase.IsRetryableError(err) {
+		// Retry changefeed if the error is retryable. In addition, we want to handle
+		// context cancellation as retryable, but only if the resumer context has not been
+		// canceled (the resumer context is canceled by the jobs framework, so we should respect it).
+		isRetryableErr := changefeedbase.IsRetryableError(err) ||
+			(ctx.Err() == nil && errors.Is(err, context.Canceled))
+		if !isRetryableErr {
 			if ctx.Err() != nil {
 				return ctx.Err()
 			}
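As a rough standalone illustration of the retry decision above (not the actual resumer code; the markedRetryable parameter stands in for changefeedbase.IsRetryableError), a context cancellation bubbling up from the processors is retried only while the resumer's own context is still live:

package main

import (
	"context"
	"errors"
	"fmt"
)

// isRetryable sketches the decision in resumeWithRetries: retry if the
// changefeed layer marks the error retryable, or if it is a context
// cancellation that did not originate from the resumer's context (i.e. the
// jobs framework has not asked the job to stop).
func isRetryable(resumerCtx context.Context, err error, markedRetryable bool) bool {
	return markedRetryable ||
		(resumerCtx.Err() == nil && errors.Is(err, context.Canceled))
}

func main() {
	resumerCtx, cancel := context.WithCancel(context.Background())

	// A cancellation from a per-node processor context while the resumer is
	// still running: retry.
	fmt.Println(isRetryable(resumerCtx, context.Canceled, false)) // true

	// The jobs framework canceled the resumer itself: do not retry.
	cancel()
	fmt.Println(isRetryable(resumerCtx, context.Canceled, false)) // false
}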
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
         "//pkg/clusterversion",
         "//pkg/jobs/joberror",
         "//pkg/jobs/jobspb",
+        "//pkg/roachpb",
         "//pkg/settings",
         "//pkg/sql",
         "//pkg/sql/catalog",
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/errors.go
@@ -14,6 +14,7 @@ import (
 	"strings"
 
 	"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
 	"github.com/cockroachdb/errors"
 )
@@ -117,7 +118,9 @@ func IsRetryableError(err error) bool {
 		return true
 	}
 
-	return (joberror.IsDistSQLRetryableError(err) || flowinfra.IsNoInboundStreamConnectionError(err))
+	return (joberror.IsDistSQLRetryableError(err) ||
+		flowinfra.IsNoInboundStreamConnectionError(err) ||
+		errors.HasType(err, (*roachpb.NodeUnavailableError)(nil)))
 }
 
 // MaybeStripRetryableErrorMarker performs some minimal attempt to clean the
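A minimal standard-library sketch of the same idea, with a hypothetical nodeUnavailableError standing in for roachpb.NodeUnavailableError and errors.As playing the role of the errors.HasType helper from cockroachdb/errors: a typed error anywhere in the wrapped chain marks the whole error as retryable.

package main

import (
	"errors"
	"fmt"
)

// nodeUnavailableError is a stand-in for roachpb.NodeUnavailableError.
type nodeUnavailableError struct{}

func (nodeUnavailableError) Error() string { return "node unavailable; try another peer" }

// isRetryableError mirrors the shape of changefeedbase.IsRetryableError:
// walk the wrapped chain and retry when a node-unavailable error is found.
func isRetryableError(err error) bool {
	var nue nodeUnavailableError
	return errors.As(err, &nue)
}

func main() {
	err := fmt.Errorf("distsql flow failed: %w", nodeUnavailableError{})
	fmt.Println(isRetryableError(err))                        // true
	fmt.Println(isRetryableError(errors.New("syntax error"))) // false
}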
4 changes: 4 additions & 0 deletions pkg/sql/alter_table.go
@@ -188,6 +188,10 @@ func (n *alterTableNode) startExec(params runParams) error {
 					"UNIQUE WITHOUT INDEX constraint on the column",
 				)
 			}
+			if t.ColumnDef.PrimaryKey.IsPrimaryKey {
+				return pgerror.Newf(pgcode.InvalidColumnDefinition,
+					"multiple primary keys for table %q are not allowed", tn.Object())
+			}
 			var err error
 			params.p.runWithOptions(resolveFlags{contextDatabaseID: n.tableDesc.ParentID}, func() {
 				err = params.p.addColumnImpl(params, n, tn, n.tableDesc, t)
11 changes: 11 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/alter_table
@@ -2593,3 +2593,14 @@ CREATE INDEX idx_j ON t_add_column_not_null (j);
 
 statement ok
 DROP TABLE t_add_column_not_null
+
+subtest regression_81448
+
+statement ok
+CREATE TABLE t81448 (a INT PRIMARY KEY)
+
+statement error pq: multiple primary keys for table "t81448" are not allowed
+ALTER TABLE t81448 ADD COLUMN b INT PRIMARY KEY
+
+statement ok
+DROP TABLE t81448
@@ -78,6 +78,19 @@ func alterTableAddColumn(
 			"UNIQUE WITHOUT INDEX constraint on the column",
 		))
 	}
+	if d.PrimaryKey.IsPrimaryKey {
+		publicTargets := b.QueryByID(tbl.TableID).Filter(
+			func(_ scpb.Status, target scpb.TargetStatus, _ scpb.Element) bool {
+				return target == scpb.ToPublic
+			},
+		)
+		_, _, primaryIdx := scpb.FindPrimaryIndex(publicTargets)
+		// TODO(#82735): support when primary key is implicit
+		if primaryIdx != nil {
+			panic(pgerror.Newf(pgcode.InvalidColumnDefinition,
+				"multiple primary keys for table %q are not allowed", tn.Object()))
+		}
+	}
 	if d.IsComputed() {
 		d.Computed.Expr = schemaexpr.MaybeRewriteComputedColumn(d.Computed.Expr, b.SessionData())
 	}
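For orientation, here is a hedged standalone sketch of the shape of the new schema changer's check, using hypothetical element types rather than the real scpb API: filter the table's elements down to those targeted to become public, and reject the statement if a primary index is among them.

package main

import "fmt"

// element is a stand-in for a schema-changer element targeting a descriptor.
type element struct {
	kind     string // e.g. "PrimaryIndex", "Column"
	toPublic bool   // true if the element's target status is PUBLIC
}

// hasPublicPrimaryIndex mirrors the filter-then-find shape of the real check:
// keep only elements headed to PUBLIC, then look for a primary index.
func hasPublicPrimaryIndex(elems []element) bool {
	for _, e := range elems {
		if e.toPublic && e.kind == "PrimaryIndex" {
			return true
		}
	}
	return false
}

// addColumnAsPrimaryKey rejects the statement when the table already has
// (or is acquiring) an explicit primary index.
func addColumnAsPrimaryKey(tableName string, elems []element) error {
	if hasPublicPrimaryIndex(elems) {
		return fmt.Errorf("multiple primary keys for table %q are not allowed", tableName)
	}
	// Adding the column as the table's new primary key would go here
	// (see #82735 for the implicit-rowid case).
	return nil
}

func main() {
	withPK := []element{{kind: "PrimaryIndex", toPublic: true}, {kind: "Column", toPublic: true}}
	fmt.Println(addColumnAsPrimaryKey("t81448", withPK)) // multiple primary keys ... not allowed

	noPK := []element{{kind: "Column", toPublic: true}}
	fmt.Println(addColumnAsPrimaryKey("t_no_pk", noPK)) // <nil>
}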
