Skip to content

Commit

Permalink
admission: add l0 control metrics
Browse files Browse the repository at this point in the history
Part of cockroachdb#82743.

We introduce metrics for L0 compacted bytes, L0 tokens produced, and IO
tokens returned.

Release note: None
  • Loading branch information
irfansharif committed Sep 5, 2023
1 parent be7105f commit 05f0064
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 80 deletions.
94 changes: 52 additions & 42 deletions pkg/util/admission/grant_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@ func (gcs GrantCoordinators) Close() {
type StoreGrantCoordinators struct {
ambientCtx log.AmbientContext

settings *cluster.Settings
makeStoreRequesterFunc makeStoreRequesterFunc
kvIOTokensExhaustedDuration *metric.Counter
kvIOTokensAvailable *metric.Gauge
kvElasticIOTokensAvailable *metric.Gauge
kvIOTokensTookWithoutPermission *metric.Counter
kvIOTotalTokensTaken *metric.Counter
settings *cluster.Settings
makeStoreRequesterFunc makeStoreRequesterFunc
kvIOTokensExhaustedDuration *metric.Counter
kvIOTokensAvailable *metric.Gauge
kvElasticIOTokensAvailable *metric.Gauge
kvIOTotalTokensTaken *metric.Counter
kvIOTotalTokensReturned *metric.Counter
l0CompactedBytes *metric.Counter
l0TokensProduced *metric.Counter

// These metrics are shared by WorkQueues across stores.
workQueueMetrics *WorkQueueMetrics
Expand Down Expand Up @@ -168,10 +170,10 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
// initialization, which will also set these to unlimited.
startingIOTokens: unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval(),
ioTokensExhaustedDurationMetric: sgc.kvIOTokensExhaustedDuration,
availableTokensMetrics: sgc.kvIOTokensAvailable,
availableTokensMetric: sgc.kvIOTokensAvailable,
availableElasticTokensMetric: sgc.kvElasticIOTokensAvailable,
tookWithoutPermissionMetric: sgc.kvIOTokensTookWithoutPermission,
totalTokensTaken: sgc.kvIOTotalTokensTaken,
tokensTakenMetric: sgc.kvIOTotalTokensTaken,
tokensReturnedMetric: sgc.kvIOTotalTokensReturned,
}
kvg.coordMu.availableIOTokens = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
kvg.coordMu.availableElasticIOTokens = kvg.coordMu.availableIOTokens
Expand Down Expand Up @@ -215,6 +217,8 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
perWorkTokenEstimator: makeStorePerWorkTokenEstimator(),
diskBandwidthLimiter: makeDiskBandwidthLimiter(),
kvGranter: kvg,
l0CompactedBytes: sgc.l0CompactedBytes,
l0TokensProduced: sgc.l0TokensProduced,
}
return coord
}
Expand Down Expand Up @@ -462,17 +466,19 @@ func makeStoresGrantCoordinators(
makeStoreRequester = opts.makeStoreRequesterFunc
}
storeCoordinators := &StoreGrantCoordinators{
ambientCtx: ambientCtx,
settings: st,
makeStoreRequesterFunc: makeStoreRequester,
kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
kvIOTokensTookWithoutPermission: metrics.KVIOTokensTookWithoutPermission,
kvIOTotalTokensTaken: metrics.KVIOTotalTokensTaken,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
workQueueMetrics: storeWorkQueueMetrics,
onLogEntryAdmitted: onLogEntryAdmitted,
knobs: knobs,
ambientCtx: ambientCtx,
settings: st,
makeStoreRequesterFunc: makeStoreRequester,
kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
kvIOTotalTokensTaken: metrics.KVIOTotalTokensTaken,
kvIOTotalTokensReturned: metrics.KVIOTotalTokensReturned,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
l0CompactedBytes: metrics.L0CompactedBytes,
l0TokensProduced: metrics.L0TokensProduced,
workQueueMetrics: storeWorkQueueMetrics,
onLogEntryAdmitted: onLogEntryAdmitted,
knobs: knobs,
}
return storeCoordinators
}
Expand Down Expand Up @@ -1012,34 +1018,38 @@ type GrantCoordinatorMetrics struct {
KVSlotAdjusterIncrements *metric.Counter
KVSlotAdjusterDecrements *metric.Counter
// TODO(banabrick): Make these metrics per store.
KVIOTokensExhaustedDuration *metric.Counter
KVIOTokensTookWithoutPermission *metric.Counter
KVIOTotalTokensTaken *metric.Counter
KVIOTokensAvailable *metric.Gauge
KVElasticIOTokensAvailable *metric.Gauge
SQLLeafStartUsedSlots *metric.Gauge
SQLRootStartUsedSlots *metric.Gauge
KVIOTokensExhaustedDuration *metric.Counter
KVIOTotalTokensTaken *metric.Counter
KVIOTotalTokensReturned *metric.Counter
KVIOTokensAvailable *metric.Gauge
KVElasticIOTokensAvailable *metric.Gauge
L0CompactedBytes *metric.Counter
L0TokensProduced *metric.Counter
SQLLeafStartUsedSlots *metric.Gauge
SQLRootStartUsedSlots *metric.Gauge
}

// MetricStruct implements the metric.Struct interface. It is a marker
// method with no behavior; its presence lets GrantCoordinatorMetrics be
// treated as a struct of metrics by the metric registry.
func (GrantCoordinatorMetrics) MetricStruct() {}

// makeGrantCoordinatorMetrics instantiates every metric in
// GrantCoordinatorMetrics. Counters and gauges are constructed from the
// package-level metric.Metadata vars declared in granter.go; the slot gauges
// for SQL leaf/root start work derive their names from the work kind.
func makeGrantCoordinatorMetrics() GrantCoordinatorMetrics {
	m := GrantCoordinatorMetrics{
		KVTotalSlots:                 metric.NewGauge(totalSlots),
		KVUsedSlots:                  metric.NewGauge(addName(workKindString(KVWork), usedSlots)),
		KVSlotsExhaustedDuration:     metric.NewCounter(kvSlotsExhaustedDuration),
		KVCPULoadShortPeriodDuration: metric.NewCounter(kvCPULoadShortPeriodDuration),
		KVCPULoadLongPeriodDuration:  metric.NewCounter(kvCPULoadLongPeriodDuration),
		KVSlotAdjusterIncrements:     metric.NewCounter(kvSlotAdjusterIncrements),
		KVSlotAdjusterDecrements:     metric.NewCounter(kvSlotAdjusterDecrements),
		KVIOTokensExhaustedDuration:  metric.NewCounter(kvIOTokensExhaustedDuration),
		SQLLeafStartUsedSlots:        metric.NewGauge(addName(workKindString(SQLStatementLeafStartWork), usedSlots)),
		SQLRootStartUsedSlots:        metric.NewGauge(addName(workKindString(SQLStatementRootStartWork), usedSlots)),
		KVIOTotalTokensTaken:         metric.NewCounter(kvIOTotalTokensTaken),
		KVIOTotalTokensReturned:      metric.NewCounter(kvIOTotalTokensReturned),
		KVIOTokensAvailable:          metric.NewGauge(kvIOTokensAvailable),
		KVElasticIOTokensAvailable:   metric.NewGauge(kvElasticIOTokensAvailable),
		L0CompactedBytes:             metric.NewCounter(l0CompactedBytes),
		L0TokensProduced:             metric.NewCounter(l0TokensProduced),
	}
	return m
}
Expand Down
67 changes: 41 additions & 26 deletions pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,12 @@ type kvStoreTokenGranter struct {
// computing startingIOTokens-availableIOTokens.
startingIOTokens int64
ioTokensExhaustedDurationMetric *metric.Counter
availableTokensMetrics *metric.Gauge
availableTokensMetric *metric.Gauge
availableElasticTokensMetric *metric.Gauge
tookWithoutPermissionMetric *metric.Counter
totalTokensTaken *metric.Counter
exhaustedStart time.Time
tokensReturnedMetric *metric.Counter
tokensTakenMetric *metric.Counter

exhaustedStart time.Time

// Estimation models.
l0WriteLM, l0IngestLM, ingestLM tokensLinearModel
Expand Down Expand Up @@ -404,7 +405,6 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
if sg.coordMu.availableIOTokens > 0 {
sg.subtractTokensLocked(count, count, false)
sg.coordMu.diskBWTokensUsed[wc] += count
sg.totalTokensTaken.Inc(count)
return grantSuccess
}
case admissionpb.ElasticWorkClass:
Expand All @@ -414,7 +414,6 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
sg.subtractTokensLocked(count, count, false)
sg.coordMu.elasticIOTokensUsedByElastic += count
sg.coordMu.diskBWTokensUsed[wc] += count
sg.totalTokensTaken.Inc(count)
return grantSuccess
}
}
Expand Down Expand Up @@ -446,8 +445,6 @@ func (sg *kvStoreTokenGranter) tookWithoutPermission(workClass admissionpb.WorkC
func (sg *kvStoreTokenGranter) tookWithoutPermissionLocked(count int64, demuxHandle int8) {
wc := admissionpb.WorkClass(demuxHandle)
sg.subtractTokensLocked(count, count, false)
sg.tookWithoutPermissionMetric.Inc(count)
sg.totalTokensTaken.Inc(count)
if wc == admissionpb.ElasticWorkClass {
sg.coordMu.elasticDiskBWTokensAvailable -= count
sg.coordMu.elasticIOTokensUsedByElastic += count
Expand All @@ -458,28 +455,35 @@ func (sg *kvStoreTokenGranter) tookWithoutPermissionLocked(count int64, demuxHan
// subtractTokensLocked is a helper function that subtracts count tokens (count
// can be negative, in which case this is really an addition) from the regular
// bucket and elasticCount tokens from the elastic bucket, updating the
// corresponding availability gauges. settingAvailableTokens is true when the
// caller is wholesale (re-)setting the available tokens (rather than taking or
// returning them on behalf of admitted work); in that case the taken/returned
// counters are not incremented, and the exhausted-duration metric is ticked
// even while tokens remain <= 0.
func (sg *kvStoreTokenGranter) subtractTokensLocked(
	count int64, elasticCount int64, settingAvailableTokens bool,
) {
	avail := sg.coordMu.availableIOTokens
	sg.coordMu.availableIOTokens -= count
	sg.coordMu.availableElasticIOTokens -= elasticCount
	sg.availableTokensMetric.Update(sg.coordMu.availableIOTokens)
	sg.availableElasticTokensMetric.Update(sg.coordMu.availableElasticIOTokens)
	if !settingAvailableTokens {
		// Positive counts are tokens taken by work; negative counts are tokens
		// handed back (e.g. after over-estimation of a request's size).
		if count > 0 {
			sg.tokensTakenMetric.Inc(count)
		} else {
			sg.tokensReturnedMetric.Inc(-count)
		}
	}
	if count > 0 && avail > 0 && sg.coordMu.availableIOTokens <= 0 {
		// Transition from > 0 to <= 0.
		sg.exhaustedStart = timeutil.Now()
	} else if count < 0 && avail <= 0 && (sg.coordMu.availableIOTokens > 0 || settingAvailableTokens) {
		// Transition from <= 0 to > 0, or if we're newly setting available
		// tokens. The latter ensures that if the available tokens stay <= 0, we
		// don't show a sudden change in the metric after minutes of exhaustion
		// (we had observed such behavior prior to this change).
		now := timeutil.Now()
		exhaustedMicros := now.Sub(sg.exhaustedStart).Microseconds()
		sg.ioTokensExhaustedDurationMetric.Inc(exhaustedMicros)
		if sg.coordMu.availableIOTokens <= 0 {
			sg.exhaustedStart = now
		}
	}
}

// requesterHasWaitingRequests implements granterWithLockedCalls.
Expand Down Expand Up @@ -570,10 +574,9 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
sg.coordMu.availableElasticIOTokens =
min(sg.coordMu.availableElasticIOTokens, sg.coordMu.availableIOTokens)
}

sg.startingIOTokens = sg.coordMu.availableIOTokens
sg.availableTokensMetrics.Update(sg.coordMu.availableIOTokens)
sg.availableTokensMetric.Update(sg.coordMu.availableIOTokens)
sg.availableElasticTokensMetric.Update(sg.coordMu.availableElasticIOTokens)
sg.startingIOTokens = sg.coordMu.availableIOTokens

sg.coordMu.elasticDiskBWTokensAvailable += elasticDiskBandwidthTokens
if sg.coordMu.elasticDiskBWTokensAvailable > elasticDiskBandwidthTokensCapacity {
Expand Down Expand Up @@ -736,18 +739,18 @@ var (
Measurement: "Microseconds",
Unit: metric.Unit_COUNT,
}
kvIONumIOTokensTookWithoutPermission = metric.Metadata{
Name: "admission.granter.io_tokens_took_without_permission.kv",
Help: "Total number of tokens taken without permission",
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
kvIOTotalTokensTaken = metric.Metadata{
Name: "admission.granter.io_tokens_taken.kv",
Help: "Total number of tokens taken",
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
kvIOTotalTokensReturned = metric.Metadata{
Name: "admission.granter.io_tokens_returned.kv",
Help: "Total number of tokens returned",
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
kvIOTokensAvailable = metric.Metadata{
Name: "admission.granter.io_tokens_available.kv",
Help: "Number of tokens available",
Expand All @@ -760,6 +763,18 @@ var (
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
l0CompactedBytes = metric.Metadata{
Name: "admission.l0_compacted_bytes.kv",
Help: "Total bytes compacted out of L0 (used to generate IO tokens)",
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
l0TokensProduced = metric.Metadata{
Name: "admission.l0_tokens_produced.kv",
Help: "Total bytes produced for L0 writes",
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
)

// TODO(irfansharif): we are lacking metrics for IO tokens and load, including
Expand Down
18 changes: 10 additions & 8 deletions pkg/util/admission/granter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,16 @@ func TestGranterBasic(t *testing.T) {
requesters[numWorkKinds] = req.requesters[admissionpb.ElasticWorkClass]
return req
},
kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
kvIOTokensTookWithoutPermission: metrics.KVIOTokensTookWithoutPermission,
kvIOTotalTokensTaken: metrics.KVIOTotalTokensTaken,
workQueueMetrics: workQueueMetrics,
disableTickerForTesting: true,
knobs: &TestingKnobs{},
kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
kvIOTotalTokensTaken: metrics.KVIOTotalTokensTaken,
kvIOTotalTokensReturned: metrics.KVIOTotalTokensReturned,
l0CompactedBytes: metrics.L0CompactedBytes,
l0TokensProduced: metrics.L0TokensProduced,
workQueueMetrics: workQueueMetrics,
disableTickerForTesting: true,
knobs: &TestingKnobs{},
}
var metricsProvider testMetricsProvider
metricsProvider.setMetricsForStores([]int32{1}, pebble.Metrics{})
Expand Down
15 changes: 14 additions & 1 deletion pkg/util/admission/io_load_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
Expand Down Expand Up @@ -188,6 +189,9 @@ type ioLoadListener struct {
adjustTokensResult
perWorkTokenEstimator storePerWorkTokenEstimator
diskBandwidthLimiter diskBandwidthLimiter

l0CompactedBytes *metric.Counter
l0TokensProduced *metric.Counter
}

type ioLoadListenerState struct {
Expand Down Expand Up @@ -641,7 +645,7 @@ type adjustTokensAuxComputations struct {

// adjustTokensInner is used for computing tokens based on compaction and
// flush bottlenecks.
func (*ioLoadListener) adjustTokensInner(
func (io *ioLoadListener) adjustTokensInner(
ctx context.Context,
prev ioLoadListenerState,
l0Metrics pebble.LevelMetrics,
Expand Down Expand Up @@ -677,7 +681,10 @@ func (*ioLoadListener) adjustTokensInner(
// bytes (gauge).
intL0CompactedBytes = 0
}
io.l0CompactedBytes.Inc(intL0CompactedBytes)

const alpha = 0.5

// Compaction scheduling can be uneven in prioritizing L0 for compactions,
// so smooth out what is being removed by compactions.
smoothedIntL0CompactedBytes := int64(alpha*float64(intL0CompactedBytes) + (1-alpha)*float64(prev.smoothedIntL0CompactedBytes))
Expand Down Expand Up @@ -958,6 +965,9 @@ func (*ioLoadListener) adjustTokensInner(
if totalNumElasticByteTokens > totalNumByteTokens {
totalNumElasticByteTokens = totalNumByteTokens
}

io.l0TokensProduced.Inc(totalNumByteTokens)

// Install the latest cumulative stats.
return adjustTokensResult{
ioLoadListenerState: ioLoadListenerState{
Expand Down Expand Up @@ -1047,6 +1057,9 @@ func (res adjustTokensResult) SafeFormat(p redact.SafePrinter, _ rune) {
ib(m/adjustmentInterval))
switch res.aux.tokenKind {
case compactionTokenKind:
// NB: res.smoothedCompactionByteTokens is the same as
// res.ioLoadListenerState.totalNumByteTokens (printed above) when
// res.aux.tokenKind == compactionTokenKind.
p.Printf(" due to L0 growth")
case flushTokenKind:
p.Printf(" due to memtable flush (multiplier %.3f)", res.flushUtilTargetFraction)
Expand Down
Loading

0 comments on commit 05f0064

Please sign in to comment.