Skip to content

Commit

Permalink
Merge #98650 #98654
Browse files Browse the repository at this point in the history
98650: sql/schemachanger: relieve restrictions on zone config r=fqazi a=fqazi

Previously, the declarative schema changer was blanket disabled for zone configurations. This was fairly
restrictive and meant that the declarative schema changer would not be used when any zone config existed. To address this, this patch only blocks declarative schema changes when a sub zone config exists.

Epic: none

Release note: None

98654: execinfra: correctly propagate processorID for LocalProcessors r=yuzefovich a=yuzefovich

Previously, this was incorrectly hard-coded as zero. The impact of this seems minor (I believe this would only make it so that we could incorrectly attribute `ComponentStats` object of `planNodeToRowSource` to the wrong processor), but I think it still deserves to be backported.

Epic: None

Release note: None

Co-authored-by: Faizan Qazi <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
3 people committed Mar 15, 2023
3 parents 125aa4a + 3c66440 + 9eb8ec3 commit adb0556
Show file tree
Hide file tree
Showing 23 changed files with 187 additions and 85 deletions.
15 changes: 15 additions & 0 deletions pkg/ccl/schemachangerccl/testdata/decomp/multiregion
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,21 @@ ElementState:
- TableZoneConfig:
tableId: 108
Status: PUBLIC
- IndexZoneConfig:
indexId: 1
partitionName: us-east1
tableId: 108
Status: PUBLIC
- IndexZoneConfig:
indexId: 1
partitionName: us-east2
tableId: 108
Status: PUBLIC
- IndexZoneConfig:
indexId: 1
partitionName: us-east3
tableId: 108
Status: PUBLIC
- TableData:
databaseId: 104
tableId: 108
Expand Down
32 changes: 16 additions & 16 deletions pkg/cli/testdata/declarative-rules/deprules

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ func NewLimitedMonitorNoFlowCtx(
type LocalProcessor interface {
RowSourcedProcessor
// InitWithOutput initializes this processor.
InitWithOutput(ctx context.Context, flowCtx *FlowCtx, post *execinfrapb.PostProcessSpec, output RowReceiver) error
InitWithOutput(ctx context.Context, flowCtx *FlowCtx, processorID int32, post *execinfrapb.PostProcessSpec, output RowReceiver) error
// SetInput initializes this LocalProcessor with an input RowSource. Not all
// LocalProcessors need inputs, but this needs to be called if a
// LocalProcessor expects to get its data from another RowSource.
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/plan_node_to_row_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (p *planNodeToRowSource) MustBeStreaming() bool {
func (p *planNodeToRowSource) InitWithOutput(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
processorID int32,
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) error {
Expand All @@ -121,7 +122,7 @@ func (p *planNodeToRowSource) InitWithOutput(
// newPlanNodeToRowSource, so we can just use the eval context from the
// params.
p.params.EvalContext(),
0, /* processorID */
processorID,
output,
nil, /* memMonitor */
execinfra.ProcStateOpts{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func NewProcessor(
return nil, err
}
processor := localProcessors[core.LocalPlanNode.RowSourceIdx]
if err := processor.InitWithOutput(ctx, flowCtx, post, outputs[0]); err != nil {
if err := processor.InitWithOutput(ctx, flowCtx, processorID, post, outputs[0]); err != nil {
return nil, err
}
if numInputs == 1 {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,7 @@ func TestSchemaChangeFailureAfterCheckpointing(t *testing.T) {
defer server.Stopper().Stop(context.Background())

if _, err := sqlDB.Exec(`
SET use_declarative_schema_changer='off';
CREATE DATABASE t;
CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
`); err != nil {
Expand Down Expand Up @@ -6200,6 +6201,7 @@ func TestRetriableErrorDuringRollback(t *testing.T) {
defer sqltestutils.DisableGCTTLStrictEnforcement(t, sqlDB)()

_, err := sqlDB.Exec(`
SET use_declarative_schema_changer='off';
CREATE DATABASE t;
CREATE TABLE t.test (k INT PRIMARY KEY, v INT8);
INSERT INTO t.test VALUES (1, 2), (2, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func alterTableAddColumn(
d := t.ColumnDef
// We don't support handling zone config related properties for tables, so
// throw an unsupported error.
fallBackIfZoneConfigExists(b, d, tbl.TableID)
fallBackIfSubZoneConfigExists(b, t, tbl.TableID)
fallBackIfRegionalByRowTable(b, t, tbl.TableID)
fallBackIfVirtualColumnWithNotNullConstraint(t)
// Check column non-existence.
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ func alterPrimaryKey(b BuildCtx, tn *tree.TableName, tbl *scpb.Table, t alterPri
fallBackIfConcurrentSchemaChange(b, t, tbl.TableID)
fallBackIfShardedIndexExists(b, t, tbl.TableID)
fallBackIfPartitionedIndexExists(b, t, tbl.TableID)
fallBackIfRegionalByRowTable(b, t, tbl.TableID)
fallBackIfRegionalByRowTable(b, t.n, tbl.TableID)
fallBackIfDescColInRowLevelTTLTables(b, tbl.TableID, t)
fallBackIfZoneConfigExists(b, t.n, tbl.TableID)
fallBackIfSubZoneConfigExists(b, t.n, tbl.TableID)
// Version gates functionally that is implemented after the statement is
// publicly published.
fallBackIfRequestedToBeShardedAndBeforeV231(b, t)
Expand Down Expand Up @@ -400,10 +400,10 @@ func fallBackIfShardedIndexExists(b BuildCtx, t alterPrimaryKeySpec, tableID cat
// error if it's a REGIONAL BY ROW table because we need to
// include the implicit REGION column when constructing the
// new primary key.
func fallBackIfRegionalByRowTable(b BuildCtx, t alterPrimaryKeySpec, tableID catid.DescID) {
func fallBackIfRegionalByRowTable(b BuildCtx, t tree.NodeFormatter, tableID catid.DescID) {
_, _, rbrElem := scpb.FindTableLocalityRegionalByRow(b.QueryByID(tableID))
if rbrElem != nil {
panic(scerrors.NotImplementedErrorf(t.n, "ALTER PRIMARY KEY on a REGIONAL BY ROW table "+
panic(scerrors.NotImplementedErrorf(t, "ALTER PRIMARY KEY on a REGIONAL BY ROW table "+
"is not yet supported."))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
func alterTableDropColumn(
b BuildCtx, tn *tree.TableName, tbl *scpb.Table, n *tree.AlterTableDropColumn,
) {
fallBackIfZoneConfigExists(b, n, tbl.TableID)
fallBackIfSubZoneConfigExists(b, n, tbl.TableID)
fallBackIfRegionalByRowTable(b, n, tbl.TableID)
checkSafeUpdatesForDropColumn(b)
checkRegionalByRowColumnConflict(b, tbl, n)
col, elts, done := resolveColumnForDropColumn(b, tn, tbl, n)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ func CreateIndex(b BuildCtx, n *tree.CreateIndex) {
IsExistenceOptional: false,
RequiredPrivilege: privilege.CREATE,
})
// We don't support handling zone config related properties for tables, so
// throw an unsupported error.
// We don't support handling zone config related properties for tables required
// for regional by row tables.
if _, _, tbl := scpb.FindTable(relationElements); tbl != nil {
fallBackIfZoneConfigExists(b, n, tbl.TableID)
fallBackIfRegionalByRowTable(b, n, tbl.TableID)
}
_, _, partitioning := scpb.FindTablePartitioning(relationElements)
if partitioning != nil && n.PartitionByIndex != nil &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func maybeDropIndex(
}
// We don't support handling zone config related properties for tables, so
// throw an unsupported error.
fallBackIfZoneConfigExists(b, nil, sie.TableID)
fallBackIfSubZoneConfigExists(b, nil, sie.TableID)
// Cannot drop the index if not CASCADE and a unique constraint depends on it.
if dropBehavior != tree.DropCascade && sie.IsUnique && !sie.IsCreatedExplicitly {
panic(errors.WithHint(
Expand Down
12 changes: 7 additions & 5 deletions pkg/sql/schemachanger/scbuild/internal/scbuildstmt/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,14 +776,16 @@ func makeSwapIndexSpec(
return in, temp
}

// fallBackIfZoneConfigExists determines if the table has regional by row
// properties and throws an unimplemented error.
func fallBackIfZoneConfigExists(b BuildCtx, n tree.NodeFormatter, id catid.DescID) {
// fallBackIfSubZoneConfigExists determines if the table has a subzone
// config. Normally this logic is used to limit index related operations,
// since dropping indexes will need to remove entries of sub zones from
// the zone config.
func fallBackIfSubZoneConfigExists(b BuildCtx, n tree.NodeFormatter, id catid.DescID) {
{
tableElts := b.QueryByID(id)
if _, _, elem := scpb.FindTableZoneConfig(tableElts); elem != nil {
if _, _, elem := scpb.FindIndexZoneConfig(tableElts); elem != nil {
panic(scerrors.NotImplementedErrorf(n,
"regional by row partitioning is not supported"))
"sub zone configs are not supported"))
}
}
}
Expand Down
16 changes: 0 additions & 16 deletions pkg/sql/schemachanger/scbuild/testdata/unimplemented_zone_cfg
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,6 @@ CREATE INDEX idx ON defaultdb.foo_index_zone_cfg(k, l);
ALTER INDEX defaultdb.foo_index_zone_cfg@idx CONFIGURE ZONE USING gc.ttlseconds=10;
----

unimplemented
ALTER TABLE defaultdb.foo ADD COLUMN j INT
----

unimplemented
ALTER TABLE defaultdb.foo ADD PRIMARY KEY (i, n)
----

unimplemented
ALTER TABLE defaultdb.foo DROP COLUMN k
----

unimplemented
DROP INDEX defaultdb.foo@idx
----

unimplemented
ALTER TABLE defaultdb.foo_index_zone_cfg ADD COLUMN j INT
----
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/schemachanger/scdecomp/decomp.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,14 @@ func (w *walkCtx) walkRelation(tbl catalog.TableDescriptor) {
&scpb.TableZoneConfig{
TableID: tbl.GetID(),
})
for _, subZoneCfg := range zoneCfg.ZoneConfigProto().Subzones {
w.ev(scpb.Status_PUBLIC,
&scpb.IndexZoneConfig{
TableID: tbl.GetID(),
IndexID: catid.IndexID(subZoneCfg.IndexID),
PartitionName: subZoneCfg.PartitionName,
})
}
}
}
if tbl.IsPhysicalTable() {
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/schemachanger/scpb/elements.proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ message ElementProto {
TableComment table_comment = 28 [(gogoproto.moretags) = "parent:\"Table, View, Sequence\""];
RowLevelTTL row_level_ttl = 29 [(gogoproto.customname) = "RowLevelTTL", (gogoproto.moretags) = "parent:\"Table\""];
TableZoneConfig table_zone_config = 121 [(gogoproto.moretags) = "parent:\"Table, View\""];
IndexZoneConfig index_zone_config = 122 [(gogoproto.moretags) = "parent:\"Index\""];
TableData table_data = 131 [(gogoproto.customname) = "TableData", (gogoproto.moretags) = "parent:\"Table, View, Sequence\""];
TablePartitioning table_partitioning = 132 [(gogoproto.customname) = "TablePartitioning", (gogoproto.moretags) = "parent:\"Table\""];

Expand Down Expand Up @@ -604,11 +605,16 @@ message CompositeTypeAttrType {
TypeT embedded_type_t = 2 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
}

// TableZoneConfig temporary place holder just to allow blocking operations.
message TableZoneConfig {
uint32 table_id = 1 [(gogoproto.customname) = "TableID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sem/catid.DescID"];
}

message IndexZoneConfig {
uint32 table_id = 1 [(gogoproto.customname) = "TableID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sem/catid.DescID"];
uint32 index_id = 2 [(gogoproto.customname) = "IndexID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sem/catid.IndexID"];
string partition_name = 3;
}

// DatabaseData models what needs to be GCed when a database is dropped.
message DatabaseData {
uint32 database_id = 1 [(gogoproto.customname) = "DatabaseID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/sem/catid.DescID"];
Expand Down
31 changes: 31 additions & 0 deletions pkg/sql/schemachanger/scpb/elements_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/sql/schemachanger/scpb/uml/table.puml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ object TableZoneConfig

TableZoneConfig : TableID

object IndexZoneConfig

IndexZoneConfig : TableID
IndexZoneConfig : IndexID
IndexZoneConfig : PartitionName

object TableData

TableData : TableID
Expand Down Expand Up @@ -405,6 +411,7 @@ Sequence <|-- TableComment
Table <|-- RowLevelTTL
Table <|-- TableZoneConfig
View <|-- TableZoneConfig
Index <|-- IndexZoneConfig
Table <|-- TableData
View <|-- TableData
Sequence <|-- TableData
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/schemachanger/scplan/internal/opgen/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"opgen_index_data.go",
"opgen_index_name.go",
"opgen_index_partitioning.go",
"opgen_index_zone_config.go",
"opgen_namespace.go",
"opgen_owner.go",
"opgen_primary_index.go",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package opgen

import (
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
)

func init() {
opRegistry.register((*scpb.IndexZoneConfig)(nil),
toPublic(
scpb.Status_ABSENT,
to(scpb.Status_PUBLIC,
emit(func(this *scpb.IndexZoneConfig) *scop.NotImplemented {
return &scop.NotImplemented{}
}),
),
),
toAbsent(
scpb.Status_PUBLIC,
to(scpb.Status_ABSENT,
emit(func(this *scpb.IndexZoneConfig) *scop.NotImplementedForPublicObjects {
return notImplementedForPublicObjects(this)
}),
),
),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ func isColumnTypeDependent(e scpb.Element) bool {

func isIndexDependent(e scpb.Element) bool {
switch e.(type) {
case *scpb.IndexName, *scpb.IndexComment, *scpb.IndexColumn:
case *scpb.IndexName, *scpb.IndexComment, *scpb.IndexColumn,
*scpb.IndexZoneConfig:
return true
case *scpb.IndexPartitioning, *scpb.SecondaryIndexPartial:
return true
Expand Down
Loading

0 comments on commit adb0556

Please sign in to comment.