Skip to content

Commit

Permalink
base,kvserver,server: configuration of provisioned bandwidth for a store
Browse files Browse the repository at this point in the history
A previous PR #85722 added support for disk bandwidth as a bottlneck
resource, in the admission control package. To utilize this, admission
control needs to be provided the provisioned bandwidth and the observed
read and write bytes. This PR adds configuration support for this via
the StoreSpec (that uses the --store flag). The StoreSpec now has an
optional ProvisionedRateSpec that contains the name of the disk
corresponding to the store, and an optional provisioned bandwidth,
that are specified as
provisioned-rate=name=<disk-name>[:bandwidth=<bandwidth-bytes>].

The disk-name is used to map the DiskStats, retrieved via the existing
code in status.GetDiskCounters to the correct Store. These DiskStats
contain the read and write bytes. The optional bandwidth is used to
override the provisioned bandwidth set via the new cluster setting
kv.store.admission.provisioned_bandwidth.

Fixes #82898

Release note (ops change): Disk bandwidth constraint can now be
used to control admission of elastic writes. This requires configuration
for each store, via the --store flag, that now contains an optional
provisioned-rate field. The provisioned-rate field, if specified,
needs to provide a disk-name for the store and optionally a disk
bandwidth. If the disk bandwidth is not provided the cluster setting
kv.store.admission.provisioned_bandwidth will be used. The cluster
setting defaults to 0 (which means that the disk bandwidth constraint
is disabled). If the effective disk bandwidth, i.e., using the possibly
overridden cluster setting is 0, there is no disk bandwidth constraint.
Additionally, the admission control cluster setting
admission.disk_bandwidth_tokens.elastic.enabled (defaults to true)
can be used to turn off enforcement even when all the other
configuration has been setup. Turning off enforcement will still
output all the relevant information about disk bandwidth usage, so
can be used to observe part of the mechanism in action.
To summarize, to enable this for a cluster with homogenous disk,
provide a disk-name in the provisioned-rate field in the store-spec,
and set the kv.store.admission.provisioned_bandwidth cluster setting
to the bandwidth limit. To only get information about disk bandwidth
usage by elastic traffic (currently via logs, not metrics), do the
above but also set admission.disk_bandwidth_tokens.elastic.enabled
to false.

Release justification: Low risk, high benefit change that allows
an operator to enable new functionality (disabled by default).
  • Loading branch information
sumeerbhola committed Aug 22, 2022
1 parent e6a7dc2 commit 833a031
Show file tree
Hide file tree
Showing 15 changed files with 435 additions and 79 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
<tr><td><code>kv.snapshot_delegation.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to true to allow snapshots from follower replicas</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><code>kv.store.admission.provisioned_bandwidth</code></td><td>byte size</td><td><code>0 B</code></td><td>if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be over-ridden on a per-store basis using the --store flag</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track locks in transactions</td></tr>
<tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
<tr><td><code>kv.transaction.reject_over_max_intents_budget.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed</td></tr>
Expand Down
105 changes: 104 additions & 1 deletion pkg/base/store_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,86 @@ func (ss *SizeSpec) Set(value string) error {
return nil
}

// ProvisionedRateSpec is an optional part of the StoreSpec.
//
// TODO(sumeer): We should map the file path specified in the store spec to
// the disk name. df can be used to map paths to names like /dev/nvme1n1 and
// /dev/sdb (these examples are from AWS EBS and GCP PD respectively) and the
// corresponding names produced by disk_counters.go are nvme1n1 and sdb
// respectively. We need to find or write a platform independent library --
// see the discussion on
// https://github.com/cockroachdb/cockroach/pull/86063#pullrequestreview-1074487018.
// With that change, the ProvisionedRateSpec would only be needed to override
// the cluster setting when there are heterogenous bandwidth limits in a
// cluster (there would be no more DiskName field).
type ProvisionedRateSpec struct {
// DiskName is the name of the disk observed by the code in disk_counters.go
// when retrieving stats for this store.
DiskName string
// ProvisionedBandwidth is the bandwidth provisioned for this store in
// bytes/s.
ProvisionedBandwidth int64
}

func newStoreProvisionedRateSpec(
field redact.SafeString, value string,
) (ProvisionedRateSpec, error) {
var spec ProvisionedRateSpec
used := make(map[string]struct{})
for _, split := range strings.Split(value, ":") {
if len(split) == 0 {
continue
}
subSplits := strings.Split(split, "=")
if len(subSplits) != 2 {
return ProvisionedRateSpec{}, errors.Errorf("%s field has invalid value %s", field, value)
}
subField := subSplits[0]
subValue := subSplits[1]
if _, ok := used[subField]; ok {
return ProvisionedRateSpec{}, errors.Errorf("%s field has duplicate sub-field %s",
field, subField)
}
used[subField] = struct{}{}
if len(subField) == 0 {
continue
}
if len(subValue) == 0 {
return ProvisionedRateSpec{},
errors.Errorf("%s field has no value specified for sub-field %s", field, subField)
}
switch subField {
case "disk-name":
spec.DiskName = subValue
case "bandwidth":
if len(subValue) <= 2 || subValue[len(subValue)-2:] != "/s" {
return ProvisionedRateSpec{},
errors.Errorf("%s field does not have bandwidth sub-field %s ending in /s",
field, subValue)
}
subValue = subValue[:len(subValue)-2]
var err error
spec.ProvisionedBandwidth, err = humanizeutil.ParseBytes(subValue)
if err != nil {
return ProvisionedRateSpec{},
errors.Wrapf(err, "could not parse bandwidth in field %s", field)
}
if spec.ProvisionedBandwidth == 0 {
return ProvisionedRateSpec{},
errors.Errorf("%s field is trying to set bandwidth to 0", field)
}
default:
return ProvisionedRateSpec{}, errors.Errorf("%s field has unknown sub-field %s",
field, subField)
}
}
if len(spec.DiskName) == 0 {
return ProvisionedRateSpec{},
errors.Errorf("%s field did not specify disk-name", field)
}
return spec, nil
}

// StoreSpec contains the details that can be specified in the cli pertaining
// to the --store flag.
type StoreSpec struct {
Expand Down Expand Up @@ -189,6 +269,8 @@ type StoreSpec struct {
// through to C CCL code to set up encryption-at-rest. Must be set if and
// only if encryption is enabled, otherwise left empty.
EncryptionOptions []byte
// ProvisionedRateSpec is optional.
ProvisionedRateSpec ProvisionedRateSpec
}

// String returns a fully parsable version of the store spec.
Expand Down Expand Up @@ -231,6 +313,16 @@ func (ss StoreSpec) String() string {
fmt.Fprint(&buffer, optsStr)
fmt.Fprint(&buffer, ",")
}
if len(ss.ProvisionedRateSpec.DiskName) > 0 {
fmt.Fprintf(&buffer, "provisioned-rate=disk-name=%s",
ss.ProvisionedRateSpec.DiskName)
if ss.ProvisionedRateSpec.ProvisionedBandwidth > 0 {
fmt.Fprintf(&buffer, ":bandwidth=%s/s,",
humanizeutil.IBytes(ss.ProvisionedRateSpec.ProvisionedBandwidth))
} else {
fmt.Fprintf(&buffer, ",")
}
}
// Trim the extra comma from the end if it exists.
if l := buffer.Len(); l > 0 {
buffer.Truncate(l - 1)
Expand Down Expand Up @@ -259,7 +351,7 @@ var fractionRegex = regexp.MustCompile(`^([-]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+|[0-

// NewStoreSpec parses the string passed into a --store flag and returns a
// StoreSpec if it is correctly parsed.
// There are four possible fields that can be passed in, comma separated:
// There are five possible fields that can be passed in, comma separated:
// - path=xxx The directory in which to the rocks db instance should be
// located, required unless using a in memory storage.
// - type=mem This specifies that the store is an in memory storage instead of
Expand All @@ -273,6 +365,10 @@ var fractionRegex = regexp.MustCompile(`^([-]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+|[0-
// - 20% -> 20% of the available space
// - 0.2 -> 20% of the available space
// - attrs=xxx:yyy:zzz A colon separated list of optional attributes.
// - provisioned-rate=disk-name=<disk-name>[:bandwidth=<bandwidth-bytes/s>] The
// provisioned-rate can be used for admission control for operations on the
// store. The bandwidth is optional, and if unspecified, a cluster setting
// (kv.store.admission.provisioned_bandwidth) will be used.
// Note that commas are forbidden within any field name or value.
func NewStoreSpec(value string) (StoreSpec, error) {
const pathField = "path"
Expand Down Expand Up @@ -399,6 +495,13 @@ func NewStoreSpec(value string) (StoreSpec, error) {
return StoreSpec{}, err
}
ss.PebbleOptions = buf.String()
case "provisioned-rate":
rateSpec, err := newStoreProvisionedRateSpec("provisioned-rate", value)
if err != nil {
return StoreSpec{}, err
}
ss.ProvisionedRateSpec = rateSpec

default:
return StoreSpec{}, fmt.Errorf("%s is not a valid store field", field)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/base/store_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ target_file_size=2097152`
{"path=/mnt/hda1,type=other", "other is not a valid store type", StoreSpec{}},
{"path=/mnt/hda1,type=mem,size=20GiB", "path specified for in memory store", StoreSpec{}},

// provisioned rate
{"path=/mnt/hda1,provisioned-rate=disk-name=nvme1n1:bandwidth=200MiB/s", "",
StoreSpec{Path: "/mnt/hda1", ProvisionedRateSpec: base.ProvisionedRateSpec{
DiskName: "nvme1n1", ProvisionedBandwidth: 200 << 20}}},
{"path=/mnt/hda1,provisioned-rate=disk-name=sdb", "", StoreSpec{
Path: "/mnt/hda1", ProvisionedRateSpec: base.ProvisionedRateSpec{
DiskName: "sdb", ProvisionedBandwidth: 0}}},

// RocksDB
{"path=/,rocksdb=key1=val1;key2=val2", "", StoreSpec{Path: "/", RocksDBOptions: "key1=val1;key2=val2"}},

Expand Down
11 changes: 11 additions & 0 deletions pkg/cli/cliflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,17 @@ memory that the store may consume, for example:
--store=type=mem,size=20GiB
--store=type=mem,size=90%
</PRE>
Optionally, to configure admission control enforcement to prevent disk
bandwidth saturation, the "provisioned-rate" field can be specified with
the "disk-name" and an optional "bandwidth". The bandwidth is used to override
the value of the cluster setting, kv.store.admission.provisioned_bandwidth.
For example:
<PRE>
--store=provisioned-rate=disk-name=nvme1n1
--store=provisioned-rate=disk-name=sdb:bandwidth=250MiB/s
</PRE>
Commas are forbidden in all values, since they are used to separate fields.
Also, if you use equal signs in the file path to a store, you must use the
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3964,3 +3964,11 @@ func (n *KVAdmissionControllerImpl) FollowerStoreWriteBytes(
storeAdmissionQ.BypassedWorkDone(
followerWriteBytes.numEntries, followerWriteBytes.StoreWorkDoneInfo)
}

// ProvisionedBandwidthForAdmissionControl set a value of the provisioned
// bandwidth for each store in the cluster.
var ProvisionedBandwidthForAdmissionControl = settings.RegisterByteSizeSetting(
settings.SystemOnly, "kv.store.admission.provisioned_bandwidth",
"if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+
"for each store. It can be over-ridden on a per-store basis using the --store flag",
0).WithPublic()
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ go_test(
"//pkg/server/diagnostics",
"//pkg/server/diagnostics/diagnosticspb",
"//pkg/server/serverpb",
"//pkg/server/status",
"//pkg/server/status/statuspb",
"//pkg/server/telemetry",
"//pkg/settings",
Expand Down Expand Up @@ -446,6 +447,7 @@ go_test(
"//pkg/upgrade",
"//pkg/upgrade/upgrades",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/envutil",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
Expand Down
83 changes: 82 additions & 1 deletion pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ type Node struct {
// COCKROACH_DEBUG_TS_IMPORT_FILE env var.
suppressNodeStatus syncutil.AtomicBool

diskStatsMap diskStatsMap

testingErrorEvent func(context.Context, *roachpb.BatchRequest, error)
}

Expand Down Expand Up @@ -772,17 +774,96 @@ func (n *Node) UpdateIOThreshold(id roachpb.StoreID, threshold *admissionpb.IOTh
s.UpdateIOThreshold(threshold)
}

// diskStatsMap encapsulates all the logic for populating DiskStats for
// admission.StoreMetrics.
type diskStatsMap struct {
provisionedRate map[roachpb.StoreID]base.ProvisionedRateSpec
diskNameToStoreID map[string]roachpb.StoreID
}

func (dsm *diskStatsMap) tryPopulateAdmissionDiskStats(
ctx context.Context,
clusterProvisionedBandwidth int64,
diskStatsFunc func(context.Context) ([]status.DiskStats, error),
) (stats map[roachpb.StoreID]admission.DiskStats, err error) {
if dsm.empty() {
return stats, nil
}
diskStats, err := diskStatsFunc(ctx)
if err != nil {
return stats, err
}
stats = make(map[roachpb.StoreID]admission.DiskStats)
for id, spec := range dsm.provisionedRate {
s := admission.DiskStats{ProvisionedBandwidth: clusterProvisionedBandwidth}
if spec.ProvisionedBandwidth > 0 {
s.ProvisionedBandwidth = spec.ProvisionedBandwidth
}
stats[id] = s
}
for i := range diskStats {
if id, ok := dsm.diskNameToStoreID[diskStats[i].Name]; ok {
s := stats[id]
s.BytesRead = uint64(diskStats[i].ReadBytes)
s.BytesWritten = uint64(diskStats[i].WriteBytes)
stats[id] = s
}
}
return stats, nil
}

func (dsm *diskStatsMap) empty() bool {
return len(dsm.provisionedRate) == 0
}

func (dsm *diskStatsMap) initDiskStatsMap(specs []base.StoreSpec, engines []storage.Engine) error {
*dsm = diskStatsMap{
provisionedRate: make(map[roachpb.StoreID]base.ProvisionedRateSpec),
diskNameToStoreID: make(map[string]roachpb.StoreID),
}
for i := range engines {
id, err := kvserver.ReadStoreIdent(context.Background(), engines[i])
if err != nil {
return err
}
if len(specs[i].ProvisionedRateSpec.DiskName) > 0 {
dsm.provisionedRate[id.StoreID] = specs[i].ProvisionedRateSpec
dsm.diskNameToStoreID[specs[i].ProvisionedRateSpec.DiskName] = id.StoreID
}
}
return nil
}

func (n *Node) registerEnginesForDiskStatsMap(
specs []base.StoreSpec, engines []storage.Engine,
) error {
return n.diskStatsMap.initDiskStatsMap(specs, engines)
}

// GetPebbleMetrics implements admission.PebbleMetricsProvider.
func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
clusterProvisionedBandwidth := kvserver.ProvisionedBandwidthForAdmissionControl.Get(
&n.storeCfg.Settings.SV)
storeIDToDiskStats, err := n.diskStatsMap.tryPopulateAdmissionDiskStats(
context.Background(), clusterProvisionedBandwidth, status.GetDiskCounters)
if err != nil {
log.Warningf(context.Background(), "%v",
errors.Wrapf(err, "unable to populate disk stats"))
}
var metrics []admission.StoreMetrics
_ = n.stores.VisitStores(func(store *kvserver.Store) error {
m := store.Engine().GetMetrics()
im := store.Engine().GetInternalIntervalMetrics()
diskStats := admission.DiskStats{ProvisionedBandwidth: clusterProvisionedBandwidth}
if s, ok := storeIDToDiskStats[store.StoreID()]; ok {
diskStats = s
}
metrics = append(metrics, admission.StoreMetrics{
StoreID: int32(store.StoreID()),
Metrics: m.Metrics,
WriteStallCount: m.WriteStallCount,
InternalIntervalMetrics: im})
InternalIntervalMetrics: im,
DiskStats: diskStats})
return nil
})
return metrics
Expand Down
Loading

0 comments on commit 833a031

Please sign in to comment.