diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 2e2e52c98de2..7929ce3ef075 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -63,10 +63,10 @@
kv.snapshot_delegation.enabled | boolean | false | set to true to allow snapshots from follower replicas |
kv.snapshot_rebalance.max_rate | byte size | 32 MiB | the rate limit (bytes/sec) to use for rebalance and upreplication snapshots |
kv.snapshot_recovery.max_rate | byte size | 32 MiB | the rate limit (bytes/sec) to use for recovery snapshots |
-kv.store.admission.provisioned_bandwidth | byte size | 0 B | if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be over-ridden on a per-store basis using the --store flag |
kv.transaction.max_intents_bytes | integer | 4194304 | maximum number of bytes used to track locks in transactions |
kv.transaction.max_refresh_spans_bytes | integer | 4194304 | maximum number of bytes used to track refresh spans in serializable transactions |
kv.transaction.reject_over_max_intents_budget.enabled | boolean | false | if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed |
+kvadmission.store.provisioned_bandwidth | byte size | 0 B | if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be over-ridden on a per-store basis using the --store flag |
schedules.backup.gc_protection.enabled | boolean | true | enable chaining of GC protection across backups run as part of a schedule |
security.ocsp.mode | enumeration | off | use OCSP to check whether TLS certificates are revoked. If the OCSP server is unreachable, in strict mode all certificates will be rejected and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2] |
security.ocsp.timeout | duration | 3s | timeout before considering the OCSP server unreachable |
diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel
index 34228957eee8..55873102efb7 100644
--- a/pkg/BUILD.bazel
+++ b/pkg/BUILD.bazel
@@ -1165,6 +1165,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/idalloc:idalloc_test",
"//pkg/kv/kvserver/intentresolver:intentresolver",
"//pkg/kv/kvserver/intentresolver:intentresolver_test",
+ "//pkg/kv/kvserver/kvadmission:kvadmission",
"//pkg/kv/kvserver/kvserverbase:kvserverbase",
"//pkg/kv/kvserver/kvserverpb:kvserverpb",
"//pkg/kv/kvserver/liveness/livenesspb:livenesspb",
@@ -2483,6 +2484,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/gc:get_x_data",
"//pkg/kv/kvserver/idalloc:get_x_data",
"//pkg/kv/kvserver/intentresolver:get_x_data",
+ "//pkg/kv/kvserver/kvadmission:get_x_data",
"//pkg/kv/kvserver/kvserverbase:get_x_data",
"//pkg/kv/kvserver/kvserverpb:get_x_data",
"//pkg/kv/kvserver/liveness:get_x_data",
diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go
index 41b4b0f141cd..22841234c7e1 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/settings.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -108,7 +108,7 @@ var FrontierCheckpointMaxBytes = settings.RegisterByteSizeSetting(
settings.TenantWritable,
"changefeed.frontier_checkpoint_max_bytes",
"controls the maximum size of the checkpoint as a total size of key bytes",
- 1<<20,
+ 1<<20, // 1 MiB
)
// ScanRequestLimit is the number of Scan requests that can run at once.
@@ -122,11 +122,14 @@ var ScanRequestLimit = settings.RegisterIntSetting(
)
// ScanRequestSize is the target size of the scan request response.
+//
+// TODO(cdc,yevgeniy,irfansharif): 16 MiB is too large for "elastic" work such
+// as this; reduce the default. Evaluate this as part of #90089.
var ScanRequestSize = settings.RegisterIntSetting(
settings.TenantWritable,
"changefeed.backfill.scan_request_size",
"the maximum number of bytes returned by each scan request",
- 16<<20,
+ 16<<20, // 16 MiB
)
// SinkThrottleConfig describes throttling configuration for the sink.
diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go
index d53f92336dcb..ff248c40943e 100644
--- a/pkg/ccl/changefeedccl/kvfeed/scanner.go
+++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -142,7 +142,14 @@ func (p *scanRequestScanner) exportSpan(
r.ScanFormat = roachpb.BATCH_RESPONSE
b.Header.TargetBytes = targetBytesPerScan
b.AdmissionHeader = roachpb.AdmissionHeader{
- Priority: int32(admissionpb.BulkNormalPri),
+			// TODO(irfansharif): Make this configurable if we want system-table
+			// scanners, or "high priority" changefeeds, to run at higher
+			// priorities. We already use higher AC priorities for
+			// system-internal rangefeeds listening in on system table changes.
+ Priority: int32(admissionpb.BulkNormalPri),
+			// We specify a creation time for each batch (as opposed to at the
+			// txn level) -- this way later batches from earlier txns don't just
+			// out-compete batches from newer txns.
CreateTime: start.UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go
index fe39c4017bc4..074a3780a67c 100644
--- a/pkg/clusterversion/cockroach_versions.go
+++ b/pkg/clusterversion/cockroach_versions.go
@@ -302,7 +302,6 @@ const (
// V22_2SupportAssumeRoleAuth is the version where assume role authorization is
// supported in cloud storage and KMS.
V22_2SupportAssumeRoleAuth
-
// V22_2FixUserfileRelatedDescriptorCorruption adds a migration which uses
// heuristics to identify invalid table descriptors for userfile-related
// descriptors.
diff --git a/pkg/cmd/roachprod/main.go b/pkg/cmd/roachprod/main.go
index fca57b8a2ded..30f5fd08117a 100644
--- a/pkg/cmd/roachprod/main.go
+++ b/pkg/cmd/roachprod/main.go
@@ -934,15 +934,12 @@ var grafanaURLCmd = &cobra.Command{
Short: `returns a url to the grafana dashboard`,
Args: cobra.ExactArgs(1),
Run: wrap(func(cmd *cobra.Command, args []string) error {
- urls, err := roachprod.GrafanaURL(context.Background(), roachprodLibraryLogger, args[0],
+ url, err := roachprod.GrafanaURL(context.Background(), roachprodLibraryLogger, args[0],
grafanaurlOpen)
if err != nil {
return err
}
- for _, url := range urls {
- fmt.Println(url)
- }
- fmt.Println("username: admin; pwd: admin")
+ fmt.Println(url)
return nil
}),
}
diff --git a/pkg/kv/db.go b/pkg/kv/db.go
index 055f762d53c7..43eddb05f5ca 100644
--- a/pkg/kv/db.go
+++ b/pkg/kv/db.go
@@ -889,6 +889,11 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn {
// is marked as poisoned and all future ops fail fast until the retry. The
// callback may return either nil or the retryable error. Txn is responsible for
// resetting the transaction and retrying the callback.
+//
+// TODO(irfansharif): Audit uses of this API since it bypasses AC. Make the
+// other variant (TxnWithAdmissionControl) the default, or maybe rename this to
+// be more explicit (TxnWithoutAdmissionControl) so new callers have to be
+// conscious about what they want.
func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error {
return db.TxnWithAdmissionControl(
ctx, roachpb.AdmissionHeader_OTHER, admissionpb.NormalPri, retryable)
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
index a5123f42c7a4..7bd8888e9c2a 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -86,6 +86,7 @@ func maxConcurrentCatchupScans(sv *settings.Values) int {
type rangeFeedConfig struct {
useMuxRangeFeed bool
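+	// overSystemTable indicates whether the rangefeed is over a system table
+	// (used for CRDB's own functioning); such rangefeeds use a higher
+	// admission priority during catch-up scans.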
+ overSystemTable bool
}
// RangeFeedOption configures a RangeFeed.
@@ -104,6 +105,14 @@ func WithMuxRangeFeed() RangeFeedOption {
})
}
+// WithSystemTablePriority is used for system-internal rangefeeds; it uses a
+// higher admission priority during catch-up scans.
+func WithSystemTablePriority() RangeFeedOption {
+ return optionFunc(func(c *rangeFeedConfig) {
+ c.overSystemTable = true
+ })
+}
+
// A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation.
var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true)
@@ -196,7 +205,7 @@ func (ds *DistSender) RangeFeedSpans(
// Spawn a child goroutine to process this feed.
g.GoCtx(func(ctx context.Context) error {
return ds.partialRangeFeed(ctx, rr, eventProducer, sri.rs, sri.startAfter,
- sri.token, withDiff, &catchupSem, rangeCh, eventCh)
+ sri.token, withDiff, &catchupSem, rangeCh, eventCh, cfg)
})
case <-ctx.Done():
return ctx.Err()
@@ -372,6 +381,7 @@ func (ds *DistSender) partialRangeFeed(
catchupSem *limit.ConcurrentRequestLimiter,
rangeCh chan<- singleRangeInfo,
eventCh chan<- RangeFeedMessage,
+ cfg rangeFeedConfig,
) error {
// Bound the partial rangefeed to the partial span.
span := rs.AsRawSpanWithNoLocals()
@@ -408,7 +418,7 @@ func (ds *DistSender) partialRangeFeed(
// Establish a RangeFeed for a single Range.
maxTS, err := ds.singleRangeFeed(
ctx, span, startAfter, withDiff, token.Desc(),
- catchupSem, eventCh, streamProducerFactory, active.onRangeEvent)
+ catchupSem, eventCh, streamProducerFactory, active.onRangeEvent, cfg)
// Forward the timestamp in case we end up sending it again.
startAfter.Forward(maxTS)
@@ -496,11 +506,16 @@ func (ds *DistSender) singleRangeFeed(
eventCh chan<- RangeFeedMessage,
streamProducerFactory rangeFeedEventProducerFactory,
onRangeEvent onRangeEventCb,
+ cfg rangeFeedConfig,
) (hlc.Timestamp, error) {
// Ensure context is cancelled on all errors, to prevent gRPC stream leaks.
ctx, cancelFeed := context.WithCancel(ctx)
defer cancelFeed()
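+	// The admission priority chosen here applies to the initial catch-up scan
+	// for this range. Rangefeeds over system tables are latency sensitive and
+	// use NormalPri; all other catch-up scans are treated as elastic work at
+	// BulkNormalPri.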
+ admissionPri := admissionpb.BulkNormalPri
+ if cfg.overSystemTable {
+ admissionPri = admissionpb.NormalPri
+ }
args := roachpb.RangeFeedRequest{
Span: span,
Header: roachpb.Header{
@@ -511,7 +526,7 @@ func (ds *DistSender) singleRangeFeed(
AdmissionHeader: roachpb.AdmissionHeader{
// NB: AdmissionHeader is used only at the start of the range feed
// stream since the initial catch-up scan is expensive.
- Priority: int32(admissionpb.BulkNormalPri),
+ Priority: int32(admissionPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel
index 459ddc79f8a5..5cceb6d1c005 100644
--- a/pkg/kv/kvclient/rangefeed/BUILD.bazel
+++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel
@@ -21,6 +21,7 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
+ "//pkg/util/admission/admissionpb",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/limit",
diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go
index c3d5621f421a..2be820c90b49 100644
--- a/pkg/kv/kvclient/rangefeed/config.go
+++ b/pkg/kv/kvclient/rangefeed/config.go
@@ -63,6 +63,12 @@ type scanConfig struct {
// configures retry behavior
retryBehavior ScanRetryBehavior
+
+ // overSystemTable indicates whether this rangefeed is over a system table
+ // (used internally for CRDB's own functioning) and therefore should be
+ // treated with a more appropriate admission pri (NormalPri instead of
+ // BulkNormalPri).
+ overSystemTable bool
}
type optionFunc func(*config)
@@ -287,3 +293,12 @@ func WithPProfLabel(key, value string) Option {
c.extraPProfLabels = append(c.extraPProfLabels, key, value)
})
}
+
+// WithSystemTablePriority communicates that the rangefeed is over a system
+// table and thus operates at a higher priority (this primarily affects
+// admission control).
+func WithSystemTablePriority() Option {
+ return optionFunc(func(c *config) {
+ c.overSystemTable = true
+ })
+}
diff --git a/pkg/kv/kvclient/rangefeed/db_adapter.go b/pkg/kv/kvclient/rangefeed/db_adapter.go
index 759091e9909b..7bcb92110fc9 100644
--- a/pkg/kv/kvclient/rangefeed/db_adapter.go
+++ b/pkg/kv/kvclient/rangefeed/db_adapter.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
@@ -74,8 +75,9 @@ func (dbc *dbAdapter) RangeFeed(
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
+ opts ...kvcoord.RangeFeedOption,
) error {
- return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC)
+ return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC, opts...)
}
// concurrentBoundAccount is a thread safe bound account.
@@ -118,7 +120,7 @@ func (dbc *dbAdapter) Scan(
// If we don't have parallelism configured, just scan each span in turn.
if cfg.scanParallelism == nil {
for _, sp := range spans {
- if err := dbc.scanSpan(ctx, sp, asOf, rowFn, cfg.targetScanBytes, cfg.onSpanDone, acc); err != nil {
+ if err := dbc.scanSpan(ctx, sp, asOf, rowFn, cfg.targetScanBytes, cfg.onSpanDone, cfg.overSystemTable, acc); err != nil {
return err
}
}
@@ -154,7 +156,7 @@ func (dbc *dbAdapter) Scan(
g := ctxgroup.WithContext(ctx)
err := dbc.divideAndSendScanRequests(
ctx, &g, spans, asOf, rowFn,
- parallelismFn, cfg.targetScanBytes, cfg.onSpanDone, acc)
+ parallelismFn, cfg.targetScanBytes, cfg.onSpanDone, cfg.overSystemTable, acc)
if err != nil {
cancel()
}
@@ -168,6 +170,7 @@ func (dbc *dbAdapter) scanSpan(
rowFn func(value roachpb.KeyValue),
targetScanBytes int64,
onScanDone OnScanCompleted,
+ overSystemTable bool,
acc *concurrentBoundAccount,
) error {
if acc != nil {
@@ -177,39 +180,46 @@ func (dbc *dbAdapter) scanSpan(
defer acc.Shrink(ctx, targetScanBytes)
}
- return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
- if err := txn.SetFixedTimestamp(ctx, asOf); err != nil {
- return err
- }
- sp := span
- var b kv.Batch
- for {
- b.Header.TargetBytes = targetScanBytes
- b.Scan(sp.Key, sp.EndKey)
- if err := txn.Run(ctx, &b); err != nil {
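+	// Initial scans are admitted as elastic work (BulkNormalPri) under the
+	// ROOT_KV admission source, unless the rangefeed is over a system table,
+	// in which case the scan is latency sensitive and uses NormalPri.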
+ admissionPri := admissionpb.BulkNormalPri
+ if overSystemTable {
+ admissionPri = admissionpb.NormalPri
+ }
+ return dbc.db.TxnWithAdmissionControl(ctx,
+ roachpb.AdmissionHeader_ROOT_KV,
+ admissionPri,
+ func(ctx context.Context, txn *kv.Txn) error {
+ if err := txn.SetFixedTimestamp(ctx, asOf); err != nil {
return err
}
- res := b.Results[0]
- for _, row := range res.Rows {
- rowFn(roachpb.KeyValue{Key: row.Key, Value: *row.Value})
- }
- if res.ResumeSpan == nil {
- if onScanDone != nil {
- return onScanDone(ctx, sp)
+ sp := span
+ var b kv.Batch
+ for {
+ b.Header.TargetBytes = targetScanBytes
+ b.Scan(sp.Key, sp.EndKey)
+ if err := txn.Run(ctx, &b); err != nil {
+ return err
+ }
+ res := b.Results[0]
+ for _, row := range res.Rows {
+ rowFn(roachpb.KeyValue{Key: row.Key, Value: *row.Value})
+ }
+ if res.ResumeSpan == nil {
+ if onScanDone != nil {
+ return onScanDone(ctx, sp)
+ }
+ return nil
}
- return nil
- }
- if onScanDone != nil {
- if err := onScanDone(ctx, roachpb.Span{Key: sp.Key, EndKey: res.ResumeSpan.Key}); err != nil {
- return err
+ if onScanDone != nil {
+ if err := onScanDone(ctx, roachpb.Span{Key: sp.Key, EndKey: res.ResumeSpan.Key}); err != nil {
+ return err
+ }
}
- }
- sp = res.ResumeSpanAsValue()
- b = kv.Batch{}
- }
- })
+ sp = res.ResumeSpanAsValue()
+ b = kv.Batch{}
+ }
+ })
}
// divideAndSendScanRequests divides spans into small ranges based on range boundaries,
@@ -224,6 +234,7 @@ func (dbc *dbAdapter) divideAndSendScanRequests(
parallelismFn func() int,
targetScanBytes int64,
onSpanDone OnScanCompleted,
+ overSystemTable bool,
acc *concurrentBoundAccount,
) error {
// Build a span group so that we can iterate spans in order.
@@ -261,7 +272,7 @@ func (dbc *dbAdapter) divideAndSendScanRequests(
sp := partialRS.AsRawSpanWithNoLocals()
workGroup.GoCtx(func(ctx context.Context) error {
defer limAlloc.Release()
- return dbc.scanSpan(ctx, sp, asOf, rowFn, targetScanBytes, onSpanDone, acc)
+ return dbc.scanSpan(ctx, sp, asOf, rowFn, targetScanBytes, onSpanDone, overSystemTable, acc)
})
if !ri.NeedAnother(nextRS) {
diff --git a/pkg/kv/kvclient/rangefeed/mocks_generated_test.go b/pkg/kv/kvclient/rangefeed/mocks_generated_test.go
index b0472217ad47..8c211dcb108d 100644
--- a/pkg/kv/kvclient/rangefeed/mocks_generated_test.go
+++ b/pkg/kv/kvclient/rangefeed/mocks_generated_test.go
@@ -38,17 +38,22 @@ func (m *MockDB) EXPECT() *MockDBMockRecorder {
}
// RangeFeed mocks base method.
-func (m *MockDB) RangeFeed(arg0 context.Context, arg1 []roachpb.Span, arg2 hlc.Timestamp, arg3 bool, arg4 chan<- kvcoord.RangeFeedMessage) error {
+func (m *MockDB) RangeFeed(arg0 context.Context, arg1 []roachpb.Span, arg2 hlc.Timestamp, arg3 bool, arg4 chan<- kvcoord.RangeFeedMessage, arg5 ...kvcoord.RangeFeedOption) error {
m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "RangeFeed", arg0, arg1, arg2, arg3, arg4)
+ varargs := []interface{}{arg0, arg1, arg2, arg3, arg4}
+ for _, a := range arg5 {
+ varargs = append(varargs, a)
+ }
+ ret := m.ctrl.Call(m, "RangeFeed", varargs...)
ret0, _ := ret[0].(error)
return ret0
}
// RangeFeed indicates an expected call of RangeFeed.
-func (mr *MockDBMockRecorder) RangeFeed(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
+func (mr *MockDBMockRecorder) RangeFeed(arg0, arg1, arg2, arg3, arg4 interface{}, arg5 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeFeed", reflect.TypeOf((*MockDB)(nil).RangeFeed), arg0, arg1, arg2, arg3, arg4)
+ varargs := append([]interface{}{arg0, arg1, arg2, arg3, arg4}, arg5...)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeFeed", reflect.TypeOf((*MockDB)(nil).RangeFeed), varargs...)
}
// Scan mocks base method.
diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go
index 389717380d33..aaf2e75ecd76 100644
--- a/pkg/kv/kvclient/rangefeed/rangefeed.go
+++ b/pkg/kv/kvclient/rangefeed/rangefeed.go
@@ -55,6 +55,7 @@ type DB interface {
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
+ opts ...kvcoord.RangeFeedOption,
) error
// Scan encapsulates scanning a key span at a given point in time. The method
@@ -287,6 +288,11 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
// draining when the rangefeed fails.
eventCh := make(chan kvcoord.RangeFeedMessage)
+ var rangefeedOpts []kvcoord.RangeFeedOption
+ if f.scanConfig.overSystemTable {
+ rangefeedOpts = append(rangefeedOpts, kvcoord.WithSystemTablePriority())
+ }
+
for i := 0; r.Next(); i++ {
ts := frontier.Frontier()
if log.ExpensiveLogEnabled(ctx, 1) {
@@ -296,7 +302,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
start := timeutil.Now()
rangeFeedTask := func(ctx context.Context) error {
- return f.client.RangeFeed(ctx, f.spans, ts, f.withDiff, eventCh)
+ return f.client.RangeFeed(ctx, f.spans, ts, f.withDiff, eventCh, rangefeedOpts...)
}
processEventsTask := func(ctx context.Context) error {
return f.processEvents(ctx, frontier, eventCh)
diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go
index 8565b10b4baa..67bfd19e51a8 100644
--- a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go
+++ b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go
@@ -54,6 +54,7 @@ func (m *mockClient) RangeFeed(
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
+ opts ...kvcoord.RangeFeedOption,
) error {
return m.rangefeed(ctx, spans, startFrom, withDiff, eventC)
}
@@ -364,7 +365,7 @@ func TestBackoffOnRangefeedFailure(t *testing.T) {
Times(3).
Return(errors.New("rangefeed failed"))
db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
- Do(func(context.Context, []roachpb.Span, hlc.Timestamp, bool, chan<- kvcoord.RangeFeedMessage) {
+ Do(func(context.Context, []roachpb.Span, hlc.Timestamp, bool, chan<- kvcoord.RangeFeedMessage, ...kvcoord.RangeFeedOption) {
cancel()
}).
Return(nil)
diff --git a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go
index 33e5d52f3acd..32b7eaed15da 100644
--- a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go
+++ b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go
@@ -292,6 +292,10 @@ func (s *Watcher) Run(ctx context.Context) error {
case frontierBumpedCh <- struct{}{}:
}
}),
+ // TODO(irfansharif): Consider making this configurable on the Watcher
+ // type. As of 2022-11 all uses of this type are system-internal ones
+ // where a higher admission-pri makes sense.
+ rangefeed.WithSystemTablePriority(),
rangefeed.WithDiff(s.withPrevValue),
rangefeed.WithRowTimestampInInitialScan(true),
rangefeed.WithOnInitialScanError(func(ctx context.Context, err error) (shouldFail bool) {
diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index c971e3c40cd8..9f129b8189ce 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -131,6 +131,7 @@ go_library(
"//pkg/kv/kvserver/gc",
"//pkg/kv/kvserver/idalloc",
"//pkg/kv/kvserver/intentresolver",
+ "//pkg/kv/kvserver/kvadmission",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/liveness",
diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go
index 5f6d93a5e38a..c1cd5b7c8fcf 100644
--- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go
+++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go
@@ -442,7 +442,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {
defer cancel()
stream1 := &dummyStream{t: t, ctx: ctx, name: "rangefeed1", recv: make(chan *roachpb.RangeFeedEvent)}
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream1", func(ctx context.Context) {
- err := tc.repls[0].RangeFeed(args, stream1).GoError()
+ err := tc.repls[0].RangeFeed(args, stream1, nil /* pacer */).GoError()
if ctx.Err() != nil {
return // main goroutine stopping
}
@@ -496,7 +496,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {
// the breaker.
stream2 := &dummyStream{t: t, ctx: ctx, name: "rangefeed2", recv: make(chan *roachpb.RangeFeedEvent)}
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream2", func(ctx context.Context) {
- err := tc.repls[0].RangeFeed(args, stream2).GoError()
+ err := tc.repls[0].RangeFeed(args, stream2, nil /* pacer */).GoError()
if ctx.Err() != nil {
return // main goroutine stopping
}
diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel
new file mode 100644
index 000000000000..44573b5e43fc
--- /dev/null
+++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel
@@ -0,0 +1,24 @@
+load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "kvadmission",
+ srcs = ["kvadmission.go"],
+ importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/roachpb",
+ "//pkg/settings",
+ "//pkg/settings/cluster",
+ "//pkg/util/admission",
+ "//pkg/util/admission/admissionpb",
+ "//pkg/util/buildutil",
+ "//pkg/util/log",
+ "//pkg/util/stop",
+ "//pkg/util/timeutil",
+ "@com_github_cockroachdb_errors//:errors",
+ "@com_github_cockroachdb_pebble//:pebble",
+ ],
+)
+
+get_x_data(name = "get_x_data")
diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go
new file mode 100644
index 000000000000..b682b584648e
--- /dev/null
+++ b/pkg/kv/kvserver/kvadmission/kvadmission.go
@@ -0,0 +1,487 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+// Package kvadmission is the integration layer between KV and admission
+// control.
+package kvadmission
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/settings"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/util/admission"
+ "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
+ "github.com/cockroachdb/cockroach/pkg/util/buildutil"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/stop"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/cockroachdb/errors"
+ "github.com/cockroachdb/pebble"
+)
+
+// elasticCPUDurationPerExportRequest controls how many CPU tokens are allotted
+// for each export request.
+var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting(
+ settings.SystemOnly,
+ "kvadmission.elastic_cpu.duration_per_export_request",
+ "controls how many CPU tokens are allotted for each export request",
+ admission.MaxElasticCPUDuration,
+ func(duration time.Duration) error {
+ if duration < admission.MinElasticCPUDuration {
+ return fmt.Errorf("minimum CPU duration allowed per export request is %s, got %s",
+ admission.MinElasticCPUDuration, duration)
+ }
+ if duration > admission.MaxElasticCPUDuration {
+ return fmt.Errorf("maximum CPU duration allowed per export request is %s, got %s",
+ admission.MaxElasticCPUDuration, duration)
+ }
+ return nil
+ },
+)
+
+// elasticCPUDurationPerRangefeedScanUnit controls how many CPU tokens are
+// allotted for each unit of work during rangefeed catchup scans. Only takes
+// effect if kvadmission.rangefeed_catchup_scan_elastic_control.enabled is set.
+var elasticCPUDurationPerRangefeedScanUnit = settings.RegisterDurationSetting(
+ settings.SystemOnly,
+ "kvadmission.elastic_cpu.duration_per_rangefeed_scan_unit",
+ "controls how many CPU tokens are allotted for each unit of work during rangefeed catchup scans",
+ admission.MaxElasticCPUDuration,
+ func(duration time.Duration) error {
+ if duration < admission.MinElasticCPUDuration {
+ return fmt.Errorf("minimum CPU duration allowed is %s, got %s",
+ admission.MinElasticCPUDuration, duration)
+ }
+ if duration > admission.MaxElasticCPUDuration {
+ return fmt.Errorf("maximum CPU duration allowed is %s, got %s",
+ admission.MaxElasticCPUDuration, duration)
+ }
+ return nil
+ },
+)
+
+// rangefeedCatchupScanElasticControlEnabled determines whether rangefeed
+// catch-up scans integrate with elastic CPU control.
+var rangefeedCatchupScanElasticControlEnabled = settings.RegisterBoolSetting(
+ settings.SystemOnly,
+ "kvadmission.rangefeed_catchup_scan_elastic_control.enabled",
+ "determines whether rangefeed catchup scans integrate with the elastic CPU control",
+ true,
+)
+
+// Controller provides admission control for the KV layer.
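+//
+// A rough usage sketch for batch work (executeBatch below is a hypothetical
+// stand-in for actual request evaluation):
+//
+//	handle, err := controller.AdmitKVWork(ctx, tenantID, ba)
+//	if err != nil {
+//		return err // the work was never admitted
+//	}
+//	writeBytes := executeBatch(ctx, ba)
+//	controller.AdmittedKVWorkDone(handle, writeBytes)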
+type Controller interface {
+ // AdmitKVWork must be called before performing KV work.
+ // BatchRequest.AdmissionHeader and BatchRequest.Replica.StoreID must be
+ // populated for admission to work correctly. If err is non-nil, the
+ // returned handle can be ignored. If err is nil, AdmittedKVWorkDone must be
+ // called after the KV work is done executing.
+ AdmitKVWork(context.Context, roachpb.TenantID, *roachpb.BatchRequest) (Handle, error)
+ // AdmittedKVWorkDone is called after the admitted KV work is done
+ // executing.
+ AdmittedKVWorkDone(Handle, *StoreWriteBytes)
+ // AdmitRangefeedRequest must be called before serving rangefeed requests.
+ // If enabled, it returns a non-nil Pacer that's to be used within rangefeed
+ // catchup scans (typically CPU-intensive and affecting scheduling
+ // latencies).
+ AdmitRangefeedRequest(roachpb.TenantID, *roachpb.RangeFeedRequest) *Pacer
+ // SetTenantWeightProvider is used to set the provider that will be
+ // periodically polled for weights. The stopper should be used to terminate
+ // the periodic polling.
+ SetTenantWeightProvider(TenantWeightProvider, *stop.Stopper)
+ // SnapshotIngested informs admission control about a range snapshot
+ // ingestion.
+ SnapshotIngested(roachpb.StoreID, pebble.IngestOperationStats)
+ // FollowerStoreWriteBytes informs admission control about writes
+	// replicated to a raft follower that have not been subject to admission
+ // control.
+ FollowerStoreWriteBytes(roachpb.StoreID, FollowerStoreWriteBytes)
+}
+
+// TenantWeightProvider can be periodically asked to provide the tenant
+// weights.
+type TenantWeightProvider interface {
+ GetTenantWeights() TenantWeights
+}
+
+// TenantWeights contains the various tenant weights.
+type TenantWeights struct {
+ // Node is the node level tenant ID => weight.
+ Node map[uint64]uint32
+ // Stores contains the per-store tenant weights.
+ Stores []TenantWeightsForStore
+}
+
+// TenantWeightsForStore contains the tenant weights for a store.
+type TenantWeightsForStore struct {
+ roachpb.StoreID
+ // Weights is tenant ID => weight.
+ Weights map[uint64]uint32
+}
+
+// controllerImpl implements Controller interface.
+type controllerImpl struct {
+ // Admission control queues and coordinators. All three should be nil or
+ // non-nil.
+ kvAdmissionQ *admission.WorkQueue
+ storeGrantCoords *admission.StoreGrantCoordinators
+ elasticCPUWorkQueue *admission.ElasticCPUWorkQueue
+ settings *cluster.Settings
+ every log.EveryN
+}
+
+var _ Controller = &controllerImpl{}
+
+// Handle groups data around some piece of admitted work. Depending on the
+// type of work, it holds (a) references to specific work queues, (b) state
+// needed to inform said work queues of what work was done after the fact, and
+// (c) information around how much work a request is allowed to do (used for
+// cooperative scheduling with elastic CPU granters).
+type Handle struct {
+ tenantID roachpb.TenantID
+ storeAdmissionQ *admission.StoreWorkQueue
+ storeWorkHandle admission.StoreWorkHandle
+ ElasticCPUWorkHandle *admission.ElasticCPUWorkHandle
+
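+	// callAdmittedWorkDoneOnKVAdmissionQ records whether AdmittedKVWorkDone
+	// needs to inform the KV admission queue that this work has finished; it's
+	// not set when the work was instead admitted through the elastic CPU queue
+	// (export requests) or when admission was disabled.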
+ callAdmittedWorkDoneOnKVAdmissionQ bool
+}
+
+// MakeController returns a Controller. The three admission queue/coordinator
+// parameters must all be nil or all be non-nil.
+func MakeController(
+ kvAdmissionQ *admission.WorkQueue,
+ elasticCPUWorkQueue *admission.ElasticCPUWorkQueue,
+ storeGrantCoords *admission.StoreGrantCoordinators,
+ settings *cluster.Settings,
+) Controller {
+ return &controllerImpl{
+ kvAdmissionQ: kvAdmissionQ,
+ storeGrantCoords: storeGrantCoords,
+ elasticCPUWorkQueue: elasticCPUWorkQueue,
+ settings: settings,
+ every: log.Every(10 * time.Second),
+ }
+}
+
+// AdmitKVWork implements the Controller interface.
+//
+// TODO(irfansharif): There's a fair bit happening here and there's no test
+// coverage. Fix that.
+func (n *controllerImpl) AdmitKVWork(
+ ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest,
+) (handle Handle, retErr error) {
+ ah := Handle{tenantID: tenantID}
+ if n.kvAdmissionQ == nil {
+ return ah, nil
+ }
+
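+	// Admin requests, and work whose admission source is OTHER, bypass
+	// admission queueing. Work originating from non-system tenants (SQL pods)
+	// never bypasses admission.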
+ bypassAdmission := ba.IsAdmin()
+ source := ba.AdmissionHeader.Source
+ if !roachpb.IsSystemTenantID(tenantID.ToUint64()) {
+ // Request is from a SQL node.
+ bypassAdmission = false
+ source = roachpb.AdmissionHeader_FROM_SQL
+ }
+ if source == roachpb.AdmissionHeader_OTHER {
+ bypassAdmission = true
+ }
+ // TODO(abaptist): Revisit and deprecate this setting in v23.1.
+ if admission.KVBulkOnlyAdmissionControlEnabled.Get(&n.settings.SV) {
+ if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
+ bypassAdmission = true
+ }
+ }
+ createTime := ba.AdmissionHeader.CreateTime
+ if !bypassAdmission && createTime == 0 {
+ // TODO(sumeer): revisit this for multi-tenant. Specifically, the SQL use
+ // of zero CreateTime needs to be revisited. It should use high priority.
+ createTime = timeutil.Now().UnixNano()
+ }
+ admissionInfo := admission.WorkInfo{
+ TenantID: tenantID,
+ Priority: admissionpb.WorkPriority(ba.AdmissionHeader.Priority),
+ CreateTime: createTime,
+ BypassAdmission: bypassAdmission,
+ }
+
+ admissionEnabled := true
+ // Don't subject HeartbeatTxnRequest to the storeAdmissionQ. Even though
+ // it would bypass admission, it would consume a slot. When writes are
+ // throttled, we start generating more txn heartbeats, which then consume
+ // all the slots, causing no useful work to happen. We do want useful work
+	// to continue even when throttling since there is often a significant
+	// number of tokens available.
+ if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() {
+ storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID))
+ if storeAdmissionQ != nil {
+ storeWorkHandle, err := storeAdmissionQ.Admit(
+ ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo})
+ if err != nil {
+ return Handle{}, err
+ }
+ admissionEnabled = storeWorkHandle.AdmissionEnabled()
+ if admissionEnabled {
+ defer func() {
+ if retErr != nil {
+ // No bytes were written.
+ _ = storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, admission.StoreWorkDoneInfo{})
+ }
+ }()
+ ah.storeAdmissionQ, ah.storeWorkHandle = storeAdmissionQ, storeWorkHandle
+ }
+ }
+ }
+ if admissionEnabled {
+ if ba.IsSingleExportRequest() {
+ // Backups generate batches with single export requests, which we
+ // admit through the elastic CPU work queue. We grant this
+ // CPU-intensive work a set amount of CPU time and expect it to
+ // terminate (cooperatively) once it exceeds its grant. The amount
+ // disbursed is 100ms, which we've experimentally found to be long
+ // enough to do enough useful work per-request while not causing too
+ // much in the way of scheduling delays on individual cores. Within
+ // admission control we have machinery that observes scheduling
+ // latencies periodically and reduces the total amount of CPU time
+ // handed out through this mechanism, as a way to provide latency
+ // isolation to non-elastic ("latency sensitive") work running on
+ // the same machine.
+ elasticWorkHandle, err := n.elasticCPUWorkQueue.Admit(
+ ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo,
+ )
+ if err != nil {
+ return Handle{}, err
+ }
+ ah.ElasticCPUWorkHandle = elasticWorkHandle
+ defer func() {
+ if retErr != nil {
+ // No elastic work was done.
+ n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle)
+ }
+ }()
+ } else {
+ callAdmittedWorkDoneOnKVAdmissionQ, err := n.kvAdmissionQ.Admit(ctx, admissionInfo)
+ if err != nil {
+ return Handle{}, err
+ }
+ ah.callAdmittedWorkDoneOnKVAdmissionQ = callAdmittedWorkDoneOnKVAdmissionQ
+ }
+ }
+ return ah, nil
+}
+
+// AdmittedKVWorkDone implements the Controller interface.
+func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) {
+ n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle)
+ if ah.callAdmittedWorkDoneOnKVAdmissionQ {
+ n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID)
+ }
+ if ah.storeAdmissionQ != nil {
+ var doneInfo admission.StoreWorkDoneInfo
+ if writeBytes != nil {
+ doneInfo = admission.StoreWorkDoneInfo(*writeBytes)
+ }
+ err := ah.storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, doneInfo)
+ if err != nil {
+ // This shouldn't be happening.
+ if buildutil.CrdbTestBuild {
+ log.Fatalf(context.Background(), "%s", errors.WithAssertionFailure(err))
+ }
+ if n.every.ShouldLog() {
+ log.Errorf(context.Background(), "%s", err)
+ }
+ }
+ }
+}
+
+// AdmitRangefeedRequest implements the Controller interface.
+func (n *controllerImpl) AdmitRangefeedRequest(
+ tenantID roachpb.TenantID, request *roachpb.RangeFeedRequest,
+) *Pacer {
+ if !rangefeedCatchupScanElasticControlEnabled.Get(&n.settings.SV) {
+ return nil
+ }
+
+ return &Pacer{
+ unit: elasticCPUDurationPerRangefeedScanUnit.Get(&n.settings.SV),
+ wi: admission.WorkInfo{
+ TenantID: tenantID,
+ Priority: admissionpb.WorkPriority(request.AdmissionHeader.Priority),
+ CreateTime: request.AdmissionHeader.CreateTime,
+ BypassAdmission: false,
+ },
+ wq: n.elasticCPUWorkQueue,
+ }
+}
+
+// SetTenantWeightProvider implements the Controller interface.
+func (n *controllerImpl) SetTenantWeightProvider(
+ provider TenantWeightProvider, stopper *stop.Stopper,
+) {
+ // TODO(irfansharif): Use a stopper here instead.
+ go func() {
+ const weightCalculationPeriod = 10 * time.Minute
+ ticker := time.NewTicker(weightCalculationPeriod)
+ // Used for short-circuiting the weights calculation if all weights are
+ // disabled.
+ allWeightsDisabled := false
+ for {
+ select {
+ case <-ticker.C:
+ kvDisabled := !admission.KVTenantWeightsEnabled.Get(&n.settings.SV)
+ kvStoresDisabled := !admission.KVStoresTenantWeightsEnabled.Get(&n.settings.SV)
+ if allWeightsDisabled && kvDisabled && kvStoresDisabled {
+ // Have already transitioned to disabled, so noop.
+ continue
+ }
+ weights := provider.GetTenantWeights()
+ if kvDisabled {
+ weights.Node = nil
+ }
+ n.kvAdmissionQ.SetTenantWeights(weights.Node)
+ n.elasticCPUWorkQueue.SetTenantWeights(weights.Node)
+
+ for _, storeWeights := range weights.Stores {
+ q := n.storeGrantCoords.TryGetQueueForStore(int32(storeWeights.StoreID))
+ if q != nil {
+ if kvStoresDisabled {
+ storeWeights.Weights = nil
+ }
+ q.SetTenantWeights(storeWeights.Weights)
+ }
+ }
+ allWeightsDisabled = kvDisabled && kvStoresDisabled
+ case <-stopper.ShouldQuiesce():
+ ticker.Stop()
+ return
+ }
+ }
+ }()
+}
+
+// SnapshotIngested implements the Controller interface.
+func (n *controllerImpl) SnapshotIngested(
+ storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats,
+) {
+ storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID))
+ if storeAdmissionQ == nil {
+ return
+ }
+ storeAdmissionQ.StatsToIgnore(ingestStats)
+}
+
+// FollowerStoreWriteBytes implements the Controller interface.
+func (n *controllerImpl) FollowerStoreWriteBytes(
+ storeID roachpb.StoreID, followerWriteBytes FollowerStoreWriteBytes,
+) {
+ if followerWriteBytes.WriteBytes == 0 && followerWriteBytes.IngestedBytes == 0 {
+ return
+ }
+ storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID))
+ if storeAdmissionQ == nil {
+ return
+ }
+ storeAdmissionQ.BypassedWorkDone(
+ followerWriteBytes.NumEntries, followerWriteBytes.StoreWorkDoneInfo)
+}
+
+// ProvisionedBandwidth sets the value of the provisioned
+// bandwidth for each store in the cluster.
+var ProvisionedBandwidth = settings.RegisterByteSizeSetting(
+ settings.SystemOnly, "kvadmission.store.provisioned_bandwidth",
+ "if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+
+ "for each store. It can be over-ridden on a per-store basis using the --store flag",
+ 0).WithPublic()
+
+// FollowerStoreWriteBytes captures stats about writes done to a store by a
+// replica that is not the leaseholder. These are used for admission control.
+type FollowerStoreWriteBytes struct {
+ NumEntries int64
+ admission.StoreWorkDoneInfo
+}
+
+// Merge follower store write statistics using the given data.
+func (f *FollowerStoreWriteBytes) Merge(from FollowerStoreWriteBytes) {
+ f.NumEntries += from.NumEntries
+ f.WriteBytes += from.WriteBytes
+ f.IngestedBytes += from.IngestedBytes
+}
+
+// StoreWriteBytes aliases admission.StoreWorkDoneInfo, since the notion of
+// "work is done" is specific to admission control and doesn't need to leak
+// everywhere.
+type StoreWriteBytes admission.StoreWorkDoneInfo
+
+var storeWriteBytesPool = sync.Pool{
+ New: func() interface{} { return &StoreWriteBytes{} },
+}
+
+// NewStoreWriteBytes constructs a new StoreWriteBytes.
+func NewStoreWriteBytes() *StoreWriteBytes {
+ wb := storeWriteBytesPool.Get().(*StoreWriteBytes)
+ *wb = StoreWriteBytes{}
+ return wb
+}
+
+// Release returns the *StoreWriteBytes to the pool.
+func (wb *StoreWriteBytes) Release() {
+ if wb == nil {
+ return
+ }
+ storeWriteBytesPool.Put(wb)
+}
+
+// Pacer is used in tight, CPU-bound loops for non-preemptible elastic work.
+// Callers are expected to invoke Pace() every loop iteration and Close() once
+// done. Internally this type integrates with the elastic CPU work queue,
+// acquiring tokens for the CPU work being done and blocking if tokens are
+// unavailable. This allows for a form of cooperative scheduling with elastic
+// CPU granters.
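+//
+// A nil Pacer is a valid no-op, so callers may invoke Pace and Close
+// unconditionally. A rough usage sketch (hasMoreWork and doWorkUnit are
+// hypothetical stand-ins for the caller's loop):
+//
+//	defer pacer.Close()
+//	for hasMoreWork() {
+//		if err := pacer.Pace(ctx); err != nil {
+//			// Log and continue; failing to pace doesn't have to fail the work.
+//		}
+//		doWorkUnit()
+//	}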
+type Pacer struct {
+ unit time.Duration
+ wi admission.WorkInfo
+ wq *admission.ElasticCPUWorkQueue
+
+ cur *admission.ElasticCPUWorkHandle
+}
+
+// Pace is part of the Pacer interface.
+func (p *Pacer) Pace(ctx context.Context) error {
+ if p == nil {
+ return nil
+ }
+
+ if overLimit, _ := p.cur.OverLimit(); overLimit {
+ p.wq.AdmittedWorkDone(p.cur)
+ p.cur = nil
+ }
+
+ if p.cur == nil {
+ handle, err := p.wq.Admit(ctx, p.unit, p.wi)
+ if err != nil {
+ return err
+ }
+ p.cur = handle
+ }
+ return nil
+}
+
+// Close is part of the Pacer interface.
+func (p *Pacer) Close() {
+ if p == nil || p.cur == nil {
+ return
+ }
+
+ p.wq.AdmittedWorkDone(p.cur)
+ p.cur = nil
+}
diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go
index 7bbe1e5c867a..f0b9fd38bd41 100644
--- a/pkg/kv/kvserver/mvcc_gc_queue.go
+++ b/pkg/kv/kvserver/mvcc_gc_queue.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -506,7 +507,7 @@ func suspectedFullRangeDeletion(ms enginepb.MVCCStats) bool {
type replicaGCer struct {
repl *Replica
count int32 // update atomically
- admissionController KVAdmissionController
+ admissionController kvadmission.Controller
storeID roachpb.StoreID
}
@@ -532,7 +533,7 @@ func (r *replicaGCer) send(ctx context.Context, req roachpb.GCRequest) error {
ba.Add(&req)
// Since we are talking directly to the replica, we need to explicitly do
// admission control here, as we are bypassing server.Node.
- var admissionHandle AdmissionHandle
+ var admissionHandle kvadmission.Handle
if r.admissionController != nil {
pri := admissionpb.WorkPriority(gc.AdmissionPriority.Get(&r.repl.ClusterSettings().SV))
ba.AdmissionHeader = roachpb.AdmissionHeader{
diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel
index 69c07b22f184..674602f4682f 100644
--- a/pkg/kv/kvserver/rangefeed/BUILD.bazel
+++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel
@@ -17,6 +17,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
+ "//pkg/kv/kvserver/kvadmission",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/storage",
diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan.go b/pkg/kv/kvserver/rangefeed/catchup_scan.go
index 242a03537172..06a2fd974cec 100644
--- a/pkg/kv/kvserver/rangefeed/catchup_scan.go
+++ b/pkg/kv/kvserver/rangefeed/catchup_scan.go
@@ -12,12 +12,16 @@ package rangefeed
import (
"bytes"
+ "context"
+ "time"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
)
@@ -62,6 +66,7 @@ type CatchUpIterator struct {
close func()
span roachpb.Span
startTime hlc.Timestamp // exclusive
+ pacer *kvadmission.Pacer
}
// NewCatchUpIterator returns a CatchUpIterator for the given Reader over the
@@ -70,7 +75,11 @@ type CatchUpIterator struct {
// NB: startTime is exclusive, i.e. the first possible event will be emitted at
// Timestamp.Next().
func NewCatchUpIterator(
- reader storage.Reader, span roachpb.Span, startTime hlc.Timestamp, closer func(),
+ reader storage.Reader,
+ span roachpb.Span,
+ startTime hlc.Timestamp,
+ closer func(),
+ pacer *kvadmission.Pacer,
) *CatchUpIterator {
return &CatchUpIterator{
simpleCatchupIter: storage.NewMVCCIncrementalIterator(reader,
@@ -89,6 +98,7 @@ func NewCatchUpIterator(
close: closer,
span: span,
startTime: startTime,
+ pacer: pacer,
}
}
@@ -96,6 +106,7 @@ func NewCatchUpIterator(
// callback.
func (i *CatchUpIterator) Close() {
i.simpleCatchupIter.Close()
+ i.pacer.Close()
if i.close != nil {
i.close()
}
@@ -117,7 +128,9 @@ type outputEventFn func(e *roachpb.RangeFeedEvent) error
// For example, with MVCC range tombstones [a-f)@5 and [a-f)@3 overlapping point
// keys a@6, a@4, and b@2, the emitted order is [a-f)@3,[a-f)@5,a@4,a@6,b@2 because
// the start key "a" is ordered before all of the timestamped point keys.
-func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) error {
+func (i *CatchUpIterator) CatchUpScan(
+ ctx context.Context, outputFn outputEventFn, withDiff bool,
+) error {
var a bufalloc.ByteAllocator
// MVCCIterator will encounter historical values for each key in
// reverse-chronological order. To output in chronological order, store
@@ -143,6 +156,8 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err
var lastKey roachpb.Key
var meta enginepb.MVCCMetadata
i.SeekGE(storage.MVCCKey{Key: i.span.Key})
+
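+	// Pacing failures below are logged at most once per interval so that a
+	// persistent admission error doesn't flood the logs.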
+ every := log.Every(100 * time.Millisecond)
for {
if ok, err := i.Valid(); err != nil {
return err
@@ -150,6 +165,14 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err
break
}
+ if err := i.pacer.Pace(ctx); err != nil {
+		// We're unable to pace things automatically -- shout loudly, but only
+		// semi-infrequently, and don't fail the rangefeed itself.
+ if every.ShouldLog() {
+ log.Errorf(ctx, "automatic pacing: %v", err)
+ }
+ }
+
// Emit any new MVCC range tombstones when their start key is encountered.
// Range keys can currently only be MVCC range tombstones.
//
diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
index 103db0386136..30684c509c09 100644
--- a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
+++ b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
@@ -45,13 +45,14 @@ func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) (numE
EndKey: endKey,
}
+ ctx := context.Background()
b.ResetTimer()
for i := 0; i < b.N; i++ {
func() {
- iter := rangefeed.NewCatchUpIterator(eng, span, opts.ts, nil)
+ iter := rangefeed.NewCatchUpIterator(eng, span, opts.ts, nil, nil)
defer iter.Close()
counter := 0
- err := iter.CatchUpScan(func(*roachpb.RangeFeedEvent) error {
+ err := iter.CatchUpScan(ctx, func(*roachpb.RangeFeedEvent) error {
counter++
return nil
}, opts.withDiff)
diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_test.go
index fde52ff241e4..f4ce29a1473f 100644
--- a/pkg/kv/kvserver/rangefeed/catchup_scan_test.go
+++ b/pkg/kv/kvserver/rangefeed/catchup_scan_test.go
@@ -111,11 +111,11 @@ func TestCatchupScan(t *testing.T) {
}
testutils.RunTrueAndFalse(t, "withDiff", func(t *testing.T, withDiff bool) {
span := roachpb.Span{Key: testKey1, EndKey: roachpb.KeyMax}
- iter := NewCatchUpIterator(eng, span, ts1, nil)
+ iter := NewCatchUpIterator(eng, span, ts1, nil, nil)
defer iter.Close()
var events []roachpb.RangeFeedValue
// ts1 here is exclusive, so we do not want the versions at ts1.
- require.NoError(t, iter.CatchUpScan(func(e *roachpb.RangeFeedEvent) error {
+ require.NoError(t, iter.CatchUpScan(ctx, func(e *roachpb.RangeFeedEvent) error {
events = append(events, *e.Val)
return nil
}, withDiff))
@@ -154,10 +154,10 @@ func TestCatchupScanInlineError(t *testing.T) {
// Run a catchup scan across the span and watch it error.
span := roachpb.Span{Key: keys.LocalMax, EndKey: keys.MaxKey}
- iter := NewCatchUpIterator(eng, span, hlc.Timestamp{}, nil)
+ iter := NewCatchUpIterator(eng, span, hlc.Timestamp{}, nil, nil)
defer iter.Close()
- err := iter.CatchUpScan(nil, false)
+ err := iter.CatchUpScan(ctx, nil, false)
require.Error(t, err)
require.Contains(t, err.Error(), "unexpected inline value")
}
@@ -194,11 +194,11 @@ func TestCatchupScanSeesOldIntent(t *testing.T) {
// Run a catchup scan across the span and watch it succeed.
span := roachpb.Span{Key: keys.LocalMax, EndKey: keys.MaxKey}
- iter := NewCatchUpIterator(eng, span, tsCutoff, nil)
+ iter := NewCatchUpIterator(eng, span, tsCutoff, nil, nil)
defer iter.Close()
keys := map[string]struct{}{}
- require.NoError(t, iter.CatchUpScan(func(e *roachpb.RangeFeedEvent) error {
+ require.NoError(t, iter.CatchUpScan(ctx, func(e *roachpb.RangeFeedEvent) error {
keys[string(e.Val.Key)] = struct{}{}
return nil
}, true /* withDiff */))
diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go
index b47256642ae8..db255c91f3ea 100644
--- a/pkg/kv/kvserver/rangefeed/registry.go
+++ b/pkg/kv/kvserver/rangefeed/registry.go
@@ -300,7 +300,7 @@ func (r *registration) disconnect(pErr *roachpb.Error) {
// have been emitted.
func (r *registration) outputLoop(ctx context.Context) error {
// If the registration has a catch-up scan, run it.
- if err := r.maybeRunCatchUpScan(); err != nil {
+ if err := r.maybeRunCatchUpScan(ctx); err != nil {
err = errors.Wrap(err, "catch-up scan failed")
log.Errorf(ctx, "%v", err)
return err
@@ -372,7 +372,7 @@ func (r *registration) drainAllocations(ctx context.Context) {
//
// If the registration does not have a catchUpIteratorConstructor, this method
// is a no-op.
-func (r *registration) maybeRunCatchUpScan() error {
+func (r *registration) maybeRunCatchUpScan(ctx context.Context) error {
catchUpIter := r.detachCatchUpIter()
if catchUpIter == nil {
return nil
@@ -383,7 +383,7 @@ func (r *registration) maybeRunCatchUpScan() error {
r.metrics.RangeFeedCatchUpScanNanos.Inc(timeutil.Since(start).Nanoseconds())
}()
- return catchUpIter.CatchUpScan(r.stream.Send, r.withDiff)
+ return catchUpIter.CatchUpScan(ctx, r.stream.Send, r.withDiff)
}
// ID implements interval.Interface.
diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go
index 507ba98d2c55..43a148e41eca 100644
--- a/pkg/kv/kvserver/rangefeed/registry_test.go
+++ b/pkg/kv/kvserver/rangefeed/registry_test.go
@@ -278,7 +278,7 @@ func TestRegistrationCatchUpScan(t *testing.T) {
}, hlc.Timestamp{WallTime: 4}, iter, true /* withDiff */)
require.Zero(t, r.metrics.RangeFeedCatchUpScanNanos.Count())
- require.NoError(t, r.maybeRunCatchUpScan())
+ require.NoError(t, r.maybeRunCatchUpScan(context.Background()))
require.True(t, iter.closed)
require.NotZero(t, r.metrics.RangeFeedCatchUpScanNanos.Count())
diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go
index ff9ea1668f7c..06fc45041a83 100644
--- a/pkg/kv/kvserver/replica_application_state_machine.go
+++ b/pkg/kv/kvserver/replica_application_state_machine.go
@@ -17,12 +17,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
- "github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -56,20 +56,7 @@ type applyCommittedEntriesStats struct {
stateAssertions int
numEmptyEntries int
numConfChangeEntries int
- followerStoreWriteBytes followerStoreWriteBytes
-}
-
-// followerStoreWriteBytes captures stats about writes done to a store by a
-// replica that is not the leaseholder. These are used for admission control.
-type followerStoreWriteBytes struct {
- numEntries int64
- admission.StoreWorkDoneInfo
-}
-
-func (f *followerStoreWriteBytes) merge(from followerStoreWriteBytes) {
- f.numEntries += from.numEntries
- f.WriteBytes += from.WriteBytes
- f.IngestedBytes += from.IngestedBytes
+ followerStoreWriteBytes kvadmission.FollowerStoreWriteBytes
}
// nonDeterministicFailure is an error type that indicates that a state machine
@@ -449,7 +436,7 @@ type replicaAppBatch struct {
emptyEntries int
mutations int
start time.Time
- followerStoreWriteBytes followerStoreWriteBytes
+ followerStoreWriteBytes kvadmission.FollowerStoreWriteBytes
// Reused by addAppliedStateKeyToBatch to avoid heap allocations.
asAlloc enginepb.RangeAppliedState
@@ -550,7 +537,7 @@ func (b *replicaAppBatch) Stage(
// nils the AddSSTable field.
if !cmd.IsLocal() {
writeBytes, ingestedBytes := cmd.getStoreWriteByteSizes()
- b.followerStoreWriteBytes.numEntries++
+ b.followerStoreWriteBytes.NumEntries++
b.followerStoreWriteBytes.WriteBytes += writeBytes
b.followerStoreWriteBytes.IngestedBytes += ingestedBytes
}
@@ -1063,7 +1050,7 @@ func (b *replicaAppBatch) recordStatsOnCommit() {
b.sm.stats.entriesProcessedBytes += b.entryBytes
b.sm.stats.numEmptyEntries += b.emptyEntries
b.sm.stats.batchesProcessed++
- b.sm.stats.followerStoreWriteBytes.merge(b.followerStoreWriteBytes)
+ b.sm.stats.followerStoreWriteBytes.Merge(b.followerStoreWriteBytes)
elapsed := timeutil.Since(b.start)
b.r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds())
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 50e556cdfdee..ecbbbd1b93fa 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
@@ -106,7 +107,13 @@ func (r *Replica) evalAndPropose(
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
tok TrackedRequestToken,
-) (chan proposalResult, func(), kvserverbase.CmdIDKey, *StoreWriteBytes, *roachpb.Error) {
+) (
+ chan proposalResult,
+ func(),
+ kvserverbase.CmdIDKey,
+ *kvadmission.StoreWriteBytes,
+ *roachpb.Error,
+) {
defer tok.DoneIfNotMoved(ctx)
idKey := makeIDKey()
proposal, pErr := r.requestToProposal(ctx, idKey, ba, g, st, ui)
@@ -167,7 +174,7 @@ func (r *Replica) evalAndPropose(
// typical lag in consensus is expected to be small compared to the time
// granularity of admission control doing token and size estimation (which
// is 15s). Also, admission control corrects for gaps in reporting.
- writeBytes := newStoreWriteBytes()
+ writeBytes := kvadmission.NewStoreWriteBytes()
if proposal.command.WriteBatch != nil {
writeBytes.WriteBytes = int64(len(proposal.command.WriteBatch.Data))
}
@@ -1051,7 +1058,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
return stats, getNonDeterministicFailureExplanation(err), err
}
if r.store.cfg.KVAdmissionController != nil &&
- stats.apply.followerStoreWriteBytes.numEntries > 0 {
+ stats.apply.followerStoreWriteBytes.NumEntries > 0 {
r.store.cfg.KVAdmissionController.FollowerStoreWriteBytes(
r.store.StoreID(), stats.apply.followerStoreWriteBytes)
}
diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index 170614c4c0ed..9945ba0a548a 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -131,16 +132,19 @@ func (tp *rangefeedTxnPusher) ResolveIntents(
// RangeFeed registers a rangefeed over the specified span. It sends updates to
// the provided stream and returns with an optional error when the rangefeed is
-// complete. The provided ConcurrentRequestLimiter is used to limit the number
-// of rangefeeds using catch-up iterators at the same time.
+// complete. The surrounding store's ConcurrentRequestLimiter is used to limit
+// the number of rangefeeds using catch-up iterators at the same time.
func (r *Replica) RangeFeed(
- args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink,
+ args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, pacer *kvadmission.Pacer,
) *roachpb.Error {
- return r.rangeFeedWithRangeID(r.RangeID, args, stream)
+ return r.rangeFeedWithRangeID(r.RangeID, args, stream, pacer)
}
func (r *Replica) rangeFeedWithRangeID(
- _forStacks roachpb.RangeID, args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink,
+ _forStacks roachpb.RangeID,
+ args *roachpb.RangeFeedRequest,
+ stream roachpb.RangeFeedEventSink,
+ pacer *kvadmission.Pacer,
) *roachpb.Error {
if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) {
return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s",
@@ -222,7 +226,7 @@ func (r *Replica) rangeFeedWithRangeID(
// Assert that we still hold the raftMu when this is called to ensure
// that the catchUpIter reads from the current snapshot.
r.raftMu.AssertHeld()
- return rangefeed.NewCatchUpIterator(r.Engine(), span, startTime, iterSemRelease)
+ return rangefeed.NewCatchUpIterator(r.Engine(), span, startTime, iterSemRelease, pacer)
}
}
p := r.registerWithRangefeedRaftMuLocked(
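The catch-up iterator now receives a *kvadmission.Pacer, which the store constructs via AdmitRangefeedRequest (see the store.go hunk further below). The Pacer's method set is not part of this diff, so the sketch below uses a hypothetical Pace method purely to show where pacing would slot into a catch-up scan loop; none of these names are the real kvadmission API.

package main

import (
	"context"
	"fmt"
	"time"
)

// pacer is a hypothetical stand-in for *kvadmission.Pacer.
type pacer struct {
	every int
	n     int
}

// Pace periodically blocks (here: a short sleep) so a long catch-up scan
// yields to admission control instead of running unchecked.
func (p *pacer) Pace(ctx context.Context) error {
	p.n++
	if p.n%p.every == 0 {
		select {
		case <-time.After(time.Millisecond):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	ctx := context.Background()
	p := &pacer{every: 100}
	for i := 0; i < 1000; i++ { // stand-in for iterating catch-up KVs
		if err := p.Pace(ctx); err != nil {
			fmt.Println("scan interrupted:", err)
			return
		}
	}
	fmt.Println("catch-up scan paced to completion")
}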
diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go
index 7f2830853047..f63b0666cc82 100644
--- a/pkg/kv/kvserver/replica_read.go
+++ b/pkg/kv/kvserver/replica_read.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
@@ -38,7 +39,12 @@ import (
// reflect the key spans that it read.
func (r *Replica) executeReadOnlyBatch(
ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard,
-) (br *roachpb.BatchResponse, _ *concurrency.Guard, _ *StoreWriteBytes, pErr *roachpb.Error) {
+) (
+ br *roachpb.BatchResponse,
+ _ *concurrency.Guard,
+ _ *kvadmission.StoreWriteBytes,
+ pErr *roachpb.Error,
+) {
r.readOnlyCmdMu.RLock()
defer r.readOnlyCmdMu.RUnlock()
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index edca0ad25be4..70796469ce40 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
@@ -120,7 +121,7 @@ func (r *Replica) Send(
// *StoreWriteBytes return value.
func (r *Replica) SendWithWriteBytes(
ctx context.Context, ba *roachpb.BatchRequest,
-) (*roachpb.BatchResponse, *StoreWriteBytes, *roachpb.Error) {
+) (*roachpb.BatchResponse, *kvadmission.StoreWriteBytes, *roachpb.Error) {
if r.store.cfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels {
defer pprof.SetGoroutineLabels(ctx)
// Note: the defer statement captured the previous context.
@@ -167,7 +168,7 @@ func (r *Replica) SendWithWriteBytes(
// Differentiate between read-write, read-only, and admin.
var br *roachpb.BatchResponse
var pErr *roachpb.Error
- var writeBytes *StoreWriteBytes
+ var writeBytes *kvadmission.StoreWriteBytes
if isReadOnly {
log.Event(ctx, "read-only path")
fn := (*Replica).executeReadOnlyBatch
@@ -372,7 +373,7 @@ func (r *Replica) maybeAddRangeInfoToResponse(
// concurrency guard back to the caller.
type batchExecutionFn func(
*Replica, context.Context, *roachpb.BatchRequest, *concurrency.Guard,
-) (*roachpb.BatchResponse, *concurrency.Guard, *StoreWriteBytes, *roachpb.Error)
+) (*roachpb.BatchResponse, *concurrency.Guard, *kvadmission.StoreWriteBytes, *roachpb.Error)
var _ batchExecutionFn = (*Replica).executeWriteBatch
var _ batchExecutionFn = (*Replica).executeReadOnlyBatch
@@ -393,7 +394,7 @@ var _ batchExecutionFn = (*Replica).executeReadOnlyBatch
// handles the process of retrying batch execution after addressing the error.
func (r *Replica) executeBatchWithConcurrencyRetries(
ctx context.Context, ba *roachpb.BatchRequest, fn batchExecutionFn,
-) (br *roachpb.BatchResponse, writeBytes *StoreWriteBytes, pErr *roachpb.Error) {
+) (br *roachpb.BatchResponse, writeBytes *kvadmission.StoreWriteBytes, pErr *roachpb.Error) {
// Try to execute command; exit retry loop on success.
var latchSpans, lockSpans *spanset.SpanSet
var requestEvalKind concurrency.RequestEvalKind
@@ -1046,7 +1047,7 @@ func (r *Replica) getBatchRequestQPS(ctx context.Context, ba *roachpb.BatchReque
// recordRequestWriteBytes records the write bytes from a replica batch
// request.
-func (r *Replica) recordRequestWriteBytes(writeBytes *StoreWriteBytes) {
+func (r *Replica) recordRequestWriteBytes(writeBytes *kvadmission.StoreWriteBytes) {
if writeBytes == nil {
return
}
diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go
index 3924b896a170..7acac0ff25fb 100644
--- a/pkg/kv/kvserver/replica_write.go
+++ b/pkg/kv/kvserver/replica_write.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
@@ -77,7 +78,12 @@ var migrateApplicationTimeout = settings.RegisterDurationSetting(
// call to applyTimestampCache).
func (r *Replica) executeWriteBatch(
ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard,
-) (br *roachpb.BatchResponse, _ *concurrency.Guard, _ *StoreWriteBytes, pErr *roachpb.Error) {
+) (
+ br *roachpb.BatchResponse,
+ _ *concurrency.Guard,
+ _ *kvadmission.StoreWriteBytes,
+ pErr *roachpb.Error,
+) {
startTime := timeutil.Now()
// Even though we're not a read-only operation by definition, we have to
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index fd8d8171f8a6..080ba7406c92 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/idalloc"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
@@ -61,7 +62,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
- "github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
@@ -84,7 +84,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
- "github.com/cockroachdb/pebble"
"github.com/cockroachdb/redact"
prometheusgo "github.com/prometheus/client_model/go"
"go.etcd.io/etcd/raft/v3"
@@ -1105,7 +1104,7 @@ type StoreConfig struct {
SpanConfigSubscriber spanconfig.KVSubscriber
// KVAdmissionController is an optional field used for admission control.
- KVAdmissionController KVAdmissionController
+ KVAdmissionController kvadmission.Controller
// SchedulerLatencyListener listens in on scheduling latencies, information
// that's then used to adjust various admission control components (like how
@@ -3174,7 +3173,10 @@ func (s *Store) RangeFeed(
// one here.
return roachpb.NewError(roachpb.NewRangeNotFoundError(args.RangeID, s.StoreID()))
}
- return repl.RangeFeed(args, stream)
+
+ tenID, _ := repl.TenantID()
+ pacer := s.cfg.KVAdmissionController.AdmitRangefeedRequest(tenID, args)
+ return repl.RangeFeed(args, stream, pacer)
}
// updateReplicationGauges counts a number of simple replication statistics for
@@ -3887,329 +3889,3 @@ func min(a, b int) int {
}
return b
}
-
-// elasticCPUDurationPerExportRequest controls how many CPU tokens are allotted
-// for each export request.
-var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting(
- settings.SystemOnly,
- "kvadmission.elastic_cpu.duration_per_export_request",
- "controls how many CPU tokens are allotted for each export request",
- admission.MaxElasticCPUDuration,
- func(duration time.Duration) error {
- if duration < admission.MinElasticCPUDuration {
- return fmt.Errorf("minimum CPU duration allowed per export request is %s, got %s",
- admission.MinElasticCPUDuration, duration)
- }
- if duration > admission.MaxElasticCPUDuration {
- return fmt.Errorf("maximum CPU duration allowed per export request is %s, got %s",
- admission.MaxElasticCPUDuration, duration)
- }
- return nil
- },
-)
-
-// KVAdmissionController provides admission control for the KV layer.
-type KVAdmissionController interface {
- // AdmitKVWork must be called before performing KV work.
- // BatchRequest.AdmissionHeader and BatchRequest.Replica.StoreID must be
- // populated for admission to work correctly. If err is non-nil, the
- // returned handle can be ignored. If err is nil, AdmittedKVWorkDone must be
- // called after the KV work is done executing.
- AdmitKVWork(
- ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest,
- ) (AdmissionHandle, error)
- // AdmittedKVWorkDone is called after the admitted KV work is done
- // executing.
- AdmittedKVWorkDone(AdmissionHandle, *StoreWriteBytes)
- // SetTenantWeightProvider is used to set the provider that will be
- // periodically polled for weights. The stopper should be used to terminate
- // the periodic polling.
- SetTenantWeightProvider(provider TenantWeightProvider, stopper *stop.Stopper)
- // SnapshotIngested informs admission control about a range snapshot
- // ingestion.
- SnapshotIngested(storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats)
- // FollowerStoreWriteBytes informs admission control about writes
- // replicated to a raft follower, that have not been subject to admission
- // control.
- FollowerStoreWriteBytes(storeID roachpb.StoreID, followerWriteBytes followerStoreWriteBytes)
-}
-
-// TenantWeightProvider can be periodically asked to provide the tenant
-// weights.
-type TenantWeightProvider interface {
- GetTenantWeights() TenantWeights
-}
-
-// TenantWeights contains the various tenant weights.
-type TenantWeights struct {
- // Node is the node level tenant ID => weight.
- Node map[uint64]uint32
- // Stores contains the per-store tenant weights.
- Stores []TenantWeightsForStore
-}
-
-// TenantWeightsForStore contains the tenant weights for a store.
-type TenantWeightsForStore struct {
- roachpb.StoreID
- // Weights is tenant ID => weight.
- Weights map[uint64]uint32
-}
-
-// KVAdmissionControllerImpl implements KVAdmissionController interface.
-type KVAdmissionControllerImpl struct {
- // Admission control queues and coordinators. All three should be nil or
- // non-nil.
- kvAdmissionQ *admission.WorkQueue
- storeGrantCoords *admission.StoreGrantCoordinators
- elasticCPUWorkQueue *admission.ElasticCPUWorkQueue
- settings *cluster.Settings
- every log.EveryN
-}
-
-var _ KVAdmissionController = &KVAdmissionControllerImpl{}
-
-// AdmissionHandle groups data around some piece admitted work. Depending on the
-// type of work, it holds (a) references to specific work queues, (b) state
-// needed to inform said work queues of what work was done after the fact, and
-// (c) information around how much work a request is allowed to do (used for
-// cooperative scheduling with elastic CPU granters).
-//
-// TODO(irfansharif): Consider moving KVAdmissionController and adjacent types
-// into a kvserver/kvadmission package.
-type AdmissionHandle struct {
- tenantID roachpb.TenantID
- storeAdmissionQ *admission.StoreWorkQueue
- storeWorkHandle admission.StoreWorkHandle
- ElasticCPUWorkHandle *admission.ElasticCPUWorkHandle
-
- callAdmittedWorkDoneOnKVAdmissionQ bool
-}
-
-// MakeKVAdmissionController returns a KVAdmissionController. All three
-// parameters must together be nil or non-nil.
-func MakeKVAdmissionController(
- kvAdmissionQ *admission.WorkQueue,
- elasticCPUWorkQueue *admission.ElasticCPUWorkQueue,
- storeGrantCoords *admission.StoreGrantCoordinators,
- settings *cluster.Settings,
-) KVAdmissionController {
- return &KVAdmissionControllerImpl{
- kvAdmissionQ: kvAdmissionQ,
- storeGrantCoords: storeGrantCoords,
- elasticCPUWorkQueue: elasticCPUWorkQueue,
- settings: settings,
- every: log.Every(10 * time.Second),
- }
-}
-
-// AdmitKVWork implements the KVAdmissionController interface.
-//
-// TODO(irfansharif): There's a fair bit happening here and there's no test
-// coverage. Fix that.
-func (n *KVAdmissionControllerImpl) AdmitKVWork(
- ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest,
-) (handle AdmissionHandle, retErr error) {
- ah := AdmissionHandle{tenantID: tenantID}
- if n.kvAdmissionQ == nil {
- return ah, nil
- }
-
- bypassAdmission := ba.IsAdmin()
- source := ba.AdmissionHeader.Source
- if !roachpb.IsSystemTenantID(tenantID.ToUint64()) {
- // Request is from a SQL node.
- bypassAdmission = false
- source = roachpb.AdmissionHeader_FROM_SQL
- }
- if source == roachpb.AdmissionHeader_OTHER {
- bypassAdmission = true
- }
- // TODO(abaptist): Revisit and deprecate this setting in v23.1.
- if admission.KVBulkOnlyAdmissionControlEnabled.Get(&n.settings.SV) {
- if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
- bypassAdmission = true
- }
- }
-
- createTime := ba.AdmissionHeader.CreateTime
- if !bypassAdmission && createTime == 0 {
- // TODO(sumeer): revisit this for multi-tenant. Specifically, the SQL use
- // of zero CreateTime needs to be revisited. It should use high priority.
- createTime = timeutil.Now().UnixNano()
- }
- admissionInfo := admission.WorkInfo{
- TenantID: tenantID,
- Priority: admissionpb.WorkPriority(ba.AdmissionHeader.Priority),
- CreateTime: createTime,
- BypassAdmission: bypassAdmission,
- }
-
- admissionEnabled := true
- // Don't subject HeartbeatTxnRequest to the storeAdmissionQ. Even though
- // it would bypass admission, it would consume a slot. When writes are
- // throttled, we start generating more txn heartbeats, which then consume
- // all the slots, causing no useful work to happen. We do want useful work
- // to continue even when throttling since there are often significant
- // number of tokens available.
- if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() {
- storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID))
- if storeAdmissionQ != nil {
- storeWorkHandle, err := storeAdmissionQ.Admit(
- ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo})
- if err != nil {
- return AdmissionHandle{}, err
- }
- admissionEnabled = storeWorkHandle.AdmissionEnabled()
- if admissionEnabled {
- defer func() {
- if retErr != nil {
- // No bytes were written.
- _ = storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, admission.StoreWorkDoneInfo{})
- }
- }()
- ah.storeAdmissionQ, ah.storeWorkHandle = storeAdmissionQ, storeWorkHandle
- }
- }
- }
- if admissionEnabled {
- if ba.IsSingleExportRequest() {
- // Backups generate batches with single export requests, which we
- // admit through the elastic CPU work queue. We grant this
- // CPU-intensive work a set amount of CPU time and expect it to
- // terminate (cooperatively) once it exceeds its grant. The amount
- // disbursed is 100ms, which we've experimentally found to be long
- // enough to do enough useful work per-request while not causing too
- // much in the way of scheduling delays on individual cores. Within
- // admission control we have machinery that observes scheduling
- // latencies periodically and reduces the total amount of CPU time
- // handed out through this mechanism, as a way to provide latency
- // isolation to non-elastic ("latency sensitive") work running on
- // the same machine.
- elasticWorkHandle, err := n.elasticCPUWorkQueue.Admit(
- ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo,
- )
- if err != nil {
- return AdmissionHandle{}, err
- }
- ah.ElasticCPUWorkHandle = elasticWorkHandle
- defer func() {
- if retErr != nil {
- // No elastic work was done.
- n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle)
- }
- }()
- } else {
- callAdmittedWorkDoneOnKVAdmissionQ, err := n.kvAdmissionQ.Admit(ctx, admissionInfo)
- if err != nil {
- return AdmissionHandle{}, err
- }
- ah.callAdmittedWorkDoneOnKVAdmissionQ = callAdmittedWorkDoneOnKVAdmissionQ
- }
- }
- return ah, nil
-}
-
-// AdmittedKVWorkDone implements the KVAdmissionController interface.
-func (n *KVAdmissionControllerImpl) AdmittedKVWorkDone(
- ah AdmissionHandle, writeBytes *StoreWriteBytes,
-) {
- n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle)
- if ah.callAdmittedWorkDoneOnKVAdmissionQ {
- n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID)
- }
- if ah.storeAdmissionQ != nil {
- var doneInfo admission.StoreWorkDoneInfo
- if writeBytes != nil {
- doneInfo = admission.StoreWorkDoneInfo(*writeBytes)
- }
- err := ah.storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, doneInfo)
- if err != nil {
- // This shouldn't be happening.
- if buildutil.CrdbTestBuild {
- log.Fatalf(context.Background(), "%s", errors.WithAssertionFailure(err))
- }
- if n.every.ShouldLog() {
- log.Errorf(context.Background(), "%s", err)
- }
- }
- }
-}
-
-// SetTenantWeightProvider implements the KVAdmissionController interface.
-func (n *KVAdmissionControllerImpl) SetTenantWeightProvider(
- provider TenantWeightProvider, stopper *stop.Stopper,
-) {
- // TODO(irfansharif): Use a stopper here instead.
- go func() {
- const weightCalculationPeriod = 10 * time.Minute
- ticker := time.NewTicker(weightCalculationPeriod)
- // Used for short-circuiting the weights calculation if all weights are
- // disabled.
- allWeightsDisabled := false
- for {
- select {
- case <-ticker.C:
- kvDisabled := !admission.KVTenantWeightsEnabled.Get(&n.settings.SV)
- kvStoresDisabled := !admission.KVStoresTenantWeightsEnabled.Get(&n.settings.SV)
- if allWeightsDisabled && kvDisabled && kvStoresDisabled {
- // Have already transitioned to disabled, so noop.
- continue
- }
- weights := provider.GetTenantWeights()
- if kvDisabled {
- weights.Node = nil
- }
- n.kvAdmissionQ.SetTenantWeights(weights.Node)
- n.elasticCPUWorkQueue.SetTenantWeights(weights.Node)
-
- for _, storeWeights := range weights.Stores {
- q := n.storeGrantCoords.TryGetQueueForStore(int32(storeWeights.StoreID))
- if q != nil {
- if kvStoresDisabled {
- storeWeights.Weights = nil
- }
- q.SetTenantWeights(storeWeights.Weights)
- }
- }
- allWeightsDisabled = kvDisabled && kvStoresDisabled
- case <-stopper.ShouldQuiesce():
- ticker.Stop()
- return
- }
- }
- }()
-}
-
-// SnapshotIngested implements the KVAdmissionController interface.
-func (n *KVAdmissionControllerImpl) SnapshotIngested(
- storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats,
-) {
- storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID))
- if storeAdmissionQ == nil {
- return
- }
- storeAdmissionQ.StatsToIgnore(ingestStats)
-}
-
-// FollowerStoreWriteBytes implements the KVAdmissionController interface.
-func (n *KVAdmissionControllerImpl) FollowerStoreWriteBytes(
- storeID roachpb.StoreID, followerWriteBytes followerStoreWriteBytes,
-) {
- if followerWriteBytes.WriteBytes == 0 && followerWriteBytes.IngestedBytes == 0 {
- return
- }
- storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID))
- if storeAdmissionQ == nil {
- return
- }
- storeAdmissionQ.BypassedWorkDone(
- followerWriteBytes.numEntries, followerWriteBytes.StoreWorkDoneInfo)
-}
-
-// ProvisionedBandwidthForAdmissionControl set a value of the provisioned
-// bandwidth for each store in the cluster.
-var ProvisionedBandwidthForAdmissionControl = settings.RegisterByteSizeSetting(
- settings.SystemOnly, "kv.store.admission.provisioned_bandwidth",
- "if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+
- "for each store. It can be over-ridden on a per-store basis using the --store flag",
- 0).WithPublic()
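The entire surface removed above (AdmitKVWork, AdmittedKVWorkDone, SetTenantWeightProvider, SnapshotIngested, FollowerStoreWriteBytes, and the provisioned-bandwidth setting) moves wholesale into pkg/kv/kvserver/kvadmission, where the interface is now called Controller (see MakeController in node.go below). The admit-then-report contract the callers rely on is sketched here with illustrative local types; the real kvadmission signatures take the tenant ID and BatchRequest, which are omitted for brevity.

package main

import (
	"context"
	"fmt"
)

// handle and storeWriteBytes are stand-ins for the kvadmission handle and
// StoreWriteBytes types.
type handle struct{ admitted bool }
type storeWriteBytes struct{ WriteBytes int64 }

// controller mirrors the admit/done contract of the removed
// KVAdmissionController: admission happens before the KV work, and
// completion is reported afterwards with the bytes written so store token
// accounting can be corrected.
type controller interface {
	AdmitKVWork(ctx context.Context) (handle, error)
	AdmittedKVWorkDone(h handle, wb *storeWriteBytes)
}

type noopController struct{}

func (noopController) AdmitKVWork(context.Context) (handle, error) {
	return handle{admitted: true}, nil
}

func (noopController) AdmittedKVWorkDone(h handle, wb *storeWriteBytes) {
	fmt.Printf("done: admitted=%t writeBytes=%d\n", h.admitted, wb.WriteBytes)
}

func main() {
	var c controller = noopController{}
	h, err := c.AdmitKVWork(context.Background())
	if err != nil {
		return // on admission error the handle is ignored, as documented above
	}
	wb := &storeWriteBytes{WriteBytes: 4096} // produced by executing the batch
	c.AdmittedKVWorkDone(h, wb)
}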
diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go
index 8c1f7fe804b7..9431c5595b86 100644
--- a/pkg/kv/kvserver/store_send.go
+++ b/pkg/kv/kvserver/store_send.go
@@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
@@ -45,7 +46,7 @@ import (
func (s *Store) Send(
ctx context.Context, ba *roachpb.BatchRequest,
) (br *roachpb.BatchResponse, pErr *roachpb.Error) {
- var writeBytes *StoreWriteBytes
+ var writeBytes *kvadmission.StoreWriteBytes
br, writeBytes, pErr = s.SendWithWriteBytes(ctx, ba)
writeBytes.Release()
return br, pErr
@@ -55,7 +56,7 @@ func (s *Store) Send(
// *StoreWriteBytes return value.
func (s *Store) SendWithWriteBytes(
ctx context.Context, ba *roachpb.BatchRequest,
-) (br *roachpb.BatchResponse, writeBytes *StoreWriteBytes, pErr *roachpb.Error) {
+) (br *roachpb.BatchResponse, writeBytes *kvadmission.StoreWriteBytes, pErr *roachpb.Error) {
// Attach any log tags from the store to the context (which normally
// comes from gRPC).
ctx = s.AnnotateCtx(ctx)
diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go
index 94d988a74add..13f9bbff62ae 100644
--- a/pkg/kv/kvserver/stores.go
+++ b/pkg/kv/kvserver/stores.go
@@ -14,7 +14,6 @@ import (
"context"
"fmt"
math "math"
- "sync"
"unsafe"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
@@ -22,9 +21,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
- "github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -187,34 +186,11 @@ func (ls *Stores) Send(
return br, pErr
}
-// StoreWriteBytes aliases admission.StoreWorkDoneInfo, since the notion of
-// "work is done" is specific to admission control and doesn't need to leak
-// everywhere.
-type StoreWriteBytes admission.StoreWorkDoneInfo
-
-var storeWriteBytesPool = sync.Pool{
- New: func() interface{} { return &StoreWriteBytes{} },
-}
-
-func newStoreWriteBytes() *StoreWriteBytes {
- wb := storeWriteBytesPool.Get().(*StoreWriteBytes)
- *wb = StoreWriteBytes{}
- return wb
-}
-
-// Release returns the *StoreWriteBytes to the pool.
-func (wb *StoreWriteBytes) Release() {
- if wb == nil {
- return
- }
- storeWriteBytesPool.Put(wb)
-}
-
// SendWithWriteBytes is the implementation of Send with an additional
// *StoreWriteBytes return value.
func (ls *Stores) SendWithWriteBytes(
ctx context.Context, ba *roachpb.BatchRequest,
-) (*roachpb.BatchResponse, *StoreWriteBytes, *roachpb.Error) {
+) (*roachpb.BatchResponse, *kvadmission.StoreWriteBytes, *roachpb.Error) {
if err := ba.ValidateForEvaluation(); err != nil {
log.Fatalf(ctx, "invalid batch (%s): %s", ba, err)
}
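With StoreWriteBytes and its sync.Pool removed from stores.go, the same pattern presumably reappears in kvadmission: callers throughout this diff still use NewStoreWriteBytes plus a nil-safe Release. A self-contained restatement of the pooling pattern as removed above:

package main

import (
	"fmt"
	"sync"
)

// StoreWriteBytes is an illustrative stand-in for the pooled
// kvadmission.StoreWriteBytes (an alias of admission.StoreWorkDoneInfo).
type StoreWriteBytes struct {
	WriteBytes    int64
	IngestedBytes int64
}

var storeWriteBytesPool = sync.Pool{
	New: func() interface{} { return &StoreWriteBytes{} },
}

// NewStoreWriteBytes hands out a zeroed value from the pool.
func NewStoreWriteBytes() *StoreWriteBytes {
	wb := storeWriteBytesPool.Get().(*StoreWriteBytes)
	*wb = StoreWriteBytes{}
	return wb
}

// Release returns the value to the pool; it is nil-safe so callers such as
// Store.Send can unconditionally call it even when no bytes were recorded.
func (wb *StoreWriteBytes) Release() {
	if wb == nil {
		return
	}
	storeWriteBytesPool.Put(wb)
}

func main() {
	wb := NewStoreWriteBytes()
	wb.WriteBytes = 128
	fmt.Println(wb.WriteBytes)
	wb.Release()
}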
diff --git a/pkg/roachprod/roachprod.go b/pkg/roachprod/roachprod.go
index 9d3c832a472f..197384dd529c 100644
--- a/pkg/roachprod/roachprod.go
+++ b/pkg/roachprod/roachprod.go
@@ -1404,13 +1404,11 @@ func StartGrafana(
if err != nil {
return err
}
- urls, err := GrafanaURL(ctx, l, clusterName, false)
+ url, err := GrafanaURL(ctx, l, clusterName, false)
if err != nil {
return err
}
- for i, url := range urls {
- fmt.Printf("Grafana dashboard %d: %s\n", i, url)
- }
+ fmt.Printf("Grafana dashboard: %s\n", url)
return nil
}
@@ -1437,17 +1435,17 @@ func StopGrafana(ctx context.Context, l *logger.Logger, clusterName string, dump
// GrafanaURL returns a url to the grafana dashboard
func GrafanaURL(
ctx context.Context, l *logger.Logger, clusterName string, openInBrowser bool,
-) ([]string, error) {
+) (string, error) {
if err := LoadClusters(); err != nil {
- return nil, err
+ return "", err
}
c, err := newCluster(l, clusterName)
if err != nil {
- return nil, err
+ return "", err
}
nodes, err := install.ListNodes("all", len(c.VMs))
if err != nil {
- return nil, err
+ return "", err
}
// grafana is assumed to be running on the last node in the target
grafanaNode := install.Nodes{nodes[len(nodes)-1]}
@@ -1458,7 +1456,11 @@ func GrafanaURL(
secure: false,
port: 3000,
}
- return urlGenerator(c, l, grafanaNode, uConfig)
+ urls, err := urlGenerator(c, l, grafanaNode, uConfig)
+ if err != nil {
+ return "", err
+ }
+ return urls[0], nil
}
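GrafanaURL now returns a single URL instead of a slice, and StartGrafana prints just one dashboard line. A minimal sketch of the same narrowing, wrapping a slice-returning generator; the empty-slice guard is an added assumption, since the diff indexes urls[0] directly.

package main

import (
	"errors"
	"fmt"
)

// firstURL mirrors the GrafanaURL change above: the underlying URL
// generator still returns a slice, and the caller surfaces a single string.
func firstURL(gen func() ([]string, error)) (string, error) {
	urls, err := gen()
	if err != nil {
		return "", err
	}
	if len(urls) == 0 {
		return "", errors.New("no URLs generated") // guard not present in the diff
	}
	return urls[0], nil
}

func main() {
	url, err := firstURL(func() ([]string, error) {
		return []string{"http://grafana.example:3000"}, nil
	})
	fmt.Println(url, err)
}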
// PrometheusSnapshot takes a snapshot of prometheus and stores the snapshot and
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index 847493b84d53..5406681ff9d3 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -104,6 +104,7 @@ go_library(
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/closedts/sidetransport",
+ "//pkg/kv/kvserver/kvadmission",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/liveness",
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 895f727a5d12..59112fa09e48 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
@@ -386,7 +387,7 @@ func NewNode(
spanConfigAccessor: spanConfigAccessor,
testingErrorEvent: cfg.TestingKnobs.TestingResponseErrorEvent,
}
- n.storeCfg.KVAdmissionController = kvserver.MakeKVAdmissionController(
+ n.storeCfg.KVAdmissionController = kvadmission.MakeController(
kvAdmissionQ, elasticCPUGrantCoord.ElasticCPUWorkQueue, storeGrantCoords, cfg.Settings,
)
n.storeCfg.SchedulerLatencyListener = elasticCPUGrantCoord.SchedulerLatencyListener
@@ -860,7 +861,7 @@ func (n *Node) registerEnginesForDiskStatsMap(
// GetPebbleMetrics implements admission.PebbleMetricsProvider.
func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
- clusterProvisionedBandwidth := kvserver.ProvisionedBandwidthForAdmissionControl.Get(
+ clusterProvisionedBandwidth := kvadmission.ProvisionedBandwidth.Get(
&n.storeCfg.Settings.SV)
storeIDToDiskStats, err := n.diskStatsMap.tryPopulateAdmissionDiskStats(
context.Background(), clusterProvisionedBandwidth, status.GetDiskCounters)
@@ -886,13 +887,13 @@ func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
}
// GetTenantWeights implements kvserver.TenantWeightProvider.
-func (n *Node) GetTenantWeights() kvserver.TenantWeights {
- weights := kvserver.TenantWeights{
+func (n *Node) GetTenantWeights() kvadmission.TenantWeights {
+ weights := kvadmission.TenantWeights{
Node: make(map[uint64]uint32),
}
_ = n.stores.VisitStores(func(store *kvserver.Store) error {
sw := make(map[uint64]uint32)
- weights.Stores = append(weights.Stores, kvserver.TenantWeightsForStore{
+ weights.Stores = append(weights.Stores, kvadmission.TenantWeightsForStore{
StoreID: store.StoreID(),
Weights: sw,
})
@@ -1106,7 +1107,7 @@ func (n *Node) batchInternal(
ctx = admission.ContextWithElasticCPUWorkHandle(ctx, handle.ElasticCPUWorkHandle)
}
- var writeBytes *kvserver.StoreWriteBytes
+ var writeBytes *kvadmission.StoreWriteBytes
defer func() {
n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(handle, writeBytes)
writeBytes.Release()
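GetTenantWeights now builds kvadmission.TenantWeights, whose shape (a node-level map plus per-store maps keyed by tenant ID, per the definitions removed from store.go above) is unchanged by the move. A small sketch of assembling such a structure, using local stand-in types rather than the real kvadmission ones:

package main

import "fmt"

// tenantWeightsForStore and tenantWeights mirror the removed
// TenantWeightsForStore/TenantWeights definitions, which now live in
// kvadmission.
type tenantWeightsForStore struct {
	StoreID int32
	Weights map[uint64]uint32 // tenant ID => weight
}

type tenantWeights struct {
	Node   map[uint64]uint32 // node-level tenant ID => weight
	Stores []tenantWeightsForStore
}

func main() {
	w := tenantWeights{Node: make(map[uint64]uint32)}
	for _, storeID := range []int32{1, 2} { // stand-in for VisitStores
		sw := make(map[uint64]uint32)
		sw[10] = 3 // hypothetical: tenant 10 carries weight 3 on this store
		w.Node[10] += 3
		w.Stores = append(w.Stores, tenantWeightsForStore{StoreID: storeID, Weights: sw})
	}
	fmt.Printf("node=%v stores=%d\n", w.Node, len(w.Stores))
}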
diff --git a/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go
index 99639056b8ed..745117208136 100644
--- a/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go
+++ b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go
@@ -231,6 +231,7 @@ func (s *SQLWatcher) watchForDescriptorUpdates(
[]roachpb.Span{descriptorTableSpan},
startTS,
handleEvent,
+ rangefeed.WithSystemTablePriority(),
rangefeed.WithDiff(true),
rangefeed.WithOnFrontierAdvance(func(ctx context.Context, resolvedTS hlc.Timestamp) {
onFrontierAdvance(ctx, descriptorsRangefeed, resolvedTS)
@@ -290,6 +291,7 @@ func (s *SQLWatcher) watchForZoneConfigUpdates(
[]roachpb.Span{zoneTableSpan},
startTS,
handleEvent,
+ rangefeed.WithSystemTablePriority(),
rangefeed.WithOnFrontierAdvance(func(ctx context.Context, resolvedTS hlc.Timestamp) {
onFrontierAdvance(ctx, zonesRangefeed, resolvedTS)
}),
@@ -385,6 +387,7 @@ func (s *SQLWatcher) watchForProtectedTimestampUpdates(
[]roachpb.Span{ptsRecordsTableSpan},
startTS,
handleEvent,
+ rangefeed.WithSystemTablePriority(),
rangefeed.WithOnFrontierAdvance(func(ctx context.Context, resolvedTS hlc.Timestamp) {
onFrontierAdvance(ctx, protectedTimestampRangefeed, resolvedTS)
}),
diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go
index 8cb4b2457ced..e58f7a912906 100644
--- a/pkg/sql/catalog/lease/lease.go
+++ b/pkg/sql/catalog/lease/lease.go
@@ -1151,6 +1151,7 @@ func (m *Manager) watchForUpdates(ctx context.Context, descUpdateCh chan<- *desc
// shuts down, so we don't need to call Close() ourselves.
_, _ = m.rangeFeedFactory.RangeFeed(
ctx, "lease", []roachpb.Span{descriptorTableSpan}, hlc.Timestamp{}, handleEvent,
+ rangefeed.WithSystemTablePriority(),
)
}
diff --git a/pkg/sql/sqlinstance/instancestorage/instancereader.go b/pkg/sql/sqlinstance/instancestorage/instancereader.go
index 77c2f5370e16..6b850cd30b78 100644
--- a/pkg/sql/sqlinstance/instancestorage/instancereader.go
+++ b/pkg/sql/sqlinstance/instancestorage/instancereader.go
@@ -161,6 +161,7 @@ func (r *Reader) maybeStartRangeFeed(ctx context.Context) *rangefeed.RangeFeed {
[]roachpb.Span{instancesTableSpan},
r.clock.Now(),
updateCacheFn,
+ rangefeed.WithSystemTablePriority(),
rangefeed.WithInitialScan(initialScanDoneFn),
rangefeed.WithOnInitialScanError(initialScanErrFn),
rangefeed.WithRowTimestampInInitialScan(true),
diff --git a/pkg/sql/stats/stats_cache.go b/pkg/sql/stats/stats_cache.go
index 85cce35b958b..5a13cdb0af11 100644
--- a/pkg/sql/stats/stats_cache.go
+++ b/pkg/sql/stats/stats_cache.go
@@ -179,6 +179,7 @@ func (sc *TableStatisticsCache) Start(
[]roachpb.Span{statsTableSpan},
sc.ClientDB.Clock().Now(),
handleEvent,
+ rangefeed.WithSystemTablePriority(),
)
return err
}
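Several system-table rangefeeds (descriptors, zones, protected timestamps, leases, SQL instances, table statistics) now pass rangefeed.WithSystemTablePriority() so their traffic is admitted at elevated priority. The option's internals are not part of this diff; the sketch below only illustrates the functional-option shape such a knob presumably takes, with invented names rather than the rangefeed package's actual internals.

package main

import "fmt"

// config and Option illustrate the functional-option pattern that
// WithSystemTablePriority presumably follows.
type config struct {
	systemTablePriority bool
}

type Option func(*config)

// WithSystemTablePriority marks the feed as system-table traffic so the
// client can request a higher admission priority for its requests.
func WithSystemTablePriority() Option {
	return func(c *config) { c.systemTablePriority = true }
}

func newConfig(opts ...Option) config {
	var c config
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	c := newConfig(WithSystemTablePriority())
	fmt.Println("system table priority:", c.systemTablePriority)
}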
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index ce1807e061af..759422cde7d7 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -3525,6 +3525,7 @@ var charts = []sectionDescription{
"admission.requested.elastic-cpu",
"admission.admitted.elastic-cpu",
"admission.errored.elastic-cpu",
+ "admission.admitted.elastic-cpu.normal",
"admission.admitted.elastic-cpu.bulk",
"admission.admitted.kv-stores.bulk",
"admission.admitted.kv-stores.locking",
@@ -3540,6 +3541,7 @@ var charts = []sectionDescription{
"admission.admitted.sql-root-start.normal",
"admission.admitted.sql-sql-response.locking",
"admission.admitted.sql-sql-response.normal",
+ "admission.errored.elastic-cpu.normal",
"admission.errored.elastic-cpu.bulk",
"admission.errored.kv-stores.bulk",
"admission.errored.kv-stores.locking",
@@ -3555,6 +3557,7 @@ var charts = []sectionDescription{
"admission.errored.sql-root-start.normal",
"admission.errored.sql-sql-response.locking",
"admission.errored.sql-sql-response.normal",
+ "admission.requested.elastic-cpu.normal",
"admission.requested.elastic-cpu.bulk",
"admission.requested.kv-stores.bulk",
"admission.requested.kv-stores.locking",
@@ -3582,6 +3585,7 @@ var charts = []sectionDescription{
"admission.wait_queue_length.sql-leaf-start",
"admission.wait_queue_length.sql-root-start",
"admission.wait_queue_length.elastic-cpu",
+ "admission.wait_queue_length.elastic-cpu.normal",
"admission.wait_queue_length.elastic-cpu.bulk",
"admission.wait_queue_length.kv-stores.bulk",
"admission.wait_queue_length.kv-stores.locking",
@@ -3609,6 +3613,7 @@ var charts = []sectionDescription{
"admission.wait_durations.sql-leaf-start",
"admission.wait_durations.sql-root-start",
"admission.wait_durations.elastic-cpu",
+ "admission.wait_durations.elastic-cpu.normal",
"admission.wait_durations.elastic-cpu.bulk",
"admission.wait_durations.kv-stores.bulk",
"admission.wait_durations.kv-stores.locking",
diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go
index 7f7ebb3bd7da..45ebe90aaff7 100644
--- a/pkg/util/admission/grant_coordinator.go
+++ b/pkg/util/admission/grant_coordinator.go
@@ -373,7 +373,8 @@ func makeElasticGrantCoordinator(
elasticCPUGranterMetrics := makeElasticCPUGranterMetrics()
registry.AddMetricStruct(elasticCPUGranterMetrics)
- elasticWorkQueueMetrics := makeWorkQueueMetrics("elastic-cpu", registry, admissionpb.BulkNormalPri)
+ elasticWorkQueueMetrics := makeWorkQueueMetrics("elastic-cpu", registry,
+ admissionpb.BulkNormalPri, admissionpb.NormalPri)
elasticCPUGranter := newElasticCPUGranter(ambientCtx, st, elasticCPUGranterMetrics)
schedulerLatencyListener := newSchedulerLatencyListener(ambientCtx, st, schedulerLatencyListenerMetrics, elasticCPUGranter)
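The chart-catalog additions above and this makeWorkQueueMetrics change register elastic-cpu metrics for NormalPri alongside BulkNormalPri, producing names like admission.admitted.elastic-cpu.normal. The sketch below shows one way such per-priority metric names can be composed; the helper and display strings are assumptions, not the admission package's actual code.

package main

import "fmt"

// perPriorityMetricNames composes "<verb>.<queue>.<priority>" names in the
// style seen in chart_catalog.go above; the verbs and priority labels are
// taken from the metric names in this diff.
func perPriorityMetricNames(queue string, priorities ...string) []string {
	verbs := []string{"admission.requested", "admission.admitted", "admission.errored"}
	var names []string
	for _, v := range verbs {
		names = append(names, fmt.Sprintf("%s.%s", v, queue)) // aggregate metric
		for _, p := range priorities {
			names = append(names, fmt.Sprintf("%s.%s.%s", v, queue, p))
		}
	}
	return names
}

func main() {
	for _, n := range perPriorityMetricNames("elastic-cpu", "normal", "bulk") {
		fmt.Println(n)
	}
}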