diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index d460b576a81c..662c839a4947 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -56,6 +56,7 @@
kv.snapshot_delegation.enabled | boolean | false | set to true to allow snapshots from follower replicas |
kv.snapshot_rebalance.max_rate | byte size | 32 MiB | the rate limit (bytes/sec) to use for rebalance and upreplication snapshots |
kv.snapshot_recovery.max_rate | byte size | 32 MiB | the rate limit (bytes/sec) to use for recovery snapshots |
+kv.store.admission.provisioned_bandwidth | integer | 0 | if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be over-ridden on a per-store basis using the --store flag |
kv.transaction.max_intents_bytes | integer | 4194304 | maximum number of bytes used to track locks in transactions |
kv.transaction.max_refresh_spans_bytes | integer | 4194304 | maximum number of bytes used to track refresh spans in serializable transactions |
kv.transaction.reject_over_max_intents_budget.enabled | boolean | false | if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed |
diff --git a/pkg/base/store_spec.go b/pkg/base/store_spec.go
index a8d6c77a9a2e..f20179cf2910 100644
--- a/pkg/base/store_spec.go
+++ b/pkg/base/store_spec.go
@@ -161,6 +161,69 @@ func (ss *SizeSpec) Set(value string) error {
return nil
}
+// ProvisionedRateSpec is an optional part of the StoreSpec. It is produced by
+// newStoreProvisionedRateSpec from the provisioned-rate field of the --store
+// flag; that parser rejects a spec without a disk name and rejects an explicit
+// bandwidth of 0.
+type ProvisionedRateSpec struct {
+ // DiskName is the name of the disk observed by the code in disk_counters.go
+ // when retrieving stats for this store.
+ DiskName string
+ // ProvisionedBandwidth is the bandwidth provisioned for this store in
+ // bytes/s. It stays 0 when the bandwidth sub-field was not specified.
+ ProvisionedBandwidth int64
+}
+
+// newStoreProvisionedRateSpec parses the value of the provisioned-rate field
+// of the --store flag into a ProvisionedRateSpec.
+//
+// The value is a colon-separated list of sub-field=value pairs. The
+// recognized sub-fields are:
+//   - disk-name: required; stored verbatim in DiskName.
+//   - bandwidth: optional; parsed with humanizeutil.ParseBytes and required
+//     to be non-zero.
+//
+// Empty pairs and pairs with an empty sub-field name are skipped. An error is
+// returned for a pair that does not contain exactly one "=", a duplicate or
+// unknown sub-field, an empty sub-field value, a bandwidth that fails to parse
+// or is 0, and a missing disk-name. The field argument is only used to label
+// error messages.
+func newStoreProvisionedRateSpec(
+	field redact.SafeString, value string,
+) (ProvisionedRateSpec, error) {
+	var spec ProvisionedRateSpec
+	used := make(map[string]struct{})
+	for _, split := range strings.Split(value, ":") {
+		if len(split) == 0 {
+			continue
+		}
+		subSplits := strings.Split(split, "=")
+		if len(subSplits) != 2 {
+			return ProvisionedRateSpec{}, errors.Errorf("%s field has invalid value %s", field, value)
+		}
+		subField := subSplits[0]
+		subValue := subSplits[1]
+		if _, ok := used[subField]; ok {
+			return ProvisionedRateSpec{}, errors.Errorf("%s field has duplicate sub-field %s",
+				field, subField)
+		}
+		used[subField] = struct{}{}
+		if len(subField) == 0 {
+			continue
+		}
+		if len(subValue) == 0 {
+			return ProvisionedRateSpec{},
+				errors.Errorf("%s field has no value specified for sub-field %s", field, subField)
+		}
+		switch subField {
+		case "disk-name":
+			spec.DiskName = subValue
+		case "bandwidth":
+			// Parse only this sub-field's value. The full field value also
+			// carries the sub-field names and the disk-name pair, so it is
+			// never a valid byte size on its own.
+			var err error
+			spec.ProvisionedBandwidth, err = humanizeutil.ParseBytes(subValue)
+			if err != nil {
+				return ProvisionedRateSpec{},
+					errors.Wrapf(err, "could not parse bandwidth in field %s", field)
+			}
+			if spec.ProvisionedBandwidth == 0 {
+				return ProvisionedRateSpec{},
+					errors.Errorf("%s field is trying to set bandwidth to 0", field)
+			}
+		default:
+			return ProvisionedRateSpec{}, errors.Errorf("%s field has unknown sub-field %s",
+				field, subField)
+		}
+	}
+	if len(spec.DiskName) == 0 {
+		return ProvisionedRateSpec{},
+			errors.Errorf("%s field did not specify disk-name", field)
+	}
+	return spec, nil
+}
+
// StoreSpec contains the details that can be specified in the cli pertaining
// to the --store flag.
type StoreSpec struct {
@@ -189,6 +252,8 @@ type StoreSpec struct {
// through to C CCL code to set up encryption-at-rest. Must be set if and
// only if encryption is enabled, otherwise left empty.
EncryptionOptions []byte
+ // ProvisionedRateSpec is optional.
+ ProvisionedRateSpec ProvisionedRateSpec
}
// String returns a fully parsable version of the store spec.
@@ -231,6 +296,16 @@ func (ss StoreSpec) String() string {
fmt.Fprint(&buffer, optsStr)
fmt.Fprint(&buffer, ",")
}
+ if len(ss.ProvisionedRateSpec.DiskName) > 0 {
+ fmt.Fprintf(&buffer, "provisioned-rate=disk-name=%s",
+ ss.ProvisionedRateSpec.DiskName)
+ if ss.ProvisionedRateSpec.ProvisionedBandwidth > 0 {
+ fmt.Fprintf(&buffer, ":bandwidth=%s,",
+ humanizeutil.IBytes(ss.ProvisionedRateSpec.ProvisionedBandwidth))
+ } else {
+ fmt.Fprintf(&buffer, ",")
+ }
+ }
// Trim the extra comma from the end if it exists.
if l := buffer.Len(); l > 0 {
buffer.Truncate(l - 1)
@@ -259,7 +334,7 @@ var fractionRegex = regexp.MustCompile(`^([-]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+|[0-
// NewStoreSpec parses the string passed into a --store flag and returns a
// StoreSpec if it is correctly parsed.
-// There are four possible fields that can be passed in, comma separated:
+// There are five possible fields that can be passed in, comma separated:
// - path=xxx The directory in which to the rocks db instance should be
// located, required unless using a in memory storage.
// - type=mem This specifies that the store is an in memory storage instead of
@@ -273,6 +348,10 @@ var fractionRegex = regexp.MustCompile(`^([-]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+|[0-
// - 20% -> 20% of the available space
// - 0.2 -> 20% of the available space
// - attrs=xxx:yyy:zzz A colon separated list of optional attributes.
+// - provisioned-rate=disk-name=<disk-name>[:bandwidth=<bandwidth-bytes/s>] The
+// provisioned-rate can be used for admission control for operations on the
+// store. The bandwidth is optional, and if unspecified, a cluster setting
+// (kv.store.admission.provisioned_bandwidth) will be used.
// Note that commas are forbidden within any field name or value.
func NewStoreSpec(value string) (StoreSpec, error) {
const pathField = "path"
@@ -399,6 +478,13 @@ func NewStoreSpec(value string) (StoreSpec, error) {
return StoreSpec{}, err
}
ss.PebbleOptions = buf.String()
+ case "provisioned-rate":
+ rateSpec, err := newStoreProvisionedRateSpec("provisioned-rate", value)
+ if err != nil {
+ return StoreSpec{}, err
+ }
+ ss.ProvisionedRateSpec = rateSpec
+
default:
return StoreSpec{}, fmt.Errorf("%s is not a valid store field", field)
}
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index aa72c7f8c7af..c09bb303f2a9 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -3958,3 +3958,11 @@ func (n *KVAdmissionControllerImpl) FollowerStoreWriteBytes(
storeAdmissionQ.BypassedWorkDone(
followerWriteBytes.numEntries, followerWriteBytes.StoreWorkDoneInfo)
}
+
+// ProvisionedBandwidthForAdmissionControl is the cluster setting that sets the
+// provisioned bandwidth (in bytes/s) for each store in the cluster. It is
+// registered with a default of 0 and validated as a non-negative integer.
+var ProvisionedBandwidthForAdmissionControl = settings.RegisterIntSetting(
+ settings.SystemOnly, "kv.store.admission.provisioned_bandwidth",
+ "if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+
+ "for each store. It can be over-ridden on a per-store basis using the --store flag",
+ 0, settings.NonNegativeInt).WithPublic()
diff --git a/pkg/server/node.go b/pkg/server/node.go
index ea3919de2be4..a6dcc5b75570 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -238,6 +238,8 @@ type Node struct {
// COCKROACH_DEBUG_TS_IMPORT_FILE env var.
suppressNodeStatus syncutil.AtomicBool
+ diskStatsMap diskStatsMap
+
testingErrorEvent func(context.Context, *roachpb.BatchRequest, error)
}
@@ -775,17 +777,84 @@ func (n *Node) UpdateIOThreshold(id roachpb.StoreID, threshold *admissionpb.IOTh
s.UpdateIOThreshold(threshold)
}
+// diskStatsMap records, for every store whose StoreSpec named a disk, the
+// store's provisioned-rate spec and the disk it was declared for. Both maps
+// are allocated and filled by Node.registerEnginesForDiskStatsMap.
+type diskStatsMap struct {
+ // provisionedRate maps a store to its ProvisionedRateSpec.
+ provisionedRate map[roachpb.StoreID]base.ProvisionedRateSpec
+ // diskNameToStoreID maps a spec's DiskName to the store that declared it.
+ diskNameToStoreID map[string]roachpb.StoreID
+}
+
+// populateAdmissionDiskStats fills stats with one admission.DiskStats entry
+// for every store registered in dsm.
+//
+// Each entry's ProvisionedBandwidth is the store's own provisioned bandwidth
+// when that is positive, and clusterProvisionedBandwidth otherwise. For every
+// element of diskStats whose Name matches a registered disk name, the
+// corresponding store's BytesRead and BytesWritten are set from that element's
+// ReadBytes and WriteBytes. Elements with unregistered names are ignored.
+//
+// stats must be a non-nil map; it is written in place.
+func (dsm *diskStatsMap) populateAdmissionDiskStats(
+	clusterProvisionedBandwidth int64,
+	diskStats []status.DiskStats,
+	stats map[roachpb.StoreID]admission.DiskStats,
+) {
+	for id, spec := range dsm.provisionedRate {
+		s := admission.DiskStats{ProvisionedBandwidth: clusterProvisionedBandwidth}
+		if spec.ProvisionedBandwidth > 0 {
+			s.ProvisionedBandwidth = spec.ProvisionedBandwidth
+		}
+		stats[id] = s
+	}
+	for i := range diskStats {
+		id, ok := dsm.diskNameToStoreID[diskStats[i].Name]
+		if !ok {
+			continue
+		}
+		// The map stores struct values, so s is a copy. It has to be written
+		// back or the byte counters set below are silently dropped.
+		s := stats[id]
+		s.BytesRead = uint64(diskStats[i].ReadBytes)
+		s.BytesWritten = uint64(diskStats[i].WriteBytes)
+		stats[id] = s
+	}
+}
+
+// empty reports whether no store has a provisioned-rate spec registered in
+// dsm.
+func (dsm *diskStatsMap) empty() bool {
+	registered := len(dsm.provisionedRate)
+	return registered == 0
+}
+
+// registerEnginesForDiskStatsMap resets n.diskStatsMap and repopulates it from
+// the given store specs and engines, which are paired by index (specs[i]
+// describes engines[i]).
+//
+// The store ident is read from every engine, and the first read error is
+// returned. Only stores whose spec has a non-empty ProvisionedRateSpec.DiskName
+// are recorded.
+func (n *Node) registerEnginesForDiskStatsMap(
+	specs []base.StoreSpec, engines []storage.Engine,
+) error {
+	rates := make(map[roachpb.StoreID]base.ProvisionedRateSpec)
+	disks := make(map[string]roachpb.StoreID)
+	n.diskStatsMap.provisionedRate = rates
+	n.diskStatsMap.diskNameToStoreID = disks
+	for i, eng := range engines {
+		ident, err := kvserver.ReadStoreIdent(context.Background(), eng)
+		if err != nil {
+			return err
+		}
+		rateSpec := specs[i].ProvisionedRateSpec
+		if len(rateSpec.DiskName) == 0 {
+			continue
+		}
+		rates[ident.StoreID] = rateSpec
+		disks[rateSpec.DiskName] = ident.StoreID
+	}
+	return nil
+}
+
// GetPebbleMetrics implements admission.PebbleMetricsProvider.
func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
+ clusterProvisionedBandwidth := kvserver.ProvisionedBandwidthForAdmissionControl.Get(
+ &n.storeCfg.Settings.SV)
+ var storeIDToDiskStats map[roachpb.StoreID]admission.DiskStats
+ if !n.diskStatsMap.empty() {
+ diskCounters, err := status.GetDiskCounters(context.Background())
+ if err != nil {
+ log.Warningf(context.Background(), "%v",
+ errors.Wrapf(err, "unable to get disk stats"))
+ } else {
+ storeIDToDiskStats = make(map[roachpb.StoreID]admission.DiskStats)
+ n.diskStatsMap.populateAdmissionDiskStats(
+ clusterProvisionedBandwidth, diskCounters, storeIDToDiskStats)
+ }
+ }
var metrics []admission.StoreMetrics
_ = n.stores.VisitStores(func(store *kvserver.Store) error {
m := store.Engine().GetMetrics()
im := store.Engine().GetInternalIntervalMetrics()
+ diskStats := admission.DiskStats{ProvisionedBandwidth: clusterProvisionedBandwidth}
+ if s, ok := storeIDToDiskStats[store.StoreID()]; ok {
+ diskStats = s
+ }
metrics = append(metrics, admission.StoreMetrics{
StoreID: int32(store.StoreID()),
Metrics: m.Metrics,
WriteStallCount: m.WriteStallCount,
- InternalIntervalMetrics: im})
+ InternalIntervalMetrics: im,
+ DiskStats: diskStats})
return nil
})
return metrics
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 08afc206010b..5f41e51351aa 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -1574,6 +1574,10 @@ func (s *Server) PreStart(ctx context.Context) error {
return err
}
+ if err := s.node.registerEnginesForDiskStatsMap(s.cfg.Stores.Specs, s.engines); err != nil {
+ return errors.Wrapf(err, "failed to register engines for the disk stats map")
+ }
+
if err := s.debug.RegisterEngines(s.cfg.Stores.Specs, s.engines); err != nil {
return errors.Wrapf(err, "failed to register engines with debug server")
}
diff --git a/pkg/server/status/disk_counters.go b/pkg/server/status/disk_counters.go
index 1cb803b389d2..5c79fb262441 100644
--- a/pkg/server/status/disk_counters.go
+++ b/pkg/server/status/disk_counters.go
@@ -20,20 +20,22 @@ import (
"github.com/shirou/gopsutil/v3/disk"
)
-func getDiskCounters(ctx context.Context) ([]diskStats, error) {
+// GetDiskCounters returns DiskStats for all disks.
+func GetDiskCounters(ctx context.Context) ([]DiskStats, error) {
driveStats, err := disk.IOCountersWithContext(ctx)
if err != nil {
return nil, err
}
- output := make([]diskStats, len(driveStats))
+ output := make([]DiskStats, len(driveStats))
i := 0
for _, counters := range driveStats {
- output[i] = diskStats{
- readBytes: int64(counters.ReadBytes),
+ output[i] = DiskStats{
+ Name: counters.Name,
+ ReadBytes: int64(counters.ReadBytes),
readCount: int64(counters.ReadCount),
readTime: time.Duration(counters.ReadTime) * time.Millisecond,
- writeBytes: int64(counters.WriteBytes),
+ WriteBytes: int64(counters.WriteBytes),
writeCount: int64(counters.WriteCount),
writeTime: time.Duration(counters.WriteTime) * time.Millisecond,
ioTime: time.Duration(counters.IoTime) * time.Millisecond,
diff --git a/pkg/server/status/disk_counters_darwin.go b/pkg/server/status/disk_counters_darwin.go
index 84599ce9b4bb..97c13b08a1e7 100644
--- a/pkg/server/status/disk_counters_darwin.go
+++ b/pkg/server/status/disk_counters_darwin.go
@@ -19,19 +19,21 @@ import (
"github.com/lufia/iostat"
)
-func getDiskCounters(context.Context) ([]diskStats, error) {
+// GetDiskCounters returns DiskStats for all disks.
+func GetDiskCounters(context.Context) ([]DiskStats, error) {
driveStats, err := iostat.ReadDriveStats()
if err != nil {
return nil, err
}
- output := make([]diskStats, len(driveStats))
+ output := make([]DiskStats, len(driveStats))
for i, counters := range driveStats {
- output[i] = diskStats{
- readBytes: counters.BytesRead,
+ output[i] = DiskStats{
+ Name: counters.Name,
+ ReadBytes: counters.BytesRead,
readCount: counters.NumRead,
readTime: counters.TotalReadTime,
- writeBytes: counters.BytesWritten,
+ WriteBytes: counters.BytesWritten,
writeCount: counters.NumWrite,
writeTime: counters.TotalWriteTime,
ioTime: 0, // Not reported by this library.
diff --git a/pkg/server/status/runtime.go b/pkg/server/status/runtime.go
index 70eb0b5254b3..872885280264 100644
--- a/pkg/server/status/runtime.go
+++ b/pkg/server/status/runtime.go
@@ -260,12 +260,12 @@ type RuntimeStatSampler struct {
cgoCall int64
gcCount int64
gcPauseTime uint64
- disk diskStats
+ disk DiskStats
net net.IOCountersStat
runnableSum float64
}
- initialDiskCounters diskStats
+ initialDiskCounters DiskStats
initialNetCounters net.IOCountersStat
// Only show "not implemented" errors once, we don't need the log spam.
@@ -472,7 +472,7 @@ func (rsr *RuntimeStatSampler) SampleEnvironment(
}
}
- var deltaDisk diskStats
+ var deltaDisk DiskStats
diskCounters, err := getSummedDiskCounters(ctx)
if err != nil {
log.Ops.Warningf(ctx, "problem fetching disk stats: %s; disk stats will be empty.", err)
@@ -482,10 +482,10 @@ func (rsr *RuntimeStatSampler) SampleEnvironment(
rsr.last.disk = diskCounters
subtractDiskCounters(&diskCounters, rsr.initialDiskCounters)
- rsr.HostDiskReadBytes.Update(diskCounters.readBytes)
+ rsr.HostDiskReadBytes.Update(diskCounters.ReadBytes)
rsr.HostDiskReadCount.Update(diskCounters.readCount)
rsr.HostDiskReadTime.Update(int64(diskCounters.readTime))
- rsr.HostDiskWriteBytes.Update(diskCounters.writeBytes)
+ rsr.HostDiskWriteBytes.Update(diskCounters.WriteBytes)
rsr.HostDiskWriteCount.Update(diskCounters.writeCount)
rsr.HostDiskWriteTime.Update(int64(diskCounters.writeTime))
rsr.HostDiskIOTime.Update(int64(diskCounters.ioTime))
@@ -590,22 +590,26 @@ func (rsr *RuntimeStatSampler) GetCPUCombinedPercentNorm() float64 {
return rsr.CPUCombinedPercentNorm.Value()
}
-// diskStats contains the disk statistics returned by the operating
+// DiskStats contains the disk statistics returned by the operating
// system. Interpretation of some of these stats varies by platform,
// although as much as possible they are normalized to the semantics
// used by linux's diskstats interface.
//
// Except for iopsInProgress, these metrics act like counters (always
// increasing, and best interpreted as a rate).
-type diskStats struct {
- readBytes int64
+type DiskStats struct {
+ // Name is the disk name.
+ Name string
+ // ReadBytes is the cumulative bytes read.
+ ReadBytes int64
readCount int64
// readTime (and writeTime) may increase more than 1s per second if
// access to storage is parallelized.
readTime time.Duration
- writeBytes int64
+ // WriteBytes is the cumulative bytes written.
+ WriteBytes int64
writeCount int64
writeTime time.Duration
@@ -623,10 +627,10 @@ type diskStats struct {
iopsInProgress int64
}
-func getSummedDiskCounters(ctx context.Context) (diskStats, error) {
- diskCounters, err := getDiskCounters(ctx)
+func getSummedDiskCounters(ctx context.Context) (DiskStats, error) {
+ diskCounters, err := GetDiskCounters(ctx)
if err != nil {
- return diskStats{}, err
+ return DiskStats{}, err
}
return sumDiskCounters(diskCounters), nil
@@ -643,14 +647,14 @@ func getSummedNetStats(ctx context.Context) (net.IOCountersStat, error) {
// sumDiskCounters returns a new disk.IOCountersStat whose values are the sum of the
// values in the slice of disk.IOCountersStats passed in.
-func sumDiskCounters(disksStats []diskStats) diskStats {
- output := diskStats{}
+func sumDiskCounters(disksStats []DiskStats) DiskStats {
+ output := DiskStats{}
for _, stats := range disksStats {
- output.readBytes += stats.readBytes
+ output.ReadBytes += stats.ReadBytes
output.readCount += stats.readCount
output.readTime += stats.readTime
- output.writeBytes += stats.writeBytes
+ output.WriteBytes += stats.WriteBytes
output.writeCount += stats.writeCount
output.writeTime += stats.writeTime
@@ -664,13 +668,13 @@ func sumDiskCounters(disksStats []diskStats) diskStats {
// subtractDiskCounters subtracts the counters in `sub` from the counters in `from`,
// saving the results in `from`.
-func subtractDiskCounters(from *diskStats, sub diskStats) {
+func subtractDiskCounters(from *DiskStats, sub DiskStats) {
from.writeCount -= sub.writeCount
- from.writeBytes -= sub.writeBytes
+ from.WriteBytes -= sub.WriteBytes
from.writeTime -= sub.writeTime
from.readCount -= sub.readCount
- from.readBytes -= sub.readBytes
+ from.ReadBytes -= sub.ReadBytes
from.readTime -= sub.readTime
from.ioTime -= sub.ioTime
diff --git a/pkg/server/status/runtime_test.go b/pkg/server/status/runtime_test.go
index 42daeec22200..0b6866ba8ed6 100644
--- a/pkg/server/status/runtime_test.go
+++ b/pkg/server/status/runtime_test.go
@@ -21,27 +21,27 @@ import (
func TestSumDiskCounters(t *testing.T) {
defer leaktest.AfterTest(t)()
- counters := []diskStats{
+ counters := []DiskStats{
{
- readBytes: 1,
+ ReadBytes: 1,
readCount: 1,
iopsInProgress: 1,
- writeBytes: 1,
+ WriteBytes: 1,
writeCount: 1,
},
{
- readBytes: 1,
+ ReadBytes: 1,
readCount: 1,
iopsInProgress: 1,
- writeBytes: 1,
+ WriteBytes: 1,
writeCount: 1,
},
}
summed := sumDiskCounters(counters)
- expected := diskStats{
- readBytes: 2,
+ expected := DiskStats{
+ ReadBytes: 2,
readCount: 2,
- writeBytes: 2,
+ WriteBytes: 2,
writeCount: 2,
iopsInProgress: 2,
}
@@ -82,24 +82,24 @@ func TestSumNetCounters(t *testing.T) {
func TestSubtractDiskCounters(t *testing.T) {
defer leaktest.AfterTest(t)()
- from := diskStats{
- readBytes: 3,
+ from := DiskStats{
+ ReadBytes: 3,
readCount: 3,
- writeBytes: 3,
+ WriteBytes: 3,
writeCount: 3,
iopsInProgress: 3,
}
- sub := diskStats{
- readBytes: 1,
+ sub := DiskStats{
+ ReadBytes: 1,
readCount: 1,
iopsInProgress: 1,
- writeBytes: 1,
+ WriteBytes: 1,
writeCount: 1,
}
- expected := diskStats{
- readBytes: 2,
+ expected := DiskStats{
+ ReadBytes: 2,
readCount: 2,
- writeBytes: 2,
+ WriteBytes: 2,
writeCount: 2,
// Don't touch iops in progress; it is a gauge, not a counter.
iopsInProgress: 3,