From 09ad169b5fa5170ee49c8c42b6c195144c6ff973 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Fri, 12 Aug 2022 17:13:38 -0400 Subject: [PATCH] base,kvserver,server: configuration of provisioned bandwidth for a store A previous PR #85722 added support for disk bandwidth as a bottleneck resource, in the admission control package. To utilize this, admission control needs to be provided the provisioned bandwidth and the observed read and write bytes. This PR adds configuration support for this via the StoreSpec (that uses the --store flag). The StoreSpec now has an optional ProvisionedRateSpec that contains the name of the disk corresponding to the store, and an optional provisioned bandwidth, that are specified as provisioned-rate=disk-name=<disk-name>[:bandwidth=<bandwidth>]. The disk-name is used to map the DiskStats, retrieved via the existing code in status.GetDiskCounters to the correct Store. These DiskStats contain the read and write bytes. The optional bandwidth is used to override the provisioned bandwidth set via the new cluster setting kv.store.admission.provisioned_bandwidth. Release note (ops change): Disk bandwidth constraint can now be used to control admission of elastic writes. This requires configuration for each store, via the --store flag, that now contains a provisioned-rate field. The provisioned-rate field needs to provide a disk-name for the store and optionally a disk bandwidth. If the disk bandwidth is not provided the cluster setting kv.store.admission.provisioned_bandwidth will be used. The cluster setting defaults to 0. If the effective disk bandwidth, i.e., using the possibly overridden cluster setting is 0, there is no disk bandwidth constraint. Additionally, the admission control cluster setting admission.disk_bandwidth_tokens.elastic.enabled (defaults to true) can be used to turn off enforcement even when all the other configuration has been set up. 
Turning off enforcement will still output all the relevant information about disk bandwidth usage, so can be used to observe part of the mechanism in action. Fixes #82898 --- docs/generated/settings/settings.html | 1 + pkg/base/store_spec.go | 88 ++++++++++++++++++++++- pkg/kv/kvserver/store.go | 8 +++ pkg/server/node.go | 71 +++++++++++++++++- pkg/server/server.go | 4 ++ pkg/server/status/disk_counters.go | 12 ++-- pkg/server/status/disk_counters_darwin.go | 12 ++-- pkg/server/status/runtime.go | 42 ++++++----- pkg/server/status/runtime_test.go | 34 ++++----- 9 files changed, 224 insertions(+), 48 deletions(-) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index d460b576a81c..662c839a4947 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -56,6 +56,7 @@ kv.snapshot_delegation.enabledbooleanfalseset to true to allow snapshots from follower replicas kv.snapshot_rebalance.max_ratebyte size32 MiBthe rate limit (bytes/sec) to use for rebalance and upreplication snapshots kv.snapshot_recovery.max_ratebyte size32 MiBthe rate limit (bytes/sec) to use for recovery snapshots +kv.store.admission.provisioned_bandwidthinteger0if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. 
It can be over-ridden on a per-store basis using the --store flag kv.transaction.max_intents_bytesinteger4194304maximum number of bytes used to track locks in transactions kv.transaction.max_refresh_spans_bytesinteger4194304maximum number of bytes used to track refresh spans in serializable transactions kv.transaction.reject_over_max_intents_budget.enabledbooleanfalseif set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed diff --git a/pkg/base/store_spec.go b/pkg/base/store_spec.go index a8d6c77a9a2e..f20179cf2910 100644 --- a/pkg/base/store_spec.go +++ b/pkg/base/store_spec.go @@ -161,6 +161,69 @@ func (ss *SizeSpec) Set(value string) error { return nil } +// ProvisionedRateSpec is an optional part of the StoreSpec. +type ProvisionedRateSpec struct { + // DiskName is the name of the disk observed by the code in disk_counters.go + // when retrieving stats for this store. + DiskName string + // ProvisionedBandwidth is the bandwidth provisioned for this store in + // bytes/s. 
+ ProvisionedBandwidth int64 +} + +func newStoreProvisionedRateSpec( + field redact.SafeString, value string, +) (ProvisionedRateSpec, error) { + var spec ProvisionedRateSpec + used := make(map[string]struct{}) + for _, split := range strings.Split(value, ":") { + if len(split) == 0 { + continue + } + subSplits := strings.Split(split, "=") + if len(subSplits) != 2 { + return ProvisionedRateSpec{}, errors.Errorf("%s field has invalid value %s", field, value) + } + subField := subSplits[0] + subValue := subSplits[1] + if _, ok := used[subField]; ok { + return ProvisionedRateSpec{}, errors.Errorf("%s field has duplicate sub-field %s", + field, subField) + } + used[subField] = struct{}{} + if len(subField) == 0 { + continue + } + if len(subValue) == 0 { + return ProvisionedRateSpec{}, + errors.Errorf("%s field has no value specified for sub-field %s", field, subField) + } + switch subField { + case "disk-name": + spec.DiskName = subValue + case "bandwidth": + var err error + spec.ProvisionedBandwidth, err = humanizeutil.ParseBytes(value) + if err != nil { + return ProvisionedRateSpec{}, + errors.Wrapf(err, "could not parse bandwidth in field %s", field) + } + if spec.ProvisionedBandwidth == 0 { + return ProvisionedRateSpec{}, + errors.Errorf("%s field is trying to set bandwidth to 0", field) + } + default: + return ProvisionedRateSpec{}, errors.Errorf("%s field has unknown sub-field %s", + field, subField) + } + } + if len(spec.DiskName) == 0 { + return ProvisionedRateSpec{}, + errors.Errorf("%s field did not specify disk-name", field) + } + return spec, nil +} + // StoreSpec contains the details that can be specified in the cli pertaining // to the --store flag. type StoreSpec struct { @@ -189,6 +252,8 @@ type StoreSpec struct { // through to C CCL code to set up encryption-at-rest. Must be set if and // only if encryption is enabled, otherwise left empty. EncryptionOptions []byte + // ProvisionedRateSpec is optional. 
+ ProvisionedRateSpec ProvisionedRateSpec } // String returns a fully parsable version of the store spec. @@ -231,6 +296,16 @@ func (ss StoreSpec) String() string { fmt.Fprint(&buffer, optsStr) fmt.Fprint(&buffer, ",") } + if len(ss.ProvisionedRateSpec.DiskName) > 0 { + fmt.Fprintf(&buffer, "provisioned-rate=disk-name=%s", + ss.ProvisionedRateSpec.DiskName) + if ss.ProvisionedRateSpec.ProvisionedBandwidth > 0 { + fmt.Fprintf(&buffer, ":bandwidth=%s,", + humanizeutil.IBytes(ss.ProvisionedRateSpec.ProvisionedBandwidth)) + } else { + fmt.Fprintf(&buffer, ",") + } + } // Trim the extra comma from the end if it exists. if l := buffer.Len(); l > 0 { buffer.Truncate(l - 1) @@ -259,7 +334,7 @@ var fractionRegex = regexp.MustCompile(`^([-]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+|[0- // NewStoreSpec parses the string passed into a --store flag and returns a // StoreSpec if it is correctly parsed. -// There are four possible fields that can be passed in, comma separated: +// There are five possible fields that can be passed in, comma separated: // - path=xxx The directory in which to the rocks db instance should be // located, required unless using a in memory storage. // - type=mem This specifies that the store is an in memory storage instead of @@ -273,6 +348,10 @@ var fractionRegex = regexp.MustCompile(`^([-]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+|[0- // - 20% -> 20% of the available space // - 0.2 -> 20% of the available space // - attrs=xxx:yyy:zzz A colon separated list of optional attributes. +// - provisioned-rate=name=[:bandwidth=] The +// provisioned-rate can be used for admission control for operations on the +// store. The bandwidth is optional, and if unspecified, a cluster setting +// (kv.store.admission.provisioned_bandwidth) will be used. // Note that commas are forbidden within any field name or value. 
func NewStoreSpec(value string) (StoreSpec, error) { const pathField = "path" @@ -399,6 +478,13 @@ func NewStoreSpec(value string) (StoreSpec, error) { return StoreSpec{}, err } ss.PebbleOptions = buf.String() + case "provisioned-rate": + rateSpec, err := newStoreProvisionedRateSpec("provisioned-rate", value) + if err != nil { + return StoreSpec{}, err + } + ss.ProvisionedRateSpec = rateSpec + default: return StoreSpec{}, fmt.Errorf("%s is not a valid store field", field) } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index aa72c7f8c7af..c09bb303f2a9 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -3958,3 +3958,11 @@ func (n *KVAdmissionControllerImpl) FollowerStoreWriteBytes( storeAdmissionQ.BypassedWorkDone( followerWriteBytes.numEntries, followerWriteBytes.StoreWorkDoneInfo) } + +// ProvisionedBandwidthForAdmissionControl set a value of the provisioned +// bandwidth for each store in the cluster. +var ProvisionedBandwidthForAdmissionControl = settings.RegisterIntSetting( + settings.SystemOnly, "kv.store.admission.provisioned_bandwidth", + "if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+ + "for each store. It can be over-ridden on a per-store basis using the --store flag", + 0, settings.NonNegativeInt).WithPublic() diff --git a/pkg/server/node.go b/pkg/server/node.go index ea3919de2be4..a6dcc5b75570 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -238,6 +238,8 @@ type Node struct { // COCKROACH_DEBUG_TS_IMPORT_FILE env var. 
suppressNodeStatus syncutil.AtomicBool + diskStatsMap diskStatsMap + testingErrorEvent func(context.Context, *roachpb.BatchRequest, error) } @@ -775,17 +777,84 @@ func (n *Node) UpdateIOThreshold(id roachpb.StoreID, threshold *admissionpb.IOTh s.UpdateIOThreshold(threshold) } +type diskStatsMap struct { + provisionedRate map[roachpb.StoreID]base.ProvisionedRateSpec + diskNameToStoreID map[string]roachpb.StoreID +} + +func (dsm *diskStatsMap) populateAdmissionDiskStats( + clusterProvisionedBandwidth int64, + diskStats []status.DiskStats, + stats map[roachpb.StoreID]admission.DiskStats, +) { + for id, spec := range dsm.provisionedRate { + s := admission.DiskStats{ProvisionedBandwidth: clusterProvisionedBandwidth} + if spec.ProvisionedBandwidth > 0 { + s.ProvisionedBandwidth = spec.ProvisionedBandwidth + } + stats[id] = s + } + for i := range diskStats { + if id, ok := dsm.diskNameToStoreID[diskStats[i].Name]; ok { + s := stats[id] + s.BytesRead = uint64(diskStats[i].ReadBytes) + s.BytesWritten = uint64(diskStats[i].WriteBytes) + } + } +} + +func (dsm *diskStatsMap) empty() bool { + return len(dsm.provisionedRate) == 0 +} + +func (n *Node) registerEnginesForDiskStatsMap( + specs []base.StoreSpec, engines []storage.Engine, +) error { + n.diskStatsMap.provisionedRate = make(map[roachpb.StoreID]base.ProvisionedRateSpec) + n.diskStatsMap.diskNameToStoreID = make(map[string]roachpb.StoreID) + for i := range engines { + id, err := kvserver.ReadStoreIdent(context.Background(), engines[i]) + if err != nil { + return err + } + if len(specs[i].ProvisionedRateSpec.DiskName) > 0 { + n.diskStatsMap.provisionedRate[id.StoreID] = specs[i].ProvisionedRateSpec + n.diskStatsMap.diskNameToStoreID[specs[i].ProvisionedRateSpec.DiskName] = id.StoreID + } + } + return nil +} + // GetPebbleMetrics implements admission.PebbleMetricsProvider. 
func (n *Node) GetPebbleMetrics() []admission.StoreMetrics { + clusterProvisionedBandwidth := kvserver.ProvisionedBandwidthForAdmissionControl.Get( + &n.storeCfg.Settings.SV) + var storeIDToDiskStats map[roachpb.StoreID]admission.DiskStats + if !n.diskStatsMap.empty() { + diskCounters, err := status.GetDiskCounters(context.Background()) + if err != nil { + log.Warningf(context.Background(), "%v", + errors.Wrapf(err, "unable to get disk stats")) + } else { + storeIDToDiskStats = make(map[roachpb.StoreID]admission.DiskStats) + n.diskStatsMap.populateAdmissionDiskStats( + clusterProvisionedBandwidth, diskCounters, storeIDToDiskStats) + } + } var metrics []admission.StoreMetrics _ = n.stores.VisitStores(func(store *kvserver.Store) error { m := store.Engine().GetMetrics() im := store.Engine().GetInternalIntervalMetrics() + diskStats := admission.DiskStats{ProvisionedBandwidth: clusterProvisionedBandwidth} + if s, ok := storeIDToDiskStats[store.StoreID()]; ok { + diskStats = s + } metrics = append(metrics, admission.StoreMetrics{ StoreID: int32(store.StoreID()), Metrics: m.Metrics, WriteStallCount: m.WriteStallCount, - InternalIntervalMetrics: im}) + InternalIntervalMetrics: im, + DiskStats: diskStats}) return nil }) return metrics diff --git a/pkg/server/server.go b/pkg/server/server.go index 08afc206010b..5f41e51351aa 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1574,6 +1574,10 @@ func (s *Server) PreStart(ctx context.Context) error { return err } + if err := s.node.registerEnginesForDiskStatsMap(s.cfg.Stores.Specs, s.engines); err != nil { + return errors.Wrapf(err, "failed to register engines for the disk stats map") + } + if err := s.debug.RegisterEngines(s.cfg.Stores.Specs, s.engines); err != nil { return errors.Wrapf(err, "failed to register engines with debug server") } diff --git a/pkg/server/status/disk_counters.go b/pkg/server/status/disk_counters.go index 1cb803b389d2..5c79fb262441 100644 --- a/pkg/server/status/disk_counters.go +++ 
b/pkg/server/status/disk_counters.go @@ -20,20 +20,22 @@ import ( "github.com/shirou/gopsutil/v3/disk" ) -func getDiskCounters(ctx context.Context) ([]diskStats, error) { +// GetDiskCounters returns DiskStats for all disks. +func GetDiskCounters(ctx context.Context) ([]DiskStats, error) { driveStats, err := disk.IOCountersWithContext(ctx) if err != nil { return nil, err } - output := make([]diskStats, len(driveStats)) + output := make([]DiskStats, len(driveStats)) i := 0 for _, counters := range driveStats { - output[i] = diskStats{ - readBytes: int64(counters.ReadBytes), + output[i] = DiskStats{ + Name: counters.Name, + ReadBytes: int64(counters.ReadBytes), readCount: int64(counters.ReadCount), readTime: time.Duration(counters.ReadTime) * time.Millisecond, - writeBytes: int64(counters.WriteBytes), + WriteBytes: int64(counters.WriteBytes), writeCount: int64(counters.WriteCount), writeTime: time.Duration(counters.WriteTime) * time.Millisecond, ioTime: time.Duration(counters.IoTime) * time.Millisecond, diff --git a/pkg/server/status/disk_counters_darwin.go b/pkg/server/status/disk_counters_darwin.go index 84599ce9b4bb..97c13b08a1e7 100644 --- a/pkg/server/status/disk_counters_darwin.go +++ b/pkg/server/status/disk_counters_darwin.go @@ -19,19 +19,21 @@ import ( "github.com/lufia/iostat" ) -func getDiskCounters(context.Context) ([]diskStats, error) { +// GetDiskCounters returns DiskStats for all disks. 
+func GetDiskCounters(context.Context) ([]DiskStats, error) { driveStats, err := iostat.ReadDriveStats() if err != nil { return nil, err } - output := make([]diskStats, len(driveStats)) + output := make([]DiskStats, len(driveStats)) for i, counters := range driveStats { - output[i] = diskStats{ - readBytes: counters.BytesRead, + output[i] = DiskStats{ + Name: counters.Name, + ReadBytes: counters.BytesRead, readCount: counters.NumRead, readTime: counters.TotalReadTime, - writeBytes: counters.BytesWritten, + WriteBytes: counters.BytesWritten, writeCount: counters.NumWrite, writeTime: counters.TotalWriteTime, ioTime: 0, // Not reported by this library. diff --git a/pkg/server/status/runtime.go b/pkg/server/status/runtime.go index 70eb0b5254b3..872885280264 100644 --- a/pkg/server/status/runtime.go +++ b/pkg/server/status/runtime.go @@ -260,12 +260,12 @@ type RuntimeStatSampler struct { cgoCall int64 gcCount int64 gcPauseTime uint64 - disk diskStats + disk DiskStats net net.IOCountersStat runnableSum float64 } - initialDiskCounters diskStats + initialDiskCounters DiskStats initialNetCounters net.IOCountersStat // Only show "not implemented" errors once, we don't need the log spam. 
@@ -472,7 +472,7 @@ func (rsr *RuntimeStatSampler) SampleEnvironment( } } - var deltaDisk diskStats + var deltaDisk DiskStats diskCounters, err := getSummedDiskCounters(ctx) if err != nil { log.Ops.Warningf(ctx, "problem fetching disk stats: %s; disk stats will be empty.", err) @@ -482,10 +482,10 @@ func (rsr *RuntimeStatSampler) SampleEnvironment( rsr.last.disk = diskCounters subtractDiskCounters(&diskCounters, rsr.initialDiskCounters) - rsr.HostDiskReadBytes.Update(diskCounters.readBytes) + rsr.HostDiskReadBytes.Update(diskCounters.ReadBytes) rsr.HostDiskReadCount.Update(diskCounters.readCount) rsr.HostDiskReadTime.Update(int64(diskCounters.readTime)) - rsr.HostDiskWriteBytes.Update(diskCounters.writeBytes) + rsr.HostDiskWriteBytes.Update(diskCounters.WriteBytes) rsr.HostDiskWriteCount.Update(diskCounters.writeCount) rsr.HostDiskWriteTime.Update(int64(diskCounters.writeTime)) rsr.HostDiskIOTime.Update(int64(diskCounters.ioTime)) @@ -590,22 +590,26 @@ func (rsr *RuntimeStatSampler) GetCPUCombinedPercentNorm() float64 { return rsr.CPUCombinedPercentNorm.Value() } -// diskStats contains the disk statistics returned by the operating +// DiskStats contains the disk statistics returned by the operating // system. Interpretation of some of these stats varies by platform, // although as much as possible they are normalized to the semantics // used by linux's diskstats interface. // // Except for iopsInProgress, these metrics act like counters (always // increasing, and best interpreted as a rate). -type diskStats struct { - readBytes int64 +type DiskStats struct { + // Name is the disk name. + Name string + // ReadBytes is the cumulative bytes read. + ReadBytes int64 readCount int64 // readTime (and writeTime) may increase more than 1s per second if // access to storage is parallelized. readTime time.Duration - writeBytes int64 + // WriteBytes is the cumulative bytes written. 
+ WriteBytes int64 writeCount int64 writeTime time.Duration @@ -623,10 +627,10 @@ type diskStats struct { iopsInProgress int64 } -func getSummedDiskCounters(ctx context.Context) (diskStats, error) { - diskCounters, err := getDiskCounters(ctx) +func getSummedDiskCounters(ctx context.Context) (DiskStats, error) { + diskCounters, err := GetDiskCounters(ctx) if err != nil { - return diskStats{}, err + return DiskStats{}, err } return sumDiskCounters(diskCounters), nil @@ -643,14 +647,14 @@ func getSummedNetStats(ctx context.Context) (net.IOCountersStat, error) { // sumDiskCounters returns a new disk.IOCountersStat whose values are the sum of the // values in the slice of disk.IOCountersStats passed in. -func sumDiskCounters(disksStats []diskStats) diskStats { - output := diskStats{} +func sumDiskCounters(disksStats []DiskStats) DiskStats { + output := DiskStats{} for _, stats := range disksStats { - output.readBytes += stats.readBytes + output.ReadBytes += stats.ReadBytes output.readCount += stats.readCount output.readTime += stats.readTime - output.writeBytes += stats.writeBytes + output.WriteBytes += stats.WriteBytes output.writeCount += stats.writeCount output.writeTime += stats.writeTime @@ -664,13 +668,13 @@ func sumDiskCounters(disksStats []diskStats) diskStats { // subtractDiskCounters subtracts the counters in `sub` from the counters in `from`, // saving the results in `from`. 
-func subtractDiskCounters(from *diskStats, sub diskStats) { +func subtractDiskCounters(from *DiskStats, sub DiskStats) { from.writeCount -= sub.writeCount - from.writeBytes -= sub.writeBytes + from.WriteBytes -= sub.WriteBytes from.writeTime -= sub.writeTime from.readCount -= sub.readCount - from.readBytes -= sub.readBytes + from.ReadBytes -= sub.ReadBytes from.readTime -= sub.readTime from.ioTime -= sub.ioTime diff --git a/pkg/server/status/runtime_test.go b/pkg/server/status/runtime_test.go index 42daeec22200..0b6866ba8ed6 100644 --- a/pkg/server/status/runtime_test.go +++ b/pkg/server/status/runtime_test.go @@ -21,27 +21,27 @@ import ( func TestSumDiskCounters(t *testing.T) { defer leaktest.AfterTest(t)() - counters := []diskStats{ + counters := []DiskStats{ { - readBytes: 1, + ReadBytes: 1, readCount: 1, iopsInProgress: 1, - writeBytes: 1, + WriteBytes: 1, writeCount: 1, }, { - readBytes: 1, + ReadBytes: 1, readCount: 1, iopsInProgress: 1, - writeBytes: 1, + WriteBytes: 1, writeCount: 1, }, } summed := sumDiskCounters(counters) - expected := diskStats{ - readBytes: 2, + expected := DiskStats{ + ReadBytes: 2, readCount: 2, - writeBytes: 2, + WriteBytes: 2, writeCount: 2, iopsInProgress: 2, } @@ -82,24 +82,24 @@ func TestSumNetCounters(t *testing.T) { func TestSubtractDiskCounters(t *testing.T) { defer leaktest.AfterTest(t)() - from := diskStats{ - readBytes: 3, + from := DiskStats{ + ReadBytes: 3, readCount: 3, - writeBytes: 3, + WriteBytes: 3, writeCount: 3, iopsInProgress: 3, } - sub := diskStats{ - readBytes: 1, + sub := DiskStats{ + ReadBytes: 1, readCount: 1, iopsInProgress: 1, - writeBytes: 1, + WriteBytes: 1, writeCount: 1, } - expected := diskStats{ - readBytes: 2, + expected := DiskStats{ + ReadBytes: 2, readCount: 2, - writeBytes: 2, + WriteBytes: 2, writeCount: 2, // Don't touch iops in progress; it is a gauge, not a counter. iopsInProgress: 3,