base,kvserver,server: configuration of provisioned bandwidth for a store
A previous PR cockroachdb#85722 added support for disk bandwidth as a
bottleneck resource in the admission control package. To utilize this,
admission control needs to be provided the provisioned bandwidth and the
observed read and write bytes. This PR adds configuration support for this
via the StoreSpec (which is set using the --store flag). The StoreSpec now
has an optional ProvisionedRateSpec that contains the name of the disk
corresponding to the store and an optional provisioned bandwidth, specified
as
provisioned-rate=disk-name=<disk-name>[:bandwidth=<bandwidth-bytes>].
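
For illustration, a minimal sketch of parsing such a flag value through
NewStoreSpec (the import path follows the repository layout; the store
path, disk name, and bandwidth below are hypothetical):

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/cockroach/pkg/base"
)

func main() {
	// Hypothetical flag value: a store at /mnt/data1 whose disk is named
	// nvme1n1, with 125 MiB/s of provisioned bandwidth.
	spec, err := base.NewStoreSpec(
		"path=/mnt/data1,provisioned-rate=disk-name=nvme1n1:bandwidth=125MiB")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(spec.ProvisionedRateSpec.DiskName)             // nvme1n1
	fmt.Println(spec.ProvisionedRateSpec.ProvisionedBandwidth) // 131072000
}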

The disk-name is used to map the DiskStats, retrieved via the existing
code in status.GetDiskCounters, to the correct Store. These DiskStats
contain the read and write bytes. The optional bandwidth is used to
override the provisioned bandwidth set via the new cluster setting
kv.store.admission.provisioned_bandwidth.
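
The override rule reduces to a simple precedence check; a hypothetical
helper sketching it (the name and signature are illustrative, not part of
this patch):

// effectiveBandwidth returns the provisioned bandwidth admission control
// ends up using for a store: the per-store value from the --store flag if
// set, otherwise the cluster setting. A result of 0 means there is no
// disk bandwidth constraint.
func effectiveBandwidth(perStoreBandwidth, clusterSetting int64) int64 {
	if perStoreBandwidth > 0 {
		return perStoreBandwidth
	}
	return clusterSetting
}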

Release note (ops change): Disk bandwidth constraint can now be
used to control admission of elastic writes. This requires configuration
for each store, via the --store flag, which now accepts a
provisioned-rate field. The provisioned-rate field needs to provide
a disk-name for the store and optionally a disk bandwidth. If the
disk bandwidth is not provided, the cluster setting
kv.store.admission.provisioned_bandwidth will be used. The cluster
setting defaults to 0. If the effective disk bandwidth, i.e., the
per-store value or, failing that, the cluster setting, is 0, there is
no disk bandwidth constraint. Additionally, the admission control cluster
setting admission.disk_bandwidth_tokens.elastic.enabled (defaults
to true) can be used to turn off enforcement even when all the other
configuration has been set up. Turning off enforcement still outputs
all the relevant information about disk bandwidth usage, so it can be
used to observe part of the mechanism in action.

Fixes cockroachdb#82898
sumeerbhola committed Aug 15, 2022
1 parent f1c2047 commit 09ad169
Showing 9 changed files with 224 additions and 48 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -56,6 +56,7 @@
<tr><td><code>kv.snapshot_delegation.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to true to allow snapshots from follower replicas</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><code>kv.store.admission.provisioned_bandwidth</code></td><td>integer</td><td><code>0</code></td><td>if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s) for each store. It can be overridden on a per-store basis using the --store flag</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track locks in transactions</td></tr>
<tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
<tr><td><code>kv.transaction.reject_over_max_intents_budget.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed</td></tr>
88 changes: 87 additions & 1 deletion pkg/base/store_spec.go
@@ -161,6 +161,69 @@ func (ss *SizeSpec) Set(value string) error {
return nil
}

// ProvisionedRateSpec is an optional part of the StoreSpec.
type ProvisionedRateSpec struct {
// DiskName is the name of the disk observed by the code in disk_counters.go
// when retrieving stats for this store.
DiskName string
// ProvisionedBandwidth is the bandwidth provisioned for this store in
// bytes/s.
ProvisionedBandwidth int64
}

func newStoreProvisionedRateSpec(
field redact.SafeString, value string,
) (ProvisionedRateSpec, error) {
var spec ProvisionedRateSpec
used := make(map[string]struct{})
for _, split := range strings.Split(value, ":") {
if len(split) == 0 {
continue
}
subSplits := strings.Split(split, "=")
if len(subSplits) != 2 {
return ProvisionedRateSpec{}, errors.Errorf("%s field has invalid value %s", field, value)
}
subField := subSplits[0]
subValue := subSplits[1]
if _, ok := used[subField]; ok {
return ProvisionedRateSpec{}, errors.Errorf("%s field has duplicate sub-field %s",
field, subField)
}
used[subField] = struct{}{}
if len(subField) == 0 {
continue
}
if len(subValue) == 0 {
return ProvisionedRateSpec{},
errors.Errorf("%s field has no value specified for sub-field %s", field, subField)
}
switch subField {
case "disk-name":
spec.DiskName = subValue
case "bandwidth":
var err error
spec.ProvisionedBandwidth, err = humanizeutil.ParseBytes(subValue)
if err != nil {
return ProvisionedRateSpec{},
errors.Wrapf(err, "could not parse bandwidth in field %s", field)
}
if spec.ProvisionedBandwidth == 0 {
return ProvisionedRateSpec{},
errors.Errorf("%s field is trying to set bandwidth to 0", field)
}
default:
return ProvisionedRateSpec{}, errors.Errorf("%s field has unknown sub-field %s",
field, subField)
}
}
if len(spec.DiskName) == 0 {
return ProvisionedRateSpec{},
errors.Errorf("%s field did not specify disk-name", field)
}
return spec, nil
}

// StoreSpec contains the details that can be specified in the cli pertaining
// to the --store flag.
type StoreSpec struct {
@@ -189,6 +252,8 @@ type StoreSpec struct {
// through to C CCL code to set up encryption-at-rest. Must be set if and
// only if encryption is enabled, otherwise left empty.
EncryptionOptions []byte
// ProvisionedRateSpec is optional.
ProvisionedRateSpec ProvisionedRateSpec
}

// String returns a fully parsable version of the store spec.
@@ -231,6 +296,16 @@ func (ss StoreSpec) String() string {
fmt.Fprint(&buffer, optsStr)
fmt.Fprint(&buffer, ",")
}
if len(ss.ProvisionedRateSpec.DiskName) > 0 {
fmt.Fprintf(&buffer, "provisioned-rate=disk-name=%s",
ss.ProvisionedRateSpec.DiskName)
if ss.ProvisionedRateSpec.ProvisionedBandwidth > 0 {
fmt.Fprintf(&buffer, ":bandwidth=%s,",
humanizeutil.IBytes(ss.ProvisionedRateSpec.ProvisionedBandwidth))
} else {
fmt.Fprintf(&buffer, ",")
}
}
// Trim the extra comma from the end if it exists.
if l := buffer.Len(); l > 0 {
buffer.Truncate(l - 1)
@@ -259,7 +334,7 @@ var fractionRegex = regexp.MustCompile(`^([-]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+|[0-

// NewStoreSpec parses the string passed into a --store flag and returns a
// StoreSpec if it is correctly parsed.
// There are four possible fields that can be passed in, comma separated:
// There are five possible fields that can be passed in, comma separated:
// - path=xxx The directory in which the rocks db instance should be
// located; required unless using in-memory storage.
// - type=mem This specifies that the store is an in memory storage instead of
@@ -273,6 +348,10 @@ var fractionRegex = regexp.MustCompile(`^([-]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+|[0-
// - 20% -> 20% of the available space
// - 0.2 -> 20% of the available space
// - attrs=xxx:yyy:zzz A colon separated list of optional attributes.
// - provisioned-rate=disk-name=<disk-name>[:bandwidth=<bandwidth-bytes>] The
// provisioned-rate can be used for admission control for operations on the
// store. The bandwidth is optional, and if unspecified, a cluster setting
// (kv.store.admission.provisioned_bandwidth) will be used.
// Note that commas are forbidden within any field name or value.
func NewStoreSpec(value string) (StoreSpec, error) {
const pathField = "path"
Expand Down Expand Up @@ -399,6 +478,13 @@ func NewStoreSpec(value string) (StoreSpec, error) {
return StoreSpec{}, err
}
ss.PebbleOptions = buf.String()
case "provisioned-rate":
rateSpec, err := newStoreProvisionedRateSpec("provisioned-rate", value)
if err != nil {
return StoreSpec{}, err
}
ss.ProvisionedRateSpec = rateSpec

default:
return StoreSpec{}, fmt.Errorf("%s is not a valid store field", field)
}
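
A quick table-driven sketch of the accept/reject behavior of
newStoreProvisionedRateSpec (a hypothetical test, assumed to live in
package base alongside the parser with "testing" imported; not part of
this diff):

func TestProvisionedRateSpecParsing(t *testing.T) {
	for _, tc := range []struct {
		value  string
		expErr bool
	}{
		{"disk-name=nvme1n1", false},                  // bandwidth falls back to the cluster setting
		{"disk-name=nvme1n1:bandwidth=125MiB", false}, // explicit per-store bandwidth
		{"bandwidth=125MiB", true},                    // disk-name is required
		{"disk-name=nvme1n1:bandwidth=0", true},       // zero bandwidth is rejected
		{"disk-name=a:disk-name=b", true},             // duplicate sub-fields are rejected
	} {
		if _, err := newStoreProvisionedRateSpec("provisioned-rate", tc.value); (err != nil) != tc.expErr {
			t.Errorf("%q: got err=%v, want error=%t", tc.value, err, tc.expErr)
		}
	}
}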
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/store.go
@@ -3958,3 +3958,11 @@ func (n *KVAdmissionControllerImpl) FollowerStoreWriteBytes(
storeAdmissionQ.BypassedWorkDone(
followerWriteBytes.numEntries, followerWriteBytes.StoreWorkDoneInfo)
}

// ProvisionedBandwidthForAdmissionControl sets the value of the
// provisioned bandwidth for each store in the cluster.
var ProvisionedBandwidthForAdmissionControl = settings.RegisterIntSetting(
settings.SystemOnly, "kv.store.admission.provisioned_bandwidth",
"if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+
"for each store. It can be over-ridden on a per-store basis using the --store flag",
0, settings.NonNegativeInt).WithPublic()
71 changes: 70 additions & 1 deletion pkg/server/node.go
@@ -238,6 +238,8 @@ type Node struct {
// COCKROACH_DEBUG_TS_IMPORT_FILE env var.
suppressNodeStatus syncutil.AtomicBool

diskStatsMap diskStatsMap

testingErrorEvent func(context.Context, *roachpb.BatchRequest, error)
}

@@ -775,17 +777,84 @@ func (n *Node) UpdateIOThreshold(id roachpb.StoreID, threshold *admissionpb.IOThreshold)
s.UpdateIOThreshold(threshold)
}

type diskStatsMap struct {
provisionedRate map[roachpb.StoreID]base.ProvisionedRateSpec
diskNameToStoreID map[string]roachpb.StoreID
}

func (dsm *diskStatsMap) populateAdmissionDiskStats(
clusterProvisionedBandwidth int64,
diskStats []status.DiskStats,
stats map[roachpb.StoreID]admission.DiskStats,
) {
for id, spec := range dsm.provisionedRate {
s := admission.DiskStats{ProvisionedBandwidth: clusterProvisionedBandwidth}
if spec.ProvisionedBandwidth > 0 {
s.ProvisionedBandwidth = spec.ProvisionedBandwidth
}
stats[id] = s
}
for i := range diskStats {
if id, ok := dsm.diskNameToStoreID[diskStats[i].Name]; ok {
s := stats[id]
s.BytesRead = uint64(diskStats[i].ReadBytes)
s.BytesWritten = uint64(diskStats[i].WriteBytes)
stats[id] = s
}
}
}

func (dsm *diskStatsMap) empty() bool {
return len(dsm.provisionedRate) == 0
}

func (n *Node) registerEnginesForDiskStatsMap(
specs []base.StoreSpec, engines []storage.Engine,
) error {
n.diskStatsMap.provisionedRate = make(map[roachpb.StoreID]base.ProvisionedRateSpec)
n.diskStatsMap.diskNameToStoreID = make(map[string]roachpb.StoreID)
for i := range engines {
id, err := kvserver.ReadStoreIdent(context.Background(), engines[i])
if err != nil {
return err
}
if len(specs[i].ProvisionedRateSpec.DiskName) > 0 {
n.diskStatsMap.provisionedRate[id.StoreID] = specs[i].ProvisionedRateSpec
n.diskStatsMap.diskNameToStoreID[specs[i].ProvisionedRateSpec.DiskName] = id.StoreID
}
}
return nil
}

// GetPebbleMetrics implements admission.PebbleMetricsProvider.
func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
clusterProvisionedBandwidth := kvserver.ProvisionedBandwidthForAdmissionControl.Get(
&n.storeCfg.Settings.SV)
var storeIDToDiskStats map[roachpb.StoreID]admission.DiskStats
if !n.diskStatsMap.empty() {
diskCounters, err := status.GetDiskCounters(context.Background())
if err != nil {
log.Warningf(context.Background(), "%v",
errors.Wrapf(err, "unable to get disk stats"))
} else {
storeIDToDiskStats = make(map[roachpb.StoreID]admission.DiskStats)
n.diskStatsMap.populateAdmissionDiskStats(
clusterProvisionedBandwidth, diskCounters, storeIDToDiskStats)
}
}
var metrics []admission.StoreMetrics
_ = n.stores.VisitStores(func(store *kvserver.Store) error {
m := store.Engine().GetMetrics()
im := store.Engine().GetInternalIntervalMetrics()
diskStats := admission.DiskStats{ProvisionedBandwidth: clusterProvisionedBandwidth}
if s, ok := storeIDToDiskStats[store.StoreID()]; ok {
diskStats = s
}
metrics = append(metrics, admission.StoreMetrics{
StoreID: int32(store.StoreID()),
Metrics: m.Metrics,
WriteStallCount: m.WriteStallCount,
InternalIntervalMetrics: im})
InternalIntervalMetrics: im,
DiskStats: diskStats})
return nil
})
return metrics
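
To make the two-pass population concrete, a self-contained sketch of the
same merge logic, with stand-in types for base.ProvisionedRateSpec,
status.DiskStats, and admission.DiskStats (all names and values here are
illustrative):

package main

import "fmt"

type rateSpec struct { // stand-in for base.ProvisionedRateSpec
	diskName  string
	bandwidth int64
}

type observedDisk struct { // stand-in for status.DiskStats
	name                  string
	readBytes, writeBytes int64
}

type diskStats struct { // stand-in for admission.DiskStats
	provisionedBandwidth    int64
	bytesRead, bytesWritten uint64
}

func main() {
	const clusterBandwidth int64 = 100 << 20 // 100 MiB/s cluster setting
	specs := map[int]rateSpec{
		1: {diskName: "nvme1n1"},                       // falls back to the cluster setting
		2: {diskName: "nvme2n1", bandwidth: 250 << 20}, // per-store override
	}
	observed := []observedDisk{{name: "nvme1n1", readBytes: 1 << 30, writeBytes: 2 << 30}}

	// Pass 1: seed each store's provisioned bandwidth, preferring the
	// per-store override, and index stores by disk name.
	stats := make(map[int]diskStats)
	byName := make(map[string]int)
	for id, spec := range specs {
		bw := clusterBandwidth
		if spec.bandwidth > 0 {
			bw = spec.bandwidth
		}
		stats[id] = diskStats{provisionedBandwidth: bw}
		byName[spec.diskName] = id
	}
	// Pass 2: overlay the observed read/write bytes, matched by disk name.
	// Note the write-back: indexing a map yields a copy of the struct value.
	for _, d := range observed {
		if id, ok := byName[d.name]; ok {
			s := stats[id]
			s.bytesRead, s.bytesWritten = uint64(d.readBytes), uint64(d.writeBytes)
			stats[id] = s
		}
	}
	fmt.Println(stats) // map[1:{104857600 1073741824 2147483648} 2:{262144000 0 0}]
}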
4 changes: 4 additions & 0 deletions pkg/server/server.go
@@ -1574,6 +1574,10 @@ func (s *Server) PreStart(ctx context.Context) error {
return err
}

if err := s.node.registerEnginesForDiskStatsMap(s.cfg.Stores.Specs, s.engines); err != nil {
return errors.Wrapf(err, "failed to register engines for the disk stats map")
}

if err := s.debug.RegisterEngines(s.cfg.Stores.Specs, s.engines); err != nil {
return errors.Wrapf(err, "failed to register engines with debug server")
}
12 changes: 7 additions & 5 deletions pkg/server/status/disk_counters.go
@@ -20,20 +20,22 @@ import (
"github.com/shirou/gopsutil/v3/disk"
)

func getDiskCounters(ctx context.Context) ([]diskStats, error) {
// GetDiskCounters returns DiskStats for all disks.
func GetDiskCounters(ctx context.Context) ([]DiskStats, error) {
driveStats, err := disk.IOCountersWithContext(ctx)
if err != nil {
return nil, err
}

output := make([]diskStats, len(driveStats))
output := make([]DiskStats, len(driveStats))
i := 0
for _, counters := range driveStats {
output[i] = diskStats{
readBytes: int64(counters.ReadBytes),
output[i] = DiskStats{
Name: counters.Name,
ReadBytes: int64(counters.ReadBytes),
readCount: int64(counters.ReadCount),
readTime: time.Duration(counters.ReadTime) * time.Millisecond,
writeBytes: int64(counters.WriteBytes),
WriteBytes: int64(counters.WriteBytes),
writeCount: int64(counters.WriteCount),
writeTime: time.Duration(counters.WriteTime) * time.Millisecond,
ioTime: time.Duration(counters.IoTime) * time.Millisecond,
12 changes: 7 additions & 5 deletions pkg/server/status/disk_counters_darwin.go
@@ -19,19 +19,21 @@ import (
"github.com/lufia/iostat"
)

func getDiskCounters(context.Context) ([]diskStats, error) {
// GetDiskCounters returns DiskStats for all disks.
func GetDiskCounters(context.Context) ([]DiskStats, error) {
driveStats, err := iostat.ReadDriveStats()
if err != nil {
return nil, err
}

output := make([]diskStats, len(driveStats))
output := make([]DiskStats, len(driveStats))
for i, counters := range driveStats {
output[i] = diskStats{
readBytes: counters.BytesRead,
output[i] = DiskStats{
Name: counters.Name,
ReadBytes: counters.BytesRead,
readCount: counters.NumRead,
readTime: counters.TotalReadTime,
writeBytes: counters.BytesWritten,
WriteBytes: counters.BytesWritten,
writeCount: counters.NumWrite,
writeTime: counters.TotalWriteTime,
ioTime: 0, // Not reported by this library.