Skip to content

Commit

Permalink
Merge #79115
Browse files Browse the repository at this point in the history
79115: sql/rowexec: subject column backfills to admission control r=ajwerner,sumeerbhola a=erikgrinaker

**kv: add txn helpers that configure admission control**

This patch adds the functions `DB.TxnWithAdmissionControl()` and
`kv.NewTxnWithAdmissionControl()`, which allow the caller to set the
admission control source and priority. The default variants of these
functions use source `OTHER`, which bypasses admission control.

Release note: None

**sql/rowexec: subject column backfills to admission control**

This patch subjects column backfills to admission control, using normal
priority.

Release note (bug fix): `ALTER TABLE [ADD|DROP] COLUMN` statements are now
subject to admission control, which prevents these operations from
overloading the storage engine.

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
craig[bot] and erikgrinaker committed Apr 1, 2022
2 parents fa02911 + 1f47599 commit 588c03d
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 44 deletions.
1 change: 0 additions & 1 deletion pkg/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ go_library(
"//pkg/util/retry",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_apd_v3//:apd",
Expand Down
19 changes: 18 additions & 1 deletion pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,10 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn {
// from recoverable internal errors, and is automatically committed
// otherwise. The retryable function should have no side effects which could
// cause problems in the event it must be run more than once.
//
// This transaction will not be subject to admission control. To run a
// transaction under admission control, use TxnWithAdmissionControl.
//
// For example:
// err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// if kv, err := txn.Get(ctx, key); err != nil {
Expand All @@ -877,19 +881,32 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn {
// // ...
// return nil
// })
//
// Note that once the transaction encounters a retryable error, the txn object
// is marked as poisoned and all future ops fail fast until the retry. The
// callback may return either nil or the retryable error. Txn is responsible for
// resetting the transaction and retrying the callback.
func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error {
	// Txn is the default flavor of TxnWithAdmissionControl: it always uses the
	// OTHER source and normal priority. Callers that need a different source or
	// priority call TxnWithAdmissionControl directly.
	var (
		source   roachpb.AdmissionHeader_Source = roachpb.AdmissionHeader_OTHER
		priority admission.WorkPriority         = admission.NormalPri
	)
	return db.TxnWithAdmissionControl(ctx, source, priority, retryable)
}

// TxnWithAdmissionControl is like Txn, but uses a configurable admission
// control source and priority.
//
// The transaction is created with NewTxnWithAdmissionControl using the given
// source and priority, is given the debug name "unnamed", and is then passed
// to runTxn together with the retryable callback. The error from runTxn is
// returned unchanged.
func (db *DB) TxnWithAdmissionControl(
	ctx context.Context,
	source roachpb.AdmissionHeader_Source,
	priority admission.WorkPriority,
	retryable func(context.Context, *Txn) error,
) error {
	// TODO(radu): we should open a tracing Span here (we need to figure out how
	// to use the correct tracer).

	// Observed timestamps don't work with multi-tenancy. See:
	//
	// https://github.com/cockroachdb/cockroach/issues/48008
	nodeID, _ := db.ctx.NodeID.OptionalNodeID() // zero if not available

	// Create the transaction exactly once, through the admission-control-aware
	// constructor, so that source and priority actually reach the txn.
	txn := NewTxnWithAdmissionControl(ctx, db, nodeID, source, priority)
	txn.SetDebugName("unnamed")
	return runTxn(ctx, txn, retryable)
}
Expand Down
40 changes: 24 additions & 16 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -119,6 +118,19 @@ type Txn struct {
//
// See also db.NewTxn().
func NewTxn(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn {
	// NewTxn is NewTxnWithAdmissionControl pinned to the default admission
	// settings: the OTHER source and normal priority.
	var (
		source   roachpb.AdmissionHeader_Source = roachpb.AdmissionHeader_OTHER
		priority admission.WorkPriority         = admission.NormalPri
	)
	return NewTxnWithAdmissionControl(ctx, db, gatewayNodeID, source, priority)
}

// NewTxnWithAdmissionControl creates a new transaction with the specified
// admission control source and priority. See NewTxn() for details.
func NewTxnWithAdmissionControl(
ctx context.Context,
db *DB,
gatewayNodeID roachpb.NodeID,
source roachpb.AdmissionHeader_Source,
priority admission.WorkPriority,
) *Txn {
if db == nil {
panic(errors.WithContextTags(
errors.AssertionFailedf("attempting to create txn with nil db"), ctx))
Expand All @@ -133,8 +145,13 @@ func NewTxn(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn {
db.clock.MaxOffset().Nanoseconds(),
int32(db.ctx.NodeID.SQLInstanceID()),
)

return NewTxnFromProto(ctx, db, gatewayNodeID, now, RootTxn, &kvTxn)
txn := NewTxnFromProto(ctx, db, gatewayNodeID, now, RootTxn, &kvTxn)
txn.admissionHeader = roachpb.AdmissionHeader{
CreateTime: db.clock.PhysicalNow(),
Priority: int32(priority),
Source: source,
}
return txn
}

// NewTxnWithSteppingEnabled is like NewTxn but suitable for use by SQL. Note
Expand All @@ -148,12 +165,8 @@ func NewTxnWithSteppingEnabled(
gatewayNodeID roachpb.NodeID,
qualityOfService sessiondatapb.QoSLevel,
) *Txn {
txn := NewTxn(ctx, db, gatewayNodeID)
txn.admissionHeader = roachpb.AdmissionHeader{
Priority: int32(qualityOfService),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
}
txn := NewTxnWithAdmissionControl(ctx, db, gatewayNodeID,
roachpb.AdmissionHeader_FROM_SQL, admission.WorkPriority(qualityOfService))
_ = txn.ConfigureStepping(ctx, SteppingEnabled)
return txn
}
Expand All @@ -165,13 +178,8 @@ func NewTxnWithSteppingEnabled(
// transaction to undergo admission control. See AdmissionHeader_Source for more
// details.
func NewTxnRootKV(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn {
	// Build the transaction through the shared admission-control constructor
	// instead of filling in the admission header by hand. The constructor sets
	// the header's source, priority and creation time in one place; here the
	// source is ROOT_KV and the priority is normal.
	return NewTxnWithAdmissionControl(
		ctx, db, gatewayNodeID, roachpb.AdmissionHeader_ROOT_KV, admission.NormalPri)
}

// NewTxnFromProto is like NewTxn but assumes the Transaction object is already initialized.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_library(
"//pkg/sql/stats",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/cancelchecker",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
Expand Down
55 changes: 29 additions & 26 deletions pkg/sql/rowexec/columnbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

Expand Down Expand Up @@ -105,34 +106,36 @@ func (cb *columnBackfiller) runChunk(
) (roachpb.Key, error) {
var key roachpb.Key
var commitWaitFn func(context.Context) error
err := cb.flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if cb.flowCtx.Cfg.TestingKnobs.RunBeforeBackfillChunk != nil {
if err := cb.flowCtx.Cfg.TestingKnobs.RunBeforeBackfillChunk(sp); err != nil {
return err
err := cb.flowCtx.Cfg.DB.TxnWithAdmissionControl(
ctx, roachpb.AdmissionHeader_FROM_SQL, admission.NormalPri,
func(ctx context.Context, txn *kv.Txn) error {
if cb.flowCtx.Cfg.TestingKnobs.RunBeforeBackfillChunk != nil {
if err := cb.flowCtx.Cfg.TestingKnobs.RunBeforeBackfillChunk(sp); err != nil {
return err
}
}
if cb.flowCtx.Cfg.TestingKnobs.RunAfterBackfillChunk != nil {
defer cb.flowCtx.Cfg.TestingKnobs.RunAfterBackfillChunk()
}
}
if cb.flowCtx.Cfg.TestingKnobs.RunAfterBackfillChunk != nil {
defer cb.flowCtx.Cfg.TestingKnobs.RunAfterBackfillChunk()
}

// Defer the commit-wait operation so that we can coalesce this wait
// across all batches. This dramatically reduces the total time we spend
// waiting for consistency when backfilling a column on GLOBAL tables.
commitWaitFn = txn.DeferCommitWait(ctx)

// TODO(knz): do KV tracing in DistSQL processors.
var err error
key, err = cb.RunColumnBackfillChunk(
ctx,
txn,
cb.desc,
sp,
chunkSize,
true, /*alsoCommit*/
false, /*traceKV*/
)
return err
})
// Defer the commit-wait operation so that we can coalesce this wait
// across all batches. This dramatically reduces the total time we spend
// waiting for consistency when backfilling a column on GLOBAL tables.
commitWaitFn = txn.DeferCommitWait(ctx)

// TODO(knz): do KV tracing in DistSQL processors.
var err error
key, err = cb.RunColumnBackfillChunk(
ctx,
txn,
cb.desc,
sp,
chunkSize,
true, /*alsoCommit*/
false, /*traceKV*/
)
return err
})
if err == nil {
cb.commitWaitFns = append(cb.commitWaitFns, commitWaitFn)
maxCommitWaitFns := int(backfillerMaxCommitWaitFns.Get(&cb.flowCtx.Cfg.Settings.SV))
Expand Down

0 comments on commit 588c03d

Please sign in to comment.