Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql/backfill: fix bug adding new columns to new index with volatile default #81486

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ ALL_TESTS = [
"//pkg/spanconfig/spanconfigstore:spanconfigstore_test",
"//pkg/spanconfig/spanconfigtestutils:spanconfigtestutils_test",
"//pkg/spanconfig:spanconfig_test",
"//pkg/sql/backfill:backfill_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
"//pkg/sql/catalog/catformat:catformat_test",
"//pkg/sql/catalog/catpb:catpb_test",
Expand Down
14 changes: 13 additions & 1 deletion pkg/sql/backfill/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "backfill",
srcs = [
"backfill.go",
"index_backfiller_cols.go",
"mvcc_index_merger.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/backfill",
Expand Down Expand Up @@ -40,3 +41,14 @@ go_library(
"@com_github_cockroachdb_logtags//:logtags",
],
)

go_test(
name = "backfill_test",
srcs = ["index_backfiller_cols_test.go"],
embed = [":backfill"],
deps = [
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"@com_github_stretchr_testify//require",
],
)
90 changes: 20 additions & 70 deletions pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -414,27 +413,14 @@ type muBoundAccount struct {

// IndexBackfiller is capable of backfilling all the added index.
type IndexBackfiller struct {
indexBackfillerCols

added []catalog.Index
// colIdxMap maps ColumnIDs to indices into desc.Columns and desc.Mutations.
colIdxMap catalog.TableColMap

types []*types.T
rowVals tree.Datums
evalCtx *eval.Context

// cols are all of the writable (PUBLIC and DELETE_AND_WRITE_ONLY) columns in
// the descriptor.
cols []catalog.Column

// addedCols are the columns in DELETE_AND_WRITE_ONLY being added as part of
// this index which are not computed.
addedCols []catalog.Column

// computedCols are the columns in this index which are computed and do
// not have concrete values in the source index. This is virtual computed
// columns and stored computed columns which are non-public.
computedCols []catalog.Column

// Map of columns which need to be evaluated to their expressions.
colExprs map[descpb.ColumnID]tree.TypedExpr

Expand All @@ -447,10 +433,6 @@ type IndexBackfiller struct {
// backfilled.
indexesToEncode []catalog.Index

// valNeededForCol contains the indexes (into cols) of all columns that we
// need to fetch values for.
valNeededForCol util.FastIntSet

alloc tree.DatumAlloc

// mon is a memory monitor linked with the IndexBackfiller on creation.
Expand Down Expand Up @@ -478,11 +460,14 @@ func (ib *IndexBackfiller) InitForLocalUse(
desc catalog.TableDescriptor,
mon *mon.BytesMonitor,
) error {
// Initialize ib.cols and ib.colIdxMap.
ib.initCols(desc)

// Initialize ib.added.
ib.valNeededForCol = ib.initIndexes(desc)
ib.initIndexes(desc)

// Initialize ib.cols and ib.colIdxMap.
if err := ib.initCols(desc); err != nil {
return err
}

predicates, colExprs, referencedColumns, err := constructExprs(
ctx, desc, ib.added, ib.cols, ib.addedCols, ib.computedCols, evalCtx, semaCtx,
Expand Down Expand Up @@ -614,11 +599,14 @@ func (ib *IndexBackfiller) InitForDistributedUse(
desc catalog.TableDescriptor,
mon *mon.BytesMonitor,
) error {
// Initialize ib.cols and ib.colIdxMap.
ib.initCols(desc)

// Initialize ib.added.
ib.valNeededForCol = ib.initIndexes(desc)
ib.initIndexes(desc)

// Initialize ib.indexBackfillerCols.
if err := ib.initCols(desc); err != nil {
return err
}

evalCtx := flowCtx.NewEvalCtx()
var predicates map[descpb.IndexID]tree.TypedExpr
Expand Down Expand Up @@ -689,35 +677,17 @@ func (ib *IndexBackfiller) ShrinkBoundAccount(ctx context.Context, shrinkBy int6

// initCols is a helper to populate column metadata of an IndexBackfiller. It
// populates the cols and colIdxMap fields.
func (ib *IndexBackfiller) initCols(desc catalog.TableDescriptor) {
ib.cols = make([]catalog.Column, 0, len(desc.DeletableColumns()))
for _, column := range desc.DeletableColumns() {
if column.Public() {
if column.IsComputed() && column.IsVirtual() {
ib.computedCols = append(ib.computedCols, column)
}
} else if column.Adding() && column.WriteAndDeleteOnly() {
// If there are ongoing mutations, add columns that are being added and in
// the DELETE_AND_WRITE_ONLY state.
if column.IsComputed() {
ib.computedCols = append(ib.computedCols, column)
} else {
ib.addedCols = append(ib.addedCols, column)
}
} else {
continue
}
// Create a map of each column's ID to its ordinal.
ib.colIdxMap.Set(column.GetID(), len(ib.cols))
ib.cols = append(ib.cols, column)
}
func (ib *IndexBackfiller) initCols(desc catalog.TableDescriptor) (err error) {
ib.indexBackfillerCols, err = makeIndexBackfillColumns(
desc.DeletableColumns(), desc.GetPrimaryIndex(), ib.added,
)
return err
}

// initIndexes is a helper to populate index metadata of an IndexBackfiller. It
// populates the added field. It returns a set of column ordinals that must be
// fetched in order to backfill the added indexes.
func (ib *IndexBackfiller) initIndexes(desc catalog.TableDescriptor) util.FastIntSet {
var valNeededForCol util.FastIntSet
func (ib *IndexBackfiller) initIndexes(desc catalog.TableDescriptor) {
mutations := desc.AllMutations()
mutationID := mutations[0].MutationID()

Expand All @@ -729,29 +699,9 @@ func (ib *IndexBackfiller) initIndexes(desc catalog.TableDescriptor) util.FastIn
}
if IndexMutationFilter(m) {
idx := m.AsIndex()
colIDs := idx.CollectKeyColumnIDs()
if idx.GetEncodingType() == descpb.PrimaryIndexEncoding {
for _, col := range ib.cols {
if !col.IsVirtual() {
colIDs.Add(col.GetID())
}
}
} else {
colIDs.UnionWith(idx.CollectSecondaryStoredColumnIDs())
colIDs.UnionWith(idx.CollectKeySuffixColumnIDs())
}

ib.added = append(ib.added, idx)
for i := range ib.cols {
id := ib.cols[i].GetID()
if colIDs.Contains(id) && i < len(desc.PublicColumns()) && !ib.cols[i].IsVirtual() {
valNeededForCol.Add(i)
}
}
}
}

return valNeededForCol
}

// init completes the initialization of an IndexBackfiller.
Expand Down
177 changes: 177 additions & 0 deletions pkg/sql/backfill/index_backfiller_cols.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package backfill

import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/errors"
)

// indexBackfillerCols holds the interned set of columns used by the
// indexBackfiller and the information about which columns will need to
// be evaluated during the backfill.
type indexBackfillerCols struct {
// colIdxMap maps ColumnIDs to indices into desc.Columns and desc.Mutations.
colIdxMap catalog.TableColMap

// cols are all writable (PUBLIC and DELETE_AND_WRITE_ONLY) columns in
// the descriptor.
cols []catalog.Column

// addedCols are the columns in DELETE_AND_WRITE_ONLY being added as part of
// this index which are not computed. The definition of being added is that
// the index being backfilled is a new primary index and the columns do not
// exist in the currently public primary index. Note that this should never
// be populated when running using the legacy schema changer; the legacy
// schema changer rewrites primary indexes in-place using the column
// backfiller before creating any secondary indexes.
addedCols []catalog.Column

// computedCols are the columns in this index which are computed and do
// not have concrete values in the source index. This is virtual computed
// columns and stored computed columns which are non-public and not part
// of the currently public primary index.
computedCols []catalog.Column

// valNeededForCol contains the indexes (into cols) of all columns that we
// need to fetch values for.
valNeededForCol util.FastIntSet
}

// makeIndexBackfillColumns computes the set of writable columns and
// classifies the two subsets of that set which will need to be computed
// during execution of the backfill. Note that all columns will be included
// in the complete cols slice regardless of whether they appear in
// addedIndexes. Appearance in addedIndexes will affect valNeededForColumn.
// Note that the valNeededForCol set will not be populated with
// requirements due to references in expressions. That needs to be added after
// constructing this information.
func makeIndexBackfillColumns(
deletableColumns []catalog.Column, sourcePrimaryIndex catalog.Index, addedIndexes []catalog.Index,
) (indexBackfillerCols, error) {

// We will need to evaluate default or computed expressions for
// physical columns being added only if this is a primary index backfill
// and only if the added columns are not in the existing public primary
// index. We're going to assume that if there is a new column being added
// to a new primary index that it doesn't appear in any secondary index
// being simultaneously backfilled, because if that column were needed,
// and we're adding it now, we'd have a problem.

var ib indexBackfillerCols
ib.cols = make([]catalog.Column, 0, len(deletableColumns))

var computedVirtual catalog.TableColSet
for _, column := range deletableColumns {
if !column.Public() &&
!(column.Adding() && column.WriteAndDeleteOnly()) {
continue
}
if column.IsComputed() && column.IsVirtual() {
computedVirtual.Add(column.GetID())
ib.computedCols = append(ib.computedCols, column)
}
// Create a map of each column's ID to its ordinal.
ib.colIdxMap.Set(column.GetID(), len(ib.cols))
ib.cols = append(ib.cols, column)
}

// Find the adding columns which are being added to new primary indexes.
primaryColumns := indexColumns(sourcePrimaryIndex)
var addedDefaultOrComputed catalog.TableColSet
for _, idx := range addedIndexes {
if idx.GetEncodingType() != descpb.PrimaryIndexEncoding {
if !indexColumns(idx).Difference(computedVirtual).SubsetOf(primaryColumns) {
return indexBackfillerCols{}, errors.AssertionFailedf(
"secondary index for backfill contains physical column not present in " +
"source primary index",
)
}
}
addedDefaultOrComputed.UnionWith(
indexColumns(idx).Difference(computedVirtual).Difference(primaryColumns),
)
}

// Account for these new columns being initially backfilled into new primary
// indexes.
for _, colID := range addedDefaultOrComputed.Ordered() {
ord, ok := ib.colIdxMap.Get(colID)
if !ok {
return indexBackfillerCols{}, errors.AssertionFailedf(
"index being backfilled contains non-writable or dropping column")
}
col := ib.cols[ord]
if col.IsComputed() {
ib.computedCols = append(ib.computedCols, col)
} else {
ib.addedCols = append(ib.addedCols, col)
}
}

ib.valNeededForCol = makeInitialValNeededForCol(ib, addedIndexes)
return ib, nil
}

// makeInitialValNeededForCol computes the initial set of columns whose values
// are known to be needed based solely on index membership. More may be needed
// because of references in expressions.
func makeInitialValNeededForCol(
ib indexBackfillerCols, addedIndexes []catalog.Index,
) (valNeededForCol util.FastIntSet) {
// Any columns we're going to eval, we don't need values for ahead of time.
toEval := func() catalog.TableColSet {
columnIDs := func(columns []catalog.Column) (s catalog.TableColSet) {
for _, c := range columns {
s.Add(c.GetID())
}
return s
}

// The set of columns we'll evaluate are the addedColumns and the computed
// columns. We don't need values for these columns.
toEval := columnIDs(ib.addedCols)
toEval.UnionWith(columnIDs(ib.computedCols))
return toEval
}()

for _, idx := range addedIndexes {
colIDs := idx.CollectKeyColumnIDs()
if idx.GetEncodingType() == descpb.PrimaryIndexEncoding {
for _, col := range ib.cols {
if !col.IsVirtual() {
colIDs.Add(col.GetID())
}
}
} else {
colIDs.UnionWith(idx.CollectSecondaryStoredColumnIDs())
colIDs.UnionWith(idx.CollectKeySuffixColumnIDs())
}

for i := range ib.cols {
if id := ib.cols[i].GetID(); colIDs.Contains(id) && !toEval.Contains(id) {
valNeededForCol.Add(i)
}
}
}
return valNeededForCol
}

// indexColumns computes the complete set of column stored in an index.
func indexColumns(idx catalog.Index) (s catalog.TableColSet) {
s.UnionWith(idx.CollectKeyColumnIDs())
s.UnionWith(idx.CollectKeySuffixColumnIDs())
s.UnionWith(idx.CollectPrimaryStoredColumnIDs())
s.UnionWith(idx.CollectSecondaryStoredColumnIDs())
return s
}
Loading