[custom-build] sql/rowexec: subject column backfills to admission control #79117

Closed
wants to merge 3 commits
2 changes: 1 addition & 1 deletion go.mod
@@ -48,7 +48,7 @@ require (
github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55
github.com/cockroachdb/gostdlib v1.13.0
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f
github.com/cockroachdb/pebble v0.0.0-20220322140431-c50a066abbd3
github.com/cockroachdb/pebble v0.0.0-20220405202737-d167870d7bcf
github.com/cockroachdb/redact v1.1.3
github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd
github.com/cockroachdb/stress v0.0.0-20170808184505-29b5d31b4c3a
4 changes: 2 additions & 2 deletions go.sum
@@ -305,8 +305,8 @@ github.com/cockroachdb/gostdlib v1.13.0/go.mod h1:eXX95p9QDrYwJfJ6AgeN9QnRa/lqqi
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f/go.mod h1:i/u985jwjWRlyHXQbwatDASoW0RMlZ/3i9yJHE2xLkI=
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f h1:6jduT9Hfc0njg5jJ1DdKCFPdMBrp/mdZfCpa5h+WM74=
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/pebble v0.0.0-20220322140431-c50a066abbd3 h1:CBC5G9uatpCr5/CzU3ZDrFGxmqB/vrH2QPWSmYUBNto=
github.com/cockroachdb/pebble v0.0.0-20220322140431-c50a066abbd3/go.mod h1:JXfQr3d+XO4bL1pxGwKKo09xylQSdZ/mpZ9b2wfVcPs=
github.com/cockroachdb/pebble v0.0.0-20220405202737-d167870d7bcf h1:tpUPUwhunTeNb/TYi/SLuIJnDEWBPOMPB/SiHo9t2F8=
github.com/cockroachdb/pebble v0.0.0-20220405202737-d167870d7bcf/go.mod h1:JXfQr3d+XO4bL1pxGwKKo09xylQSdZ/mpZ9b2wfVcPs=
github.com/cockroachdb/redact v1.0.8/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/cockroachdb/redact v1.1.3 h1:AKZds10rFSIj7qADf0g46UixK8NNLwWTNdCIGS5wfSQ=
github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
33 changes: 33 additions & 0 deletions pkg/ccl/importccl/import_stmt.go
@@ -2145,6 +2145,10 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
}
}

if err := r.checkVirtualConstraints(ctx, p.ExecCfg(), r.job); err != nil {
return err
}

// If the table being imported into references UDTs, ensure that a concurrent
// schema change on any of the typeDescs has not modified the type descriptor. If
// it has, it is unsafe to import the data and we fail the import job.
@@ -2304,6 +2308,35 @@ func (r *importResumer) publishSchemas(ctx context.Context, execCfg *sql.Executo
})
}

// checkVirtualConstraints checks constraints that are enforced via runtime
// checks, such as uniqueness checks that are not directly backed by an index.
func (*importResumer) checkVirtualConstraints(
ctx context.Context, execCfg *sql.ExecutorConfig, job *jobs.Job,
) error {
for _, tbl := range job.Details().(jobspb.ImportDetails).Tables {
desc := tabledesc.NewBuilder(tbl.Desc).BuildExistingMutableTable()
desc.SetPublic()

if sql.HasVirtualUniqueConstraints(desc) {
if err := job.RunningStatus(ctx, nil /* txn */, func(_ context.Context, _ jobspb.Details) (jobs.RunningStatus, error) {
return jobs.RunningStatus(fmt.Sprintf("re-validating %s", desc.GetName())), nil
}); err != nil {
return errors.Wrapf(err, "failed to update running status of job %d", errors.Safe(job.ID()))
}
}

if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData(execCfg.SV()))
return ie.WithSyntheticDescriptors([]catalog.Descriptor{desc}, func() error {
return sql.RevalidateUniqueConstraintsInTable(ctx, txn, ie, desc)
})
}); err != nil {
return err
}
}
return nil
}
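
As a reading aid, here is a minimal sketch of the control flow checkVirtualConstraints follows -- all types and helpers below are invented stand-ins, not the CockroachDB API: surface progress through the job's running status first, then run each table's re-validation inside a transaction.

```go
package main

import (
	"context"
	"fmt"
)

// table and the helpers below are invented stand-ins, not CockroachDB types.
type table struct{ name string }

func hasVirtualUniqueConstraints(table) bool { return true }

func revalidateInTxn(ctx context.Context, t table) error {
	// Stand-in for opening a txn and calling RevalidateUniqueConstraintsInTable
	// with the table's descriptor installed as a synthetic descriptor.
	return nil
}

func checkVirtualConstraints(ctx context.Context, tables []table) error {
	for _, t := range tables {
		if hasVirtualUniqueConstraints(t) {
			// Surface progress before the potentially slow validation query.
			fmt.Printf("re-validating %s\n", t.name)
		}
		if err := revalidateInTxn(ctx, t); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	_ = checkVirtualConstraints(context.Background(), []table{{name: "t"}})
}
```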

// checkForUDTModification checks whether any of the types referenced by the
// table being imported into have been modified incompatibly since they were
// read during import planning. If they have, it may be unsafe to continue
19 changes: 15 additions & 4 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -6442,7 +6442,7 @@ func TestImportMultiRegion(t *testing.T) {

baseDir := filepath.Join("testdata")
tc, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 2 /* numServers */, base.TestingKnobs{}, multiregionccltestutils.WithBaseDirectory(baseDir),
t, 3 /* numServers */, base.TestingKnobs{}, multiregionccltestutils.WithBaseDirectory(baseDir),
)
defer cleanup()

@@ -6468,7 +6468,7 @@ func TestImportMultiRegion(t *testing.T) {

// Create the databases
tdb.Exec(t, `CREATE DATABASE foo`)
tdb.Exec(t, `CREATE DATABASE multi_region PRIMARY REGION "us-east1"`)
tdb.Exec(t, `CREATE DATABASE multi_region PRIMARY REGION "us-east1" REGIONS "us-east1", "us-east2"`)

simpleOcf := fmt.Sprintf("nodelocal://0/avro/%s", "simple.ocf")

@@ -6585,6 +6585,17 @@ DROP VIEW IF EXISTS v`,
args: []interface{}{srv.URL},
data: "1,\"foo\",NULL,us-east1\n",
},
{
name: "import-into-multi-region-regional-by-row-dupes",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY) LOCALITY REGIONAL BY ROW;" +
"INSERT INTO mr_regional_by_row (i, crdb_region) VALUES (1, 'us-east2')",
sql: "IMPORT INTO mr_regional_by_row (i, crdb_region) CSV DATA ($1)",
args: []interface{}{srv.URL},
data: "1,us-east1\n",
errString: `failed to validate unique constraint`,
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database-concurrent-table-add",
db: "multi_region",
@@ -6601,7 +6612,7 @@
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row (i, s, b, crdb_region) CSV DATA ($1)",
during: `ALTER DATABASE multi_region ADD REGION "us-east2"`,
during: `ALTER DATABASE multi_region ADD REGION "us-east3"`,
errString: `type descriptor "crdb_internal_region" \(54\) has been ` +
`modified, potentially incompatibly, since import planning; ` +
`aborting to avoid possible corruption`,
@@ -6618,7 +6629,7 @@
`,
sql: "IMPORT INTO mr_regional_by_row (i, s, b, crdb_region) CSV DATA ($1)",
during: `ALTER TYPE typ ADD VALUE 'b'`,
errString: `type descriptor "typ" \(67\) has been ` +
errString: `type descriptor "typ" \(\d\d\) has been ` +
`modified, potentially incompatibly, since import planning; ` +
`aborting to avoid possible corruption`,
args: []interface{}{srv.URL},
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
@@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
@@ -790,6 +791,12 @@ func (ie *wrappedInternalExecutor) QueryIteratorEx(
return ie.wrapped.QueryIteratorEx(ctx, opName, txn, session, stmt, qargs...)
}

func (ie *wrappedInternalExecutor) WithSyntheticDescriptors(
descs []catalog.Descriptor, run func() error,
) error {
panic("not implemented")
}

func (ie *wrappedInternalExecutor) getErrFunc() func(statement string) error {
ie.mu.RLock()
defer ie.mu.RUnlock()
37 changes: 27 additions & 10 deletions pkg/sql/check.go
@@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
@@ -381,7 +382,7 @@ func (p *planner) RevalidateUniqueConstraintsInCurrentDB(ctx context.Context) er
}

for _, tableDesc := range tableDescs {
if err = p.revalidateUniqueConstraintsInTable(ctx, tableDesc); err != nil {
if err = RevalidateUniqueConstraintsInTable(ctx, p.Txn(), p.ExecCfg().InternalExecutor, tableDesc); err != nil {
return err
}
}
@@ -402,7 +403,7 @@ func (p *planner) RevalidateUniqueConstraintsInTable(ctx context.Context, tableI
if err != nil {
return err
}
return p.revalidateUniqueConstraintsInTable(ctx, tableDesc)
return RevalidateUniqueConstraintsInTable(ctx, p.Txn(), p.ExecCfg().InternalExecutor, tableDesc)
}

// RevalidateUniqueConstraint verifies that the given unique constraint on the
@@ -465,16 +466,32 @@ func (p *planner) RevalidateUniqueConstraint(
return errors.Newf("unique constraint %s does not exist", constraintName)
}

// revalidateUniqueConstraintsInTable verifies that all unique constraints
// HasVirtualUniqueConstraints returns true if the table has one or more
// constraints that are validated by RevalidateUniqueConstraintsInTable.
func HasVirtualUniqueConstraints(tableDesc catalog.TableDescriptor) bool {
for _, index := range tableDesc.ActiveIndexes() {
if index.IsUnique() && index.GetPartitioning().NumImplicitColumns() > 0 {
return true
}
}
for _, uc := range tableDesc.GetUniqueWithoutIndexConstraints() {
if uc.Validity == descpb.ConstraintValidity_Validated {
return true
}
}
return false
}
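
Why an implicitly partitioned UNIQUE index needs runtime re-validation at all: the stored key is prefixed with the partitioning column, so the index cannot see a collision on the declared columns alone. A self-contained, illustrative sketch (invented types, not CockroachDB code):

```go
// Models why a unique index with an implicit partitioning column cannot, by
// itself, enforce uniqueness on the user-declared columns: the stored index
// key is (region, id), so two rows with the same id but different regions
// occupy distinct index keys.
package main

import "fmt"

type indexKey struct {
	region string // implicit partitioning column, prepended to the key
	id     int    // the column the user declared UNIQUE
}

func main() {
	idx := map[indexKey]struct{}{}
	idx[indexKey{"us-east1", 1}] = struct{}{}
	idx[indexKey{"us-east2", 1}] = struct{}{} // accepted: composite key differs
	// Both entries coexist, so uniqueness of id alone must be checked by a
	// separate runtime query, as RevalidateUniqueConstraintsInTable does.
	fmt.Println(len(idx)) // prints 2
}
```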

// RevalidateUniqueConstraintsInTable verifies that all unique constraints
// defined on the given table are valid. In other words, it verifies that all
// rows in the table have unique values for every unique constraint defined on
// the table.
//
// Note that we only need to validate UNIQUE constraints that are not already
// fully enforced by an index: implicitly partitioned UNIQUE indexes (whose
// keys include the implicit partitioning columns, so the index alone does not
// enforce uniqueness on the declared columns) and UNIQUE WITHOUT INDEX
// constraints.
func (p *planner) revalidateUniqueConstraintsInTable(
ctx context.Context, tableDesc catalog.TableDescriptor,
func RevalidateUniqueConstraintsInTable(
ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, tableDesc catalog.TableDescriptor,
) error {
// Check implicitly partitioned UNIQUE indexes.
for _, index := range tableDesc.ActiveIndexes() {
@@ -485,8 +502,8 @@
index.GetName(),
index.IndexDesc().KeyColumnIDs[index.GetPartitioning().NumImplicitColumns():],
index.GetPredicate(),
p.ExecCfg().InternalExecutor,
p.Txn(),
ie,
txn,
true, /* preExisting */
); err != nil {
log.Errorf(ctx, "validation of unique constraints failed for table %s: %s", tableDesc.GetName(), err)
@@ -504,8 +521,8 @@
uc.Name,
uc.ColumnIDs,
uc.Predicate,
p.ExecCfg().InternalExecutor,
p.Txn(),
ie,
txn,
true, /* preExisting */
); err != nil {
log.Errorf(ctx, "validation of unique constraints failed for table %s: %s", tableDesc.GetName(), err)
@@ -532,7 +549,7 @@ func validateUniqueConstraint(
constraintName string,
columnIDs []descpb.ColumnID,
pred string,
ie *InternalExecutor,
ie sqlutil.InternalExecutor,
txn *kv.Txn,
preExisting bool,
) error {
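The parameter change above, from the concrete *InternalExecutor to the sqlutil.InternalExecutor interface, is what lets the import resumer reuse this validation outside the planner. A minimal sketch of the pattern, with invented names:

```go
package main

import "fmt"

// executor is an invented stand-in for sqlutil.InternalExecutor.
type executor interface {
	exec(stmt string) error
}

// plannerExec and jobExec model the two concrete executors that can now both
// be handed to the same validation helper.
type plannerExec struct{}

func (plannerExec) exec(stmt string) error { return nil }

type jobExec struct{}

func (jobExec) exec(stmt string) error { return nil }

// validateUnique is the analogue of validateUniqueConstraint: it needs only
// the interface, so any executor implementation works.
func validateUnique(ie executor) error {
	if err := ie.exec("SELECT ... LIMIT 1"); err != nil {
		return fmt.Errorf("validation query failed: %w", err)
	}
	return nil
}

func main() {
	fmt.Println(validateUnique(plannerExec{}), validateUnique(jobExec{}))
}
```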
4 changes: 4 additions & 0 deletions pkg/sql/sqlutil/internal_executor.go
@@ -14,6 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
@@ -142,6 +143,9 @@ type InternalExecutor interface {
stmt string,
qargs ...interface{},
) (InternalRows, error)

// WithSyntheticDescriptors runs the given function with the provided
// descriptors installed as synthetic descriptors, restoring the previous
// state when it returns.
WithSyntheticDescriptors(descs []catalog.Descriptor, run func() error) error
}
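
A hedged sketch of the contract this method implies -- invented implementation, the real one lives in pkg/sql: install the descriptors for the duration of the closure and restore the previous state on every exit path.

```go
package main

import "fmt"

type descriptor struct{ name string }

type executor struct {
	synthetic []descriptor
}

// WithSyntheticDescriptors temporarily appends descs to the executor's
// synthetic descriptors, runs the closure, and restores the saved slice via
// defer so the state is reset even when run returns an error.
func (e *executor) WithSyntheticDescriptors(descs []descriptor, run func() error) error {
	saved := e.synthetic
	e.synthetic = append(append([]descriptor(nil), saved...), descs...)
	defer func() { e.synthetic = saved }()
	return run()
}

func main() {
	var e executor
	_ = e.WithSyntheticDescriptors([]descriptor{{name: "mr_regional_by_row"}}, func() error {
		fmt.Println(len(e.synthetic)) // 1: queries in here see the synthetic descriptor
		return nil
	})
	fmt.Println(len(e.synthetic)) // 0 again after the call
}
```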

// InternalRows is an iterator interface that's exposed by the internal
43 changes: 33 additions & 10 deletions pkg/util/admission/granter.go
@@ -1417,8 +1417,9 @@ type ioLoadListener struct {
l0Bytes int64
l0AddedBytes uint64
// Exponentially smoothed per interval values.
smoothedBytesRemoved int64
smoothedNumAdmit float64
smoothedBytesRemoved int64
smoothedNumAdmit float64
smoothedBytesAddedPerWork float64

// totalTokens represents the tokens to give out until the next call to
// adjustTokens. They are given out with smoothing -- tokensAllocated
@@ -1561,19 +1562,41 @@ func (io *ioLoadListener) adjustTokens(m pebble.Metrics) {
// situation.
doLog = false
}
// Attribute the bytesAdded equally to all the admitted work.
// INVARIANT: perWork >= 0
if perWork := float64(bytesAdded) / float64(admitted); perWork > 0 && admitted > 1 {
// Track an exponentially smoothed estimate of bytes added per work when
// some work was actually admitted. Note that we treat having admitted one
// item the same as having admitted zero: admitted is clamped to a minimum
// of 1, and an estimate based on a single admitted item is unlikely to be
// meaningful. The conjunction includes perWork > 0 (and not just
// admitted), since we have seen situations where perWork=0 and
// admitted > 1. This can happen because the stats from Pebble
// (bytesAdded) and those from the requester (admitted) are not
// synchronized -- the bytes are written to Pebble after admission, so
// there is a lag between when the counter is incremented in the latter
// and when the bytes are incremented in the former.
if io.smoothedBytesAddedPerWork == 0 {
io.smoothedBytesAddedPerWork = perWork
} else {
io.smoothedBytesAddedPerWork = alpha*perWork +
(1-alpha)*io.smoothedBytesAddedPerWork
}
}

// We constrain admission if the store is over the threshold.
if m.Levels[0].NumFiles > L0FileCountOverloadThreshold.Get(&io.settings.SV) ||
m.Levels[0].Sublevels > int32(L0SubLevelCountOverloadThreshold.Get(&io.settings.SV)) {
// Attribute the bytesAdded equally to all the admitted work.
// INVARIANT: bytesAddedPerWork >= 0
bytesAddedPerWork := float64(bytesAdded) / float64(admitted)
if bytesAddedPerWork == 0 {
// We are here because bytesAdded was 0. This will be very rare.
bytesAddedPerWork = 1

smoothedBytesAddedPerWork := io.smoothedBytesAddedPerWork
if io.smoothedBytesAddedPerWork < 1 {
// Rare case where we've never seen any work items or somehow the
// estimate is less than 1. This is important to avoid overflow.
smoothedBytesAddedPerWork = 1
}
// Don't admit more work than we can remove via compactions. numAdmit
// tracks our goal for admission.
numAdmit := float64(io.smoothedBytesRemoved) / bytesAddedPerWork
numAdmit := float64(io.smoothedBytesRemoved) / smoothedBytesAddedPerWork
// Scale down since we want to get under the thresholds over time. This
// scaling could be adjusted based on how much above the threshold we are,
// but for now we just use a constant.
@@ -1596,7 +1619,7 @@
}
} else {
// Under the threshold. Maintain a smoothedNumAdmit so that it is not 0
// when we first go over the threshold. Instead use what we actually
// when we first go over the threshold. Instead, use what we actually
// admitted.
io.smoothedNumAdmit = alpha*float64(admitted) + (1-alpha)*io.smoothedNumAdmit
io.totalTokens = unlimitedTokens
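To make the smoothing and token arithmetic above concrete, here is a self-contained numeric sketch; alpha and all sample values are invented, and the real constants and guards live in granter.go:

```go
package main

import "fmt"

const alpha = 0.5 // illustrative smoothing factor

func main() {
	var smoothedPerWork float64
	update := func(bytesAdded, admitted int64) {
		perWork := float64(bytesAdded) / float64(admitted)
		if perWork <= 0 || admitted <= 1 {
			return // unreliable sample; keep the previous estimate
		}
		if smoothedPerWork == 0 {
			smoothedPerWork = perWork // bootstrap on the first sample
			return
		}
		smoothedPerWork = alpha*perWork + (1-alpha)*smoothedPerWork
	}

	update(1000, 10) // perWork = 100 -> smoothed = 100
	update(2000, 10) // perWork = 200 -> smoothed = 150
	update(0, 5)     // ignored: perWork == 0, stats lagged behind admission

	// Admission goal: bytes compactions can remove divided by per-work cost,
	// with the same clamp-to-1 guard as the code above to avoid a huge quotient.
	smoothedBytesRemoved := 1200.0
	perWork := smoothedPerWork
	if perWork < 1 {
		perWork = 1
	}
	numAdmit := smoothedBytesRemoved / perWork
	fmt.Println(smoothedPerWork, numAdmit) // 150 8
}
```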
5 changes: 3 additions & 2 deletions pkg/util/admission/granter_test.go
Expand Up @@ -371,9 +371,10 @@ func TestIOLoadListener(t *testing.T) {
// Do the ticks until just before next adjustment.
var buf strings.Builder
fmt.Fprintf(&buf, "admitted: %d, bytes: %d, added-bytes: %d,\nsmoothed-removed: %d, "+
"smoothed-admit: %d,\ntokens: %s, tokens-allocated: %s\n", ioll.admittedCount,
"smoothed-admit: %d, smoothed-bytes-added-per-work: %d,\ntokens: %s, tokens-allocated: %s\n", ioll.admittedCount,
ioll.l0Bytes, ioll.l0AddedBytes, ioll.smoothedBytesRemoved,
int64(ioll.smoothedNumAdmit), tokensForIntervalToString(ioll.totalTokens),
int64(ioll.smoothedNumAdmit), int64(ioll.smoothedBytesAddedPerWork),
tokensForIntervalToString(ioll.totalTokens),
tokensFor1sToString(ioll.tokensAllocated))
for i := 0; i < adjustmentInterval; i++ {
ioll.allocateTokensTick()