Merge #89709
89709: kv,rangefeed: integrate catchup scans with elastic cpu r=irfansharif a=irfansharif

Part of #65957. First commit is from #89656.

Changefeed backfills, given their scan-heavy nature, can be fairly
CPU-intensive. In #89656 we introduced a roachtest demonstrating the
latency impact backfills can have on a moderately CPU-saturated cluster.
Similar to what we saw for backups, this CPU-heavy nature can elevate Go
scheduling latencies, which in turn translate into foreground latency
impact. This commit integrates rangefeed catchup scans with the elastic
CPU limiter we introduced in #86638; this is one of the two optional
halves of changefeed backfills. The second half is the initial scan --
scan requests issued over some keyspan as of some timestamp. For that we
simply rely on the existing slots mechanism, but now set a lower
admission priority (BulkNormalPri). Experimentally we observed that
during initial scans the encoding routines in changefeedccl are the most
impactful CPU-wise, something #89589 can help with. We leave admission
integration of parallel worker goroutines to future work.
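
For example, each initial-scan batch carries the lower priority in its
admission header. This is a condensed sketch of the
pkg/ccl/changefeedccl/kvfeed/scanner.go change included below
(targetBytesPerScan and start are names from that diff):

    var b kv.Batch
    b.Header.TargetBytes = targetBytesPerScan
    b.AdmissionHeader = roachpb.AdmissionHeader{
        // Admit scan-heavy backfill work at a lower priority so it
        // queues behind regular foreground traffic.
        Priority:                 int32(admissionpb.BulkNormalPri),
        CreateTime:               start.UnixNano(),
        Source:                   roachpb.AdmissionHeader_FROM_SQL,
        NoMemoryReservedAtSource: true,
    }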

Unlike export requests, rangefeed catchup scans are non-preemptible. The
rangefeed RPC is a streaming one, and the catchup scan is done during
stream setup, so we don't have resumption tokens to propagate up to the
caller like we did for backups. We still want CPU-bound work going
through admission control to use at most 100ms of CPU time (it makes
for an ineffective controller otherwise), and to that end we introduce
the following component, used within the rangefeed catchup iterator:

    // Pacer is used in tight loops (CPU-bound) for non-preemptible
    // elastic work. Callers are expected to invoke Pace() every loop
    // iteration and Close() once done. Internally this type integrates
    // with elastic CPU work queue, acquiring tokens for the CPU work
    // being done, and blocking if tokens are unavailable. This allows
    // for a form of cooperative scheduling with elastic CPU granters.
    type Pacer struct
    func (p *Pacer) Pace(ctx context.Context) error { ... }
    func (p *Pacer) Close() { ... }
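
A minimal usage sketch follows; the iterator interface and the event
emission helper are illustrative stand-ins, not the actual catchup-scan
code:

    func runCatchupScan(ctx context.Context, it simpleIter, pacer *Pacer) error {
        defer pacer.Close()
        for ; it.Valid(); it.Next() {
            // Acquire elastic CPU tokens for the work being done,
            // blocking (and thereby yielding to other work) if none are
            // available. This is the cooperative-scheduling point.
            if err := pacer.Pace(ctx); err != nil {
                return err
            }
            emitEvent(it.Key(), it.Value()) // hypothetical event emission
        }
        return nil
    }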

Release note: None

Co-authored-by: irfan sharif <[email protected]>
craig[bot] and irfansharif committed Nov 4, 2022
2 parents 91f7388 + a042b60 commit 2238d70
Showing 44 changed files with 765 additions and 476 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -63,10 +63,10 @@
<tr><td><code>kv.snapshot_delegation.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to true to allow snapshots from follower replicas</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><code>kv.store.admission.provisioned_bandwidth</code></td><td>byte size</td><td><code>0 B</code></td><td>if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be over-ridden on a per-store basis using the --store flag</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track locks in transactions</td></tr>
<tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
<tr><td><code>kv.transaction.reject_over_max_intents_budget.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed</td></tr>
<tr><td><code>kvadmission.store.provisioned_bandwidth</code></td><td>byte size</td><td><code>0 B</code></td><td>if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be over-ridden on a per-store basis using the --store flag</td></tr>
<tr><td><code>schedules.backup.gc_protection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>enable chaining of GC protection across backups run as part of a schedule</td></tr>
<tr><td><code>security.ocsp.mode</code></td><td>enumeration</td><td><code>off</code></td><td>use OCSP to check whether TLS certificates are revoked. If the OCSP server is unreachable, in strict mode all certificates will be rejected and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2]</td></tr>
<tr><td><code>security.ocsp.timeout</code></td><td>duration</td><td><code>3s</code></td><td>timeout before considering the OCSP server unreachable</td></tr>
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
@@ -1165,6 +1165,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/idalloc:idalloc_test",
"//pkg/kv/kvserver/intentresolver:intentresolver",
"//pkg/kv/kvserver/intentresolver:intentresolver_test",
"//pkg/kv/kvserver/kvadmission:kvadmission",
"//pkg/kv/kvserver/kvserverbase:kvserverbase",
"//pkg/kv/kvserver/kvserverpb:kvserverpb",
"//pkg/kv/kvserver/liveness/livenesspb:livenesspb",
@@ -2483,6 +2484,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/gc:get_x_data",
"//pkg/kv/kvserver/idalloc:get_x_data",
"//pkg/kv/kvserver/intentresolver:get_x_data",
"//pkg/kv/kvserver/kvadmission:get_x_data",
"//pkg/kv/kvserver/kvserverbase:get_x_data",
"//pkg/kv/kvserver/kvserverpb:get_x_data",
"//pkg/kv/kvserver/liveness:get_x_data",
7 changes: 5 additions & 2 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -108,7 +108,7 @@ var FrontierCheckpointMaxBytes = settings.RegisterByteSizeSetting(
settings.TenantWritable,
"changefeed.frontier_checkpoint_max_bytes",
"controls the maximum size of the checkpoint as a total size of key bytes",
1<<20,
1<<20, // 1 MiB
)

// ScanRequestLimit is the number of Scan requests that can run at once.
Expand All @@ -122,11 +122,14 @@ var ScanRequestLimit = settings.RegisterIntSetting(
)

// ScanRequestSize is the target size of the scan request response.
//
// TODO(cdc,yevgeniy,irfansharif): 16 MiB is too large for "elastic" work such
// as this; reduce the default. Evaluate this as part of #90089.
var ScanRequestSize = settings.RegisterIntSetting(
settings.TenantWritable,
"changefeed.backfill.scan_request_size",
"the maximum number of bytes returned by each scan request",
16<<20,
16<<20, // 16 MiB
)

// SinkThrottleConfig describes throttling configuration for the sink.
9 changes: 8 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -142,7 +142,14 @@ func (p *scanRequestScanner) exportSpan(
r.ScanFormat = roachpb.BATCH_RESPONSE
b.Header.TargetBytes = targetBytesPerScan
b.AdmissionHeader = roachpb.AdmissionHeader{
Priority: int32(admissionpb.BulkNormalPri),
// TODO(irfansharif): Make this configurable if we want system table
// scanners or support "high priority" changefeeds to run at higher
// priorities. We use higher AC priorities for system-internal
// rangefeeds listening in on system table changes.
Priority: int32(admissionpb.BulkNormalPri),
// We specify a creation time for each batch (as opposed to at the
// txn level) -- this way later batches from earlier txns don't just
// out-compete batches from newer txns.
CreateTime: start.UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
1 change: 0 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
@@ -302,7 +302,6 @@ const (
// V22_2SupportAssumeRoleAuth is the version where assume role authorization is
// supported in cloud storage and KMS.
V22_2SupportAssumeRoleAuth

// V22_2FixUserfileRelatedDescriptorCorruption adds a migration which uses
// heuristics to identify invalid table descriptors for userfile-related
// descriptors.
7 changes: 2 additions & 5 deletions pkg/cmd/roachprod/main.go
@@ -934,15 +934,12 @@ var grafanaURLCmd = &cobra.Command{
Short: `returns a url to the grafana dashboard`,
Args: cobra.ExactArgs(1),
Run: wrap(func(cmd *cobra.Command, args []string) error {
urls, err := roachprod.GrafanaURL(context.Background(), roachprodLibraryLogger, args[0],
url, err := roachprod.GrafanaURL(context.Background(), roachprodLibraryLogger, args[0],
grafanaurlOpen)
if err != nil {
return err
}
for _, url := range urls {
fmt.Println(url)
}
fmt.Println("username: admin; pwd: admin")
fmt.Println(url)
return nil
}),
}
5 changes: 5 additions & 0 deletions pkg/kv/db.go
@@ -889,6 +889,11 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn {
// is marked as poisoned and all future ops fail fast until the retry. The
// callback may return either nil or the retryable error. Txn is responsible for
// resetting the transaction and retrying the callback.
//
// TODO(irfansharif): Audit uses of this API since it bypasses AC. Make
// the other variant (TxnWithAdmissionControl) the default, or maybe rename this
// to be more explicit (TxnWithoutAdmissionControl) so new callers have to be
// conscious about what they want.
func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error {
return db.TxnWithAdmissionControl(
ctx, roachpb.AdmissionHeader_OTHER, admissionpb.NormalPri, retryable)
21 changes: 18 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -86,6 +86,7 @@ func maxConcurrentCatchupScans(sv *settings.Values) int {

type rangeFeedConfig struct {
useMuxRangeFeed bool
overSystemTable bool
}

// RangeFeedOption configures a RangeFeed.
Expand All @@ -104,6 +105,14 @@ func WithMuxRangeFeed() RangeFeedOption {
})
}

// WithSystemTablePriority is used for system-internal rangefeeds; it uses a
// higher admission priority during catch-up scans.
func WithSystemTablePriority() RangeFeedOption {
return optionFunc(func(c *rangeFeedConfig) {
c.overSystemTable = true
})
}

// A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation.
var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true)

@@ -196,7 +205,7 @@ func (ds *DistSender) RangeFeedSpans(
// Spawn a child goroutine to process this feed.
g.GoCtx(func(ctx context.Context) error {
return ds.partialRangeFeed(ctx, rr, eventProducer, sri.rs, sri.startAfter,
sri.token, withDiff, &catchupSem, rangeCh, eventCh)
sri.token, withDiff, &catchupSem, rangeCh, eventCh, cfg)
})
case <-ctx.Done():
return ctx.Err()
@@ -372,6 +381,7 @@ func (ds *DistSender) partialRangeFeed(
catchupSem *limit.ConcurrentRequestLimiter,
rangeCh chan<- singleRangeInfo,
eventCh chan<- RangeFeedMessage,
cfg rangeFeedConfig,
) error {
// Bound the partial rangefeed to the partial span.
span := rs.AsRawSpanWithNoLocals()
@@ -408,7 +418,7 @@
// Establish a RangeFeed for a single Range.
maxTS, err := ds.singleRangeFeed(
ctx, span, startAfter, withDiff, token.Desc(),
catchupSem, eventCh, streamProducerFactory, active.onRangeEvent)
catchupSem, eventCh, streamProducerFactory, active.onRangeEvent, cfg)

// Forward the timestamp in case we end up sending it again.
startAfter.Forward(maxTS)
@@ -496,11 +506,16 @@ func (ds *DistSender) singleRangeFeed(
eventCh chan<- RangeFeedMessage,
streamProducerFactory rangeFeedEventProducerFactory,
onRangeEvent onRangeEventCb,
cfg rangeFeedConfig,
) (hlc.Timestamp, error) {
// Ensure context is cancelled on all errors, to prevent gRPC stream leaks.
ctx, cancelFeed := context.WithCancel(ctx)
defer cancelFeed()

admissionPri := admissionpb.BulkNormalPri
if cfg.overSystemTable {
admissionPri = admissionpb.NormalPri
}
args := roachpb.RangeFeedRequest{
Span: span,
Header: roachpb.Header{
Expand All @@ -511,7 +526,7 @@ func (ds *DistSender) singleRangeFeed(
AdmissionHeader: roachpb.AdmissionHeader{
// NB: AdmissionHeader is used only at the start of the range feed
// stream since the initial catch-up scan is expensive.
Priority: int32(admissionpb.BulkNormalPri),
Priority: int32(admissionPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
1 change: 1 addition & 0 deletions pkg/kv/kvclient/rangefeed/BUILD.bazel
@@ -21,6 +21,7 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/limit",
15 changes: 15 additions & 0 deletions pkg/kv/kvclient/rangefeed/config.go
@@ -63,6 +63,12 @@ type scanConfig struct {

// configures retry behavior
retryBehavior ScanRetryBehavior

// overSystemTable indicates whether this rangefeed is over a system table
// (used internally for CRDB's own functioning) and therefore should be
// treated with a more appropriate admission pri (NormalPri instead of
// BulkNormalPri).
overSystemTable bool
}

type optionFunc func(*config)
@@ -287,3 +293,12 @@ func WithPProfLabel(key, value string) Option {
c.extraPProfLabels = append(c.extraPProfLabels, key, value)
})
}

// WithSystemTablePriority communicates that the rangefeed is over a system
// table and thus operates at a higher priority (this primarily affects
// admission control).
func WithSystemTablePriority() Option {
return optionFunc(func(c *config) {
c.overSystemTable = true
})
}
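
A hedged usage sketch of the new option: the factory call shape follows
the rangefeed package's existing RangeFeed API, while the feed name,
spans, timestamp, and callback here are illustrative:

    // A system-internal rangefeed opts into NormalPri catch-up scans.
    feed, err := factory.RangeFeed(
        ctx, "settings-watcher", spans, startTS, onValue,
        rangefeed.WithSystemTablePriority(),
    )
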
73 changes: 42 additions & 31 deletions pkg/kv/kvclient/rangefeed/db_adapter.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
@@ -74,8 +75,9 @@ func (dbc *dbAdapter) RangeFeed(
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error {
return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC)
return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC, opts...)
}

// concurrentBoundAccount is a thread safe bound account.
@@ -118,7 +120,7 @@ func (dbc *dbAdapter) Scan(
// If we don't have parallelism configured, just scan each span in turn.
if cfg.scanParallelism == nil {
for _, sp := range spans {
if err := dbc.scanSpan(ctx, sp, asOf, rowFn, cfg.targetScanBytes, cfg.onSpanDone, acc); err != nil {
if err := dbc.scanSpan(ctx, sp, asOf, rowFn, cfg.targetScanBytes, cfg.onSpanDone, cfg.overSystemTable, acc); err != nil {
return err
}
}
@@ -154,7 +156,7 @@ func (dbc *dbAdapter) Scan(
g := ctxgroup.WithContext(ctx)
err := dbc.divideAndSendScanRequests(
ctx, &g, spans, asOf, rowFn,
parallelismFn, cfg.targetScanBytes, cfg.onSpanDone, acc)
parallelismFn, cfg.targetScanBytes, cfg.onSpanDone, cfg.overSystemTable, acc)
if err != nil {
cancel()
}
Expand All @@ -168,6 +170,7 @@ func (dbc *dbAdapter) scanSpan(
rowFn func(value roachpb.KeyValue),
targetScanBytes int64,
onScanDone OnScanCompleted,
overSystemTable bool,
acc *concurrentBoundAccount,
) error {
if acc != nil {
Expand All @@ -177,39 +180,46 @@ func (dbc *dbAdapter) scanSpan(
defer acc.Shrink(ctx, targetScanBytes)
}

return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetFixedTimestamp(ctx, asOf); err != nil {
return err
}
sp := span
var b kv.Batch
for {
b.Header.TargetBytes = targetScanBytes
b.Scan(sp.Key, sp.EndKey)
if err := txn.Run(ctx, &b); err != nil {
admissionPri := admissionpb.BulkNormalPri
if overSystemTable {
admissionPri = admissionpb.NormalPri
}
return dbc.db.TxnWithAdmissionControl(ctx,
roachpb.AdmissionHeader_ROOT_KV,
admissionPri,
func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetFixedTimestamp(ctx, asOf); err != nil {
return err
}
res := b.Results[0]
for _, row := range res.Rows {
rowFn(roachpb.KeyValue{Key: row.Key, Value: *row.Value})
}
if res.ResumeSpan == nil {
if onScanDone != nil {
return onScanDone(ctx, sp)
sp := span
var b kv.Batch
for {
b.Header.TargetBytes = targetScanBytes
b.Scan(sp.Key, sp.EndKey)
if err := txn.Run(ctx, &b); err != nil {
return err
}
res := b.Results[0]
for _, row := range res.Rows {
rowFn(roachpb.KeyValue{Key: row.Key, Value: *row.Value})
}
if res.ResumeSpan == nil {
if onScanDone != nil {
return onScanDone(ctx, sp)
}
return nil
}
return nil
}

if onScanDone != nil {
if err := onScanDone(ctx, roachpb.Span{Key: sp.Key, EndKey: res.ResumeSpan.Key}); err != nil {
return err
if onScanDone != nil {
if err := onScanDone(ctx, roachpb.Span{Key: sp.Key, EndKey: res.ResumeSpan.Key}); err != nil {
return err
}
}
}

sp = res.ResumeSpanAsValue()
b = kv.Batch{}
}
})
sp = res.ResumeSpanAsValue()
b = kv.Batch{}
}
})
}

// divideAndSendScanRequests divides spans into small ranges based on range boundaries,
Expand All @@ -224,6 +234,7 @@ func (dbc *dbAdapter) divideAndSendScanRequests(
parallelismFn func() int,
targetScanBytes int64,
onSpanDone OnScanCompleted,
overSystemTable bool,
acc *concurrentBoundAccount,
) error {
// Build a span group so that we can iterate spans in order.
@@ -261,7 +272,7 @@
sp := partialRS.AsRawSpanWithNoLocals()
workGroup.GoCtx(func(ctx context.Context) error {
defer limAlloc.Release()
return dbc.scanSpan(ctx, sp, asOf, rowFn, targetScanBytes, onSpanDone, acc)
return dbc.scanSpan(ctx, sp, asOf, rowFn, targetScanBytes, onSpanDone, overSystemTable, acc)
})

if !ri.NeedAnother(nextRS) {
13 changes: 9 additions & 4 deletions pkg/kv/kvclient/rangefeed/mocks_generated_test.go

Some generated files are not rendered by default.