Skip to content

Commit

Permalink
Merge #34263
Browse files Browse the repository at this point in the history
34263: sql: verify index before declaring it public r=vivekmenezes a=vivekmenezes

A new index verification step is added after an index backfill
to verify that the index is indeed valid. This is good practice,
but also necessary with the introduction of the new bulk index
backfill which doesn't have the capability to detect duplicate
index entries.

related to #12424

Release note: None

Co-authored-by: Vivek Menezes <[email protected]>
  • Loading branch information
craig[bot] and vivekmenezes committed Feb 7, 2019
2 parents f40f6d2 + ef3270f commit ee130c0
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 15 deletions.
170 changes: 168 additions & 2 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package sql

import (
"context"
"fmt"
"sort"
"time"

Expand All @@ -26,11 +27,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/backfill"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -467,6 +471,165 @@ func (sc *SchemaChanger) distBackfill(
return nil
}

// validate the new indexes being added
func (sc *SchemaChanger) validateIndexes(
ctx context.Context,
evalCtx *extendedEvalContext,
lease *sqlbase.TableDescriptor_SchemaChangeLease,
) error {
if testDisableTableLeases {
return nil
}
readAsOf := sc.clock.Now()
return sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
txn.SetFixedTimestamp(ctx, readAsOf)
tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, sc.tableID)
if err != nil {
return err
}

if err := sc.ExtendLease(ctx, lease); err != nil {
return err
}

grp := ctxgroup.WithContext(ctx)

var tableRowCount int64
// Close when table count is ready.
tableCountReady := make(chan struct{})
// Notify of count completion even when an error is encountered.
// Provision such that a sender will never block.
countDone := make(chan struct{}, len(tableDesc.Mutations)+1)
numIndexCounts := 0
// Compute the size of each index.
for _, m := range tableDesc.Mutations {
if sc.mutationID != m.MutationID {
break
}
idx := m.GetIndex()
// An inverted index doesn't matchup in length.
if idx == nil ||
idx.Type == sqlbase.IndexDescriptor_INVERTED ||
m.Direction == sqlbase.DescriptorMutation_DROP {
continue
}

numIndexCounts++
grp.GoCtx(func(ctx context.Context) error {
defer func() { countDone <- struct{}{} }()
start := timeutil.Now()
// Make the mutations public in a private copy of the descriptor
// and add it to the TableCollection, so that we can use SQL below to perform
// the validation. We wouldn't have needed to do this if we could have
// updated the descriptor and run validation in the same transaction. However,
// our current system is incapable of running long running schema changes
// (the validation can take many minutes). So we pretend that the schema
// has been updated and actually update it in a separate transaction that
// follows this one.
desc, err := sqlbase.NewImmutableTableDescriptor(*tableDesc).MakeFirstMutationPublic()
if err != nil {
return err
}
tc := &TableCollection{leaseMgr: sc.leaseMgr}
// pretend that the schema has been modified.
if err := tc.addUncommittedTable(*desc); err != nil {
return err
}

// Create a new eval context only because the eval context cannot be shared across many
// goroutines.
newEvalCtx := createSchemaChangeEvalCtx(ctx, readAsOf, evalCtx.Tracing, sc.ieFactory)
// TODO(vivek): This is not a great API. Leaving #34304 open.
ie := newEvalCtx.InternalExecutor.(*SessionBoundInternalExecutor)
ie.impl.tcModifier = tc
defer func() {
ie.impl.tcModifier = nil
}()

row, err := newEvalCtx.InternalExecutor.QueryRow(ctx, "verify-idx-count", txn,
fmt.Sprintf(`SELECT count(*) FROM [%d AS t]@[%d]`, tableDesc.ID, idx.ID))
if err != nil {
return err
}
idxLen := int64(tree.MustBeDInt(row[0]))

log.Infof(ctx, "index %s/%s row count = %d, took %s",
tableDesc.Name, idx.Name, idxLen, timeutil.Since(start))

select {
case <-tableCountReady:
if idxLen != tableRowCount {
// TODO(vivek): find the offending row and include it in the error.
return pgerror.NewErrorf(
pgerror.CodeUniqueViolationError,
"index %q uniqueness violation: %d entries, expected %d",
idx.Name, idxLen, tableRowCount,
)
}

case <-ctx.Done():
return ctx.Err()
}

return nil
})
}

if numIndexCounts > 0 {
grp.GoCtx(func(ctx context.Context) error {
defer close(tableCountReady)
defer func() { countDone <- struct{}{} }()
var tableRowCountTime time.Duration
start := timeutil.Now()
// Count the number of rows in the table.
cnt, err := evalCtx.InternalExecutor.QueryRow(ctx, "VERIFY INDEX", txn,
fmt.Sprintf(`SELECT count(1) FROM [%d AS t]`, tableDesc.ID))
if err != nil {
return err
}
tableRowCount = int64(tree.MustBeDInt(cnt[0]))
tableRowCountTime = timeutil.Since(start)
log.Infof(ctx, "table %s row count = %d, took %s",
tableDesc.Name, tableRowCount, tableRowCountTime)
return nil
})

// Periodic schema change lease extension.
grp.GoCtx(func(ctx context.Context) error {
count := numIndexCounts + 1
refreshTimer := timeutil.NewTimer()
defer refreshTimer.Stop()
refreshTimer.Reset(checkpointInterval)
for {
select {
case <-countDone:
count--
if count == 0 {
// Stop.
return nil
}

case <-refreshTimer.C:
refreshTimer.Read = true
refreshTimer.Reset(checkpointInterval)
if err := sc.ExtendLease(ctx, lease); err != nil {
return err
}

case <-ctx.Done():
return ctx.Err()
}
}
})

if err := grp.Wait(); err != nil {
return err
}
}
return nil
})
}

func (sc *SchemaChanger) backfillIndexes(
ctx context.Context,
evalCtx *extendedEvalContext,
Expand All @@ -483,9 +646,12 @@ func (sc *SchemaChanger) backfillIndexes(
chunkSize = indexBulkBackfillChunkSize.Get(&sc.settings.SV)
}

return sc.distBackfill(
if err := sc.distBackfill(
ctx, evalCtx, lease, version, indexBackfill, chunkSize,
backfill.IndexMutationFilter)
backfill.IndexMutationFilter); err != nil {
return err
}
return sc.validateIndexes(ctx, evalCtx, lease)
}

func (sc *SchemaChanger) truncateAndBackfillColumns(
Expand Down
15 changes: 3 additions & 12 deletions pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -279,17 +278,9 @@ func ConvertBackfillError(
// information useful in printing a sensible error. However
// ConvertBatchError() will only work correctly if the schema elements
// are "live" in the tableDesc.
desc := sqlbase.NewMutableExistingTableDescriptor(*protoutil.Clone(tableDesc.TableDesc()).(*sqlbase.TableDescriptor))
mutationID := desc.Mutations[0].MutationID
for _, mutation := range desc.Mutations {
if mutation.MutationID != mutationID {
// Mutations are applied in a FIFO order. Only apply the first set
// of mutations if they have the mutation ID we're looking for.
break
}
if err := desc.MakeMutationComplete(mutation); err != nil {
return errors.Wrap(err, "backfill error")
}
desc, err := tableDesc.MakeFirstMutationPublic()
if err != nil {
return err
}
return row.ConvertBatchError(ctx, sqlbase.NewImmutableTableDescriptor(*desc.TableDesc()), b)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ func (s *Server) newConnExecutorWithTxn(
memMetrics MemoryMetrics,
srvMetrics *Metrics,
txn *client.Txn,
tcModifier tableCollectionModifier,
) (*connExecutor, error) {
ex, err := s.newConnExecutor(ctx, sargs, stmtBuf, clientComm, memMetrics, srvMetrics)
if err != nil {
Expand All @@ -633,6 +634,13 @@ func (s *Server) newConnExecutorWithTxn(
tree.ReadWrite,
txn,
ex.transitionCtx)

// Modify the TableCollection to match the parent executor's TableCollection.
// This allows the InternalExecutor to see schema changes made by the
// parent executor.
if tcModifier != nil {
tcModifier.copyModifiedSchema(&ex.extraTxnState.tables)
}
return ex, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,7 @@ func (scc *schemaChangerCollection) execSchemaChanges(
sc.testingKnobs = cfg.SchemaChangerTestingKnobs
sc.distSQLPlanner = cfg.DistSQLPlanner
sc.settings = cfg.Settings
sc.ieFactory = ieFactory
for r := retry.Start(base.DefaultRetryOptions()); r.Next(); {
evalCtx := createSchemaChangeEvalCtx(ctx, cfg.Clock.Now(), tracing, ieFactory)
if err := sc.exec(ctx, true /* inSession */, &evalCtx); err != nil {
Expand Down
12 changes: 11 additions & 1 deletion pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ type internalExecutorImpl struct {
// parent session state if need be, but then we'd need to share a
// sessionDataMutator.
sessionData *sessiondata.SessionData

// The internal executor uses its own TableCollection. A TableCollection
// is a schema cache for each transaction and contains data like the schema
// modified by a transaction. Occasionally an internal executor is called
// within the context of a transaction that has modified the schema, the
// internal executor should see the modified schema. This interface allows
// the internal executor to modify its TableCollection to match the
// TableCollection of the parent executor.
tcModifier tableCollectionModifier
}

// MakeInternalExecutor creates an InternalExecutor.
Expand Down Expand Up @@ -192,7 +201,8 @@ func (ie *internalExecutorImpl) initConnEx(
ie.mon,
ie.memMetrics,
&ie.s.InternalMetrics,
txn)
txn,
ie.tcModifier)
}
if err != nil {
return nil, nil, err
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -115,6 +116,7 @@ type SchemaChanger struct {
clock *hlc.Clock
settings *cluster.Settings
execCfg *ExecutorConfig
ieFactory sqlutil.SessionBoundInternalExecutorFactory
}

// NewSchemaChangerForTesting only for tests.
Expand Down Expand Up @@ -1641,6 +1643,7 @@ func (s *SchemaChangeManager) Start(stopper *stop.Stopper) {
rangeDescriptorCache: s.execCfg.RangeDescriptorCache,
clock: s.execCfg.Clock,
settings: s.execCfg.Settings,
ieFactory: s.ieFactory,
}

execAfter := timeutil.Now().Add(delay)
Expand Down Expand Up @@ -1810,6 +1813,7 @@ func createSchemaChangeEvalCtx(
DataConversion: sessiondata.DataConversionConfig{
Location: dummyLocation,
},
User: security.NodeUser,
}

evalCtx := extendedEvalContext{
Expand Down
Loading

0 comments on commit ee130c0

Please sign in to comment.