From 149137bf736148fddfd2b850162ea5810dc09331 Mon Sep 17 00:00:00 2001 From: Jason Chan Date: Thu, 9 Jun 2022 12:27:21 -0700 Subject: [PATCH 1/3] sql: disallow adding column as primary key Previously, the behavior of ALTER TABLE ... ADD COLUMN ... PRIMARY KEY was undefined. With the legacy schema changer, this statement would appear to succeed, but it would break the new schema changer. This commit changes this by returning an explicit error. In the future, we should support this statement if the table's primary key was originally the default rowid primary key (see #82735). Fixes #81449 Release note: None --- pkg/sql/alter_table.go | 4 ++++ pkg/sql/logictest/testdata/logic_test/alter_table | 11 +++++++++++ .../internal/scbuildstmt/alter_table_add_column.go | 13 +++++++++++++ 3 files changed, 28 insertions(+) diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index adf4f4afd94a..4d3c7e36a875 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -188,6 +188,10 @@ func (n *alterTableNode) startExec(params runParams) error { "UNIQUE WITHOUT INDEX constraint on the column", ) } + if t.ColumnDef.PrimaryKey.IsPrimaryKey { + return pgerror.Newf(pgcode.InvalidColumnDefinition, + "multiple primary keys for table %q are not allowed", tn.Object()) + } var err error params.p.runWithOptions(resolveFlags{contextDatabaseID: n.tableDesc.ParentID}, func() { err = params.p.addColumnImpl(params, n, tn, n.tableDesc, t) diff --git a/pkg/sql/logictest/testdata/logic_test/alter_table b/pkg/sql/logictest/testdata/logic_test/alter_table index 32fef1b3ed93..281d5e69ace6 100644 --- a/pkg/sql/logictest/testdata/logic_test/alter_table +++ b/pkg/sql/logictest/testdata/logic_test/alter_table @@ -2593,3 +2593,14 @@ CREATE INDEX idx_j ON t_add_column_not_null (j); statement ok DROP TABLE t_add_column_not_null + +subtest regression_81448 + +statement ok +CREATE TABLE t81448 (a INT PRIMARY KEY) + +statement error pq: multiple primary keys for table "t81448" are not allowed +ALTER TABLE t81448 ADD COLUMN b INT PRIMARY KEY + +statement ok +DROP TABLE t81448 diff --git a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_add_column.go b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_add_column.go index 94be4de8dbec..3ecff5285e4f 100644 --- a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_add_column.go +++ b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_add_column.go @@ -78,6 +78,19 @@ func alterTableAddColumn( "UNIQUE WITHOUT INDEX constraint on the column", )) } + if d.PrimaryKey.IsPrimaryKey { + publicTargets := b.QueryByID(tbl.TableID).Filter( + func(_ scpb.Status, target scpb.TargetStatus, _ scpb.Element) bool { + return target == scpb.ToPublic + }, + ) + _, _, primaryIdx := scpb.FindPrimaryIndex(publicTargets) + // TODO(#82735): support when primary key is implicit + if primaryIdx != nil { + panic(pgerror.Newf(pgcode.InvalidColumnDefinition, + "multiple primary keys for table %q are not allowed", tn.Object())) + } + } if d.IsComputed() { d.Computed.Expr = schemaexpr.MaybeRewriteComputedColumn(d.Computed.Expr, b.SessionData()) } From 9064a0c53606ffd5e0639bddedde78ea58b8c5e4 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 10 Jun 2022 18:57:40 -0400 Subject: [PATCH 2/3] changefeedccl: Handle node unavailable error. Treat NodeUnavailable error as a retryable changefeed error. NodeUnavaiable error may be returned by changefeed processors if the node is being shutdown/drained. However, this error return is racy. Sometimes, the coordinator would see "rpc context cancellation" error instead -- in those cases it would treat the error as retryable. However, sometimes it is possible to get this error propagated (for example: when server shutdown races with starting up kv feed, which uses Stopper, whcih may return NodeUnavailable error if it's being shutdown). Release Notes (bug fix): Treat node unavailable error as a retryable changefeed error. --- pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel | 1 + pkg/ccl/changefeedccl/changefeedbase/errors.go | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel index 88b7f4fcae03..c007b56469b8 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel +++ b/pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/clusterversion", "//pkg/jobs/joberror", "//pkg/jobs/jobspb", + "//pkg/roachpb", "//pkg/settings", "//pkg/sql", "//pkg/sql/catalog", diff --git a/pkg/ccl/changefeedccl/changefeedbase/errors.go b/pkg/ccl/changefeedccl/changefeedbase/errors.go index ec73d85110d2..66ec4adee015 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/errors.go +++ b/pkg/ccl/changefeedccl/changefeedbase/errors.go @@ -14,6 +14,7 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/jobs/joberror" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" "github.com/cockroachdb/errors" ) @@ -117,7 +118,9 @@ func IsRetryableError(err error) bool { return true } - return (joberror.IsDistSQLRetryableError(err) || flowinfra.IsNoInboundStreamConnectionError(err)) + return (joberror.IsDistSQLRetryableError(err) || + flowinfra.IsNoInboundStreamConnectionError(err) || + errors.HasType(err, (*roachpb.NodeUnavailableError)(nil))) } // MaybeStripRetryableErrorMarker performs some minimal attempt to clean the From 856576282672dd6f110026f14617c1cf0fe4f3eb Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 10 Jun 2022 19:00:34 -0400 Subject: [PATCH 3/3] changefeedccl: Do not inhibit server shutdown. Informs #82765 Permanet fix is being tracked by the above issue. This is a temporary fix to ensure that change aggregators cancel their context when the server quiesces so that the server shutdown is not inhibited by the running changefeeds. The test for this functionality is not being merged due to the fact that it takes too long to run; however, the test can be seen here: https://github.com/cockroachdb/cockroach/pull/82767 Release Notes (bug fix): Ensure running changefeeds do not inhibit node shutdown. --- pkg/ccl/changefeedccl/changefeed_processors.go | 9 +++------ pkg/ccl/changefeedccl/changefeed_stmt.go | 7 ++++++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index ab8879cade1c..f729b60d857a 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -208,17 +208,14 @@ func (ca *changeAggregator) MustBeStreaming() bool { // Start is part of the RowSource interface. func (ca *changeAggregator) Start(ctx context.Context) { + // Derive a separate context so that we can shutdown the poller. + ctx, ca.cancel = ca.flowCtx.Stopper().WithCancelOnQuiesce(ctx) + if ca.spec.JobID != 0 { ctx = logtags.AddTag(ctx, "job", ca.spec.JobID) } ctx = ca.StartInternal(ctx, changeAggregatorProcName) - // Derive a separate context so that we can shutdown the poller. Note that - // we need to update both ctx (used throughout this function) and - // ProcessorBase.Ctx (used in all other methods) to the new context. - ctx, ca.cancel = context.WithCancel(ctx) - ca.Ctx = ctx - spans, err := ca.setupSpansAndFrontier() endTime := ca.spec.Feed.EndTime diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index d14042ce636b..3bead972b153 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -1030,7 +1030,12 @@ func (b *changefeedResumer) resumeWithRetries( } } - if !changefeedbase.IsRetryableError(err) { + // Retry changefeed is error is retryable. In addition, we want to handle + // context cancellation as retryable, but only if the resumer context has not been cancelled. + // (resumer context is canceled by the jobs framework -- so we should respect it). + isRetryableErr := changefeedbase.IsRetryableError(err) || + (ctx.Err() == nil && errors.Is(err, context.Canceled)) + if !isRetryableErr { if ctx.Err() != nil { return ctx.Err() }