diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go
index afa7dd4a2f33..ba8b25e61961 100644
--- a/pkg/util/admission/grant_coordinator.go
+++ b/pkg/util/admission/grant_coordinator.go
@@ -49,13 +49,15 @@ func (gcs GrantCoordinators) Close() {
 type StoreGrantCoordinators struct {
 	ambientCtx log.AmbientContext
 
-	settings                        *cluster.Settings
-	makeStoreRequesterFunc          makeStoreRequesterFunc
-	kvIOTokensExhaustedDuration     *metric.Counter
-	kvIOTokensAvailable             *metric.Gauge
-	kvElasticIOTokensAvailable      *metric.Gauge
-	kvIOTokensTookWithoutPermission *metric.Counter
-	kvIOTotalTokensTaken            *metric.Counter
+	settings                    *cluster.Settings
+	makeStoreRequesterFunc      makeStoreRequesterFunc
+	kvIOTokensExhaustedDuration *metric.Counter
+	kvIOTokensAvailable         *metric.Gauge
+	kvElasticIOTokensAvailable  *metric.Gauge
+	kvIOTotalTokensTaken        *metric.Counter
+	kvIOTotalTokensReturned     *metric.Counter
+	l0CompactedBytes            *metric.Counter
+	l0TokensProduced            *metric.Counter
 
 	// These metrics are shared by WorkQueues across stores.
 	workQueueMetrics *WorkQueueMetrics
@@ -168,10 +170,10 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
 		// initialization, which will also set these to unlimited.
 		startingIOTokens:                unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval(),
 		ioTokensExhaustedDurationMetric: sgc.kvIOTokensExhaustedDuration,
-		availableTokensMetrics:          sgc.kvIOTokensAvailable,
+		availableTokensMetric:           sgc.kvIOTokensAvailable,
 		availableElasticTokensMetric:    sgc.kvElasticIOTokensAvailable,
-		tookWithoutPermissionMetric:     sgc.kvIOTokensTookWithoutPermission,
-		totalTokensTaken:                sgc.kvIOTotalTokensTaken,
+		tokensTakenMetric:               sgc.kvIOTotalTokensTaken,
+		tokensReturnedMetric:            sgc.kvIOTotalTokensReturned,
 	}
 	kvg.coordMu.availableIOTokens = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
 	kvg.coordMu.availableElasticIOTokens = kvg.coordMu.availableIOTokens
@@ -215,6 +217,8 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
 		perWorkTokenEstimator: makeStorePerWorkTokenEstimator(),
 		diskBandwidthLimiter:  makeDiskBandwidthLimiter(),
 		kvGranter:             kvg,
+		l0CompactedBytes:      sgc.l0CompactedBytes,
+		l0TokensProduced:      sgc.l0TokensProduced,
 	}
 	return coord
 }
@@ -462,17 +466,19 @@ func makeStoresGrantCoordinators(
 		makeStoreRequester = opts.makeStoreRequesterFunc
 	}
 	storeCoordinators := &StoreGrantCoordinators{
-		ambientCtx:                      ambientCtx,
-		settings:                        st,
-		makeStoreRequesterFunc:          makeStoreRequester,
-		kvIOTokensExhaustedDuration:     metrics.KVIOTokensExhaustedDuration,
-		kvIOTokensTookWithoutPermission: metrics.KVIOTokensTookWithoutPermission,
-		kvIOTotalTokensTaken:            metrics.KVIOTotalTokensTaken,
-		kvIOTokensAvailable:             metrics.KVIOTokensAvailable,
-		kvElasticIOTokensAvailable:      metrics.KVElasticIOTokensAvailable,
-		workQueueMetrics:                storeWorkQueueMetrics,
-		onLogEntryAdmitted:              onLogEntryAdmitted,
-		knobs:                           knobs,
+		ambientCtx:                  ambientCtx,
+		settings:                    st,
+		makeStoreRequesterFunc:      makeStoreRequester,
+		kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
+		kvIOTotalTokensTaken:        metrics.KVIOTotalTokensTaken,
+		kvIOTotalTokensReturned:     metrics.KVIOTotalTokensReturned,
+		kvIOTokensAvailable:         metrics.KVIOTokensAvailable,
+		kvElasticIOTokensAvailable:  metrics.KVElasticIOTokensAvailable,
+		l0CompactedBytes:            metrics.L0CompactedBytes,
+		l0TokensProduced:            metrics.L0TokensProduced,
+		workQueueMetrics:            storeWorkQueueMetrics,
+		onLogEntryAdmitted:          onLogEntryAdmitted,
+		knobs:                       knobs,
 	}
 	return storeCoordinators
 }
@@ -1012,13 +1018,15 @@ type GrantCoordinatorMetrics struct {
 	KVSlotAdjusterIncrements     *metric.Counter
 	KVSlotAdjusterDecrements     *metric.Counter
 	// TODO(banabrick): Make these metrics per store.
-	KVIOTokensExhaustedDuration     *metric.Counter
-	KVIOTokensTookWithoutPermission *metric.Counter
-	KVIOTotalTokensTaken            *metric.Counter
-	KVIOTokensAvailable             *metric.Gauge
-	KVElasticIOTokensAvailable      *metric.Gauge
-	SQLLeafStartUsedSlots           *metric.Gauge
-	SQLRootStartUsedSlots           *metric.Gauge
+	KVIOTokensExhaustedDuration *metric.Counter
+	KVIOTotalTokensTaken        *metric.Counter
+	KVIOTotalTokensReturned     *metric.Counter
+	KVIOTokensAvailable         *metric.Gauge
+	KVElasticIOTokensAvailable  *metric.Gauge
+	L0CompactedBytes            *metric.Counter
+	L0TokensProduced            *metric.Counter
+	SQLLeafStartUsedSlots       *metric.Gauge
+	SQLRootStartUsedSlots       *metric.Gauge
 }
 
 // MetricStruct implements the metric.Struct interface.
@@ -1026,20 +1034,22 @@ func (GrantCoordinatorMetrics) MetricStruct() {}
 
 func makeGrantCoordinatorMetrics() GrantCoordinatorMetrics {
 	m := GrantCoordinatorMetrics{
-		KVTotalSlots:                    metric.NewGauge(totalSlots),
-		KVUsedSlots:                     metric.NewGauge(addName(workKindString(KVWork), usedSlots)),
-		KVSlotsExhaustedDuration:        metric.NewCounter(kvSlotsExhaustedDuration),
-		KVCPULoadShortPeriodDuration:    metric.NewCounter(kvCPULoadShortPeriodDuration),
-		KVCPULoadLongPeriodDuration:     metric.NewCounter(kvCPULoadLongPeriodDuration),
-		KVSlotAdjusterIncrements:        metric.NewCounter(kvSlotAdjusterIncrements),
-		KVSlotAdjusterDecrements:        metric.NewCounter(kvSlotAdjusterDecrements),
-		KVIOTokensExhaustedDuration:     metric.NewCounter(kvIOTokensExhaustedDuration),
-		SQLLeafStartUsedSlots:           metric.NewGauge(addName(workKindString(SQLStatementLeafStartWork), usedSlots)),
-		SQLRootStartUsedSlots:           metric.NewGauge(addName(workKindString(SQLStatementRootStartWork), usedSlots)),
-		KVIOTokensTookWithoutPermission: metric.NewCounter(kvIONumIOTokensTookWithoutPermission),
-		KVIOTotalTokensTaken:            metric.NewCounter(kvIOTotalTokensTaken),
-		KVIOTokensAvailable:             metric.NewGauge(kvIOTokensAvailable),
-		KVElasticIOTokensAvailable:      metric.NewGauge(kvElasticIOTokensAvailable),
+		KVTotalSlots:                 metric.NewGauge(totalSlots),
+		KVUsedSlots:                  metric.NewGauge(addName(workKindString(KVWork), usedSlots)),
+		KVSlotsExhaustedDuration:     metric.NewCounter(kvSlotsExhaustedDuration),
+		KVCPULoadShortPeriodDuration: metric.NewCounter(kvCPULoadShortPeriodDuration),
+		KVCPULoadLongPeriodDuration:  metric.NewCounter(kvCPULoadLongPeriodDuration),
+		KVSlotAdjusterIncrements:     metric.NewCounter(kvSlotAdjusterIncrements),
+		KVSlotAdjusterDecrements:     metric.NewCounter(kvSlotAdjusterDecrements),
+		KVIOTokensExhaustedDuration:  metric.NewCounter(kvIOTokensExhaustedDuration),
+		SQLLeafStartUsedSlots:        metric.NewGauge(addName(workKindString(SQLStatementLeafStartWork), usedSlots)),
+		SQLRootStartUsedSlots:        metric.NewGauge(addName(workKindString(SQLStatementRootStartWork), usedSlots)),
+		KVIOTotalTokensTaken:         metric.NewCounter(kvIOTotalTokensTaken),
+		KVIOTotalTokensReturned:      metric.NewCounter(kvIOTotalTokensReturned),
+		KVIOTokensAvailable:          metric.NewGauge(kvIOTokensAvailable),
+		KVElasticIOTokensAvailable:   metric.NewGauge(kvElasticIOTokensAvailable),
+		L0CompactedBytes:             metric.NewCounter(l0CompactedBytes),
+		L0TokensProduced:             metric.NewCounter(l0TokensProduced),
 	}
 	return m
 }
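Note on the metric change above: the `io_tokens_took_without_permission` counter is folded away, `io_tokens_taken` now counts every token debit (with or without permission), and the new `io_tokens_returned` counts credits. Net consumption over an interval is therefore recoverable from the two cumulative counters. A minimal sketch of that arithmetic with made-up snapshot values; `tokenSnapshot` and `netConsumed` are illustrative helpers, not part of the patch:

```go
package main

import "fmt"

// tokenSnapshot holds a hypothetical sample of the two cumulative counters
// added by this patch: admission.granter.io_tokens_taken.kv and
// admission.granter.io_tokens_returned.kv.
type tokenSnapshot struct {
	taken, returned int64
}

// netConsumed is the number of tokens actually consumed between two samples:
// every debit minus every credit (e.g. tokens handed back when a request's
// actual size turned out smaller than its admission-time estimate).
func netConsumed(prev, cur tokenSnapshot) int64 {
	return (cur.taken - prev.taken) - (cur.returned - prev.returned)
}

func main() {
	prev := tokenSnapshot{taken: 1000, returned: 200}
	cur := tokenSnapshot{taken: 1500, returned: 450}
	fmt.Println(netConsumed(prev, cur)) // 250 tokens net over the interval
}
```

Keeping the pair as two monotonic counters, rather than one signed net counter, also plays better with scrape-based timeseries, where rates of monotonic counters are cheap to compute.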
diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go
index efb7471ec064..63bb6401d3ea 100644
--- a/pkg/util/admission/granter.go
+++ b/pkg/util/admission/granter.go
@@ -315,11 +315,12 @@ type kvStoreTokenGranter struct {
 	// computing startingIOTokens-availableIOTokens.
 	startingIOTokens                int64
 	ioTokensExhaustedDurationMetric *metric.Counter
-	availableTokensMetrics          *metric.Gauge
+	availableTokensMetric           *metric.Gauge
 	availableElasticTokensMetric    *metric.Gauge
-	tookWithoutPermissionMetric     *metric.Counter
-	totalTokensTaken                *metric.Counter
-	exhaustedStart                  time.Time
+	tokensReturnedMetric            *metric.Counter
+	tokensTakenMetric               *metric.Counter
+
+	exhaustedStart time.Time
 
 	// Estimation models.
 	l0WriteLM, l0IngestLM, ingestLM tokensLinearModel
@@ -404,7 +405,6 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
 		if sg.coordMu.availableIOTokens > 0 {
 			sg.subtractTokensLocked(count, count, false)
 			sg.coordMu.diskBWTokensUsed[wc] += count
-			sg.totalTokensTaken.Inc(count)
 			return grantSuccess
 		}
 	case admissionpb.ElasticWorkClass:
@@ -414,7 +414,6 @@ func (sg *kvStoreTokenGranter) tryGetLocked(count int64, demuxHandle int8) grant
 			sg.subtractTokensLocked(count, count, false)
 			sg.coordMu.elasticIOTokensUsedByElastic += count
 			sg.coordMu.diskBWTokensUsed[wc] += count
-			sg.totalTokensTaken.Inc(count)
 			return grantSuccess
 		}
 	}
@@ -446,8 +445,6 @@ func (sg *kvStoreTokenGranter) tookWithoutPermission(workClass admissionpb.WorkC
 func (sg *kvStoreTokenGranter) tookWithoutPermissionLocked(count int64, demuxHandle int8) {
 	wc := admissionpb.WorkClass(demuxHandle)
 	sg.subtractTokensLocked(count, count, false)
-	sg.tookWithoutPermissionMetric.Inc(count)
-	sg.totalTokensTaken.Inc(count)
 	if wc == admissionpb.ElasticWorkClass {
 		sg.coordMu.elasticDiskBWTokensAvailable -= count
 		sg.coordMu.elasticIOTokensUsedByElastic += count
@@ -458,18 +455,28 @@ func (sg *kvStoreTokenGranter) tookWithoutPermissionLocked(count int64, demuxHan
 // subtractTokensLocked is a helper function that subtracts count tokens (count
 // can be negative, in which case this is really an addition).
 func (sg *kvStoreTokenGranter) subtractTokensLocked(
-	count int64, elasticCount int64, forceTickMetric bool,
+	count int64, elasticCount int64, settingAvailableTokens bool,
 ) {
 	avail := sg.coordMu.availableIOTokens
 	sg.coordMu.availableIOTokens -= count
+	sg.coordMu.availableElasticIOTokens -= elasticCount
+	sg.availableTokensMetric.Update(sg.coordMu.availableIOTokens)
+	sg.availableElasticTokensMetric.Update(sg.coordMu.availableElasticIOTokens)
+	if !settingAvailableTokens {
+		if count > 0 {
+			sg.tokensTakenMetric.Inc(count)
+		} else {
+			sg.tokensReturnedMetric.Inc(-count)
+		}
+	}
 	if count > 0 && avail > 0 && sg.coordMu.availableIOTokens <= 0 {
 		// Transition from > 0 to <= 0.
 		sg.exhaustedStart = timeutil.Now()
-	} else if count < 0 && avail <= 0 && (sg.coordMu.availableIOTokens > 0 || forceTickMetric) {
-		// Transition from <= 0 to > 0, or forced to tick the metric. The latter
-		// ensures that if the available tokens stay <= 0, we don't show a sudden
-		// change in the metric after minutes of exhaustion (we had observed such
-		// behavior prior to this change).
+	} else if count < 0 && avail <= 0 && (sg.coordMu.availableIOTokens > 0 || settingAvailableTokens) {
+		// Transition from <= 0 to > 0, or if we're newly setting available
+		// tokens. The latter ensures that if the available tokens stay <= 0, we
+		// don't show a sudden change in the metric after minutes of exhaustion
+		// (we had observed such behavior prior to this change).
 		now := timeutil.Now()
 		exhaustedMicros := now.Sub(sg.exhaustedStart).Microseconds()
 		sg.ioTokensExhaustedDurationMetric.Inc(exhaustedMicros)
@@ -477,9 +484,6 @@ func (sg *kvStoreTokenGranter) subtractTokensLocked(
 			sg.exhaustedStart = now
 		}
 	}
-	sg.availableTokensMetrics.Update(sg.coordMu.availableIOTokens)
-	sg.coordMu.availableElasticIOTokens -= elasticCount
-	sg.availableElasticTokensMetric.Update(sg.coordMu.availableElasticIOTokens)
 }
 
 // requesterHasWaitingRequests implements granterWithLockedCalls.
@@ -570,10 +574,9 @@ func (sg *kvStoreTokenGranter) setAvailableTokens(
 		sg.coordMu.availableElasticIOTokens = min(sg.coordMu.availableElasticIOTokens,
 			sg.coordMu.availableIOTokens)
 	}
-
-	sg.startingIOTokens = sg.coordMu.availableIOTokens
-	sg.availableTokensMetrics.Update(sg.coordMu.availableIOTokens)
+	sg.availableTokensMetric.Update(sg.coordMu.availableIOTokens)
 	sg.availableElasticTokensMetric.Update(sg.coordMu.availableElasticIOTokens)
+	sg.startingIOTokens = sg.coordMu.availableIOTokens
 
 	sg.coordMu.elasticDiskBWTokensAvailable += elasticDiskBandwidthTokens
 	if sg.coordMu.elasticDiskBWTokensAvailable > elasticDiskBandwidthTokensCapacity {
@@ -736,18 +739,18 @@ var (
 		Measurement: "Microseconds",
 		Unit:        metric.Unit_COUNT,
 	}
-	kvIONumIOTokensTookWithoutPermission = metric.Metadata{
-		Name:        "admission.granter.io_tokens_took_without_permission.kv",
-		Help:        "Total number of tokens taken without permission",
-		Measurement: "Tokens",
-		Unit:        metric.Unit_COUNT,
-	}
 	kvIOTotalTokensTaken = metric.Metadata{
 		Name:        "admission.granter.io_tokens_taken.kv",
 		Help:        "Total number of tokens taken",
 		Measurement: "Tokens",
		Unit:        metric.Unit_COUNT,
 	}
+	kvIOTotalTokensReturned = metric.Metadata{
+		Name:        "admission.granter.io_tokens_returned.kv",
+		Help:        "Total number of tokens returned",
+		Measurement: "Tokens",
+		Unit:        metric.Unit_COUNT,
+	}
 	kvIOTokensAvailable = metric.Metadata{
 		Name:        "admission.granter.io_tokens_available.kv",
 		Help:        "Number of tokens available",
@@ -760,6 +763,18 @@ var (
 		Measurement: "Tokens",
 		Unit:        metric.Unit_COUNT,
 	}
+	l0CompactedBytes = metric.Metadata{
+		Name:        "admission.l0_compacted_bytes.kv",
+		Help:        "Total bytes compacted out of L0 (used to generate IO tokens)",
+		Measurement: "Tokens",
+		Unit:        metric.Unit_COUNT,
+	}
+	l0TokensProduced = metric.Metadata{
+		Name:        "admission.l0_tokens_produced.kv",
+		Help:        "Total bytes produced for L0 writes",
+		Measurement: "Tokens",
+		Unit:        metric.Unit_COUNT,
+	}
 )
 
 // TODO(irfansharif): we are lacking metrics for IO tokens and load, including
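The granter.go hunks above move the metric increments out of the call sites (`tryGetLocked`, `tookWithoutPermissionLocked`) and into `subtractTokensLocked` itself, keyed off the sign of `count`; the `settingAvailableTokens` flag (renamed from `forceTickMetric`) suppresses the counters during the periodic refill, since resetting the bucket is not work taking or returning tokens. A stripped-down, lock-free model of that accounting, illustrative only (field and method names are simplified, not the patch's code):

```go
package main

import (
	"fmt"
	"time"
)

// granter is a toy model of kvStoreTokenGranter's token accounting:
// no mutex, no elastic tokens, metrics are plain fields.
type granter struct {
	available      int64
	taken          int64         // mirrors tokensTakenMetric
	returned       int64         // mirrors tokensReturnedMetric
	exhaustedDur   time.Duration // mirrors ioTokensExhaustedDurationMetric
	exhaustedStart time.Time
}

// subtract mimics subtractTokensLocked: count may be negative (a return),
// and settingAvailableTokens suppresses the taken/returned counters because
// the periodic refill is not work consuming or releasing tokens.
func (g *granter) subtract(count int64, settingAvailableTokens bool) {
	avail := g.available
	g.available -= count
	if !settingAvailableTokens {
		if count > 0 {
			g.taken += count
		} else {
			g.returned += -count
		}
	}
	if count > 0 && avail > 0 && g.available <= 0 {
		// Transitioned into exhaustion.
		g.exhaustedStart = time.Now()
	} else if count < 0 && avail <= 0 && (g.available > 0 || settingAvailableTokens) {
		// Left exhaustion, or a refill ticked the duration while still exhausted.
		now := time.Now()
		g.exhaustedDur += now.Sub(g.exhaustedStart)
		if g.available <= 0 {
			g.exhaustedStart = now
		}
	}
}

func main() {
	g := &granter{available: 10}
	g.subtract(15, false) // admission takes more than is available
	g.subtract(-5, false) // some tokens are returned post-hoc
	fmt.Println(g.available, g.taken, g.returned) // 0 15 5
}
```

Centralizing the increments in the one place that mutates `availableIOTokens` is what makes the taken/returned pair exhaustive: any future caller of `subtractTokensLocked` is counted automatically.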
diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go
index 83700a801aff..7c0bc610d877 100644
--- a/pkg/util/admission/granter_test.go
+++ b/pkg/util/admission/granter_test.go
@@ -137,14 +137,16 @@ func TestGranterBasic(t *testing.T) {
 			requesters[numWorkKinds] = req.requesters[admissionpb.ElasticWorkClass]
 			return req
 		},
-		kvIOTokensExhaustedDuration:     metrics.KVIOTokensExhaustedDuration,
-		kvIOTokensAvailable:             metrics.KVIOTokensAvailable,
-		kvElasticIOTokensAvailable:      metrics.KVElasticIOTokensAvailable,
-		kvIOTokensTookWithoutPermission: metrics.KVIOTokensTookWithoutPermission,
-		kvIOTotalTokensTaken:            metrics.KVIOTotalTokensTaken,
-		workQueueMetrics:                workQueueMetrics,
-		disableTickerForTesting:         true,
-		knobs:                           &TestingKnobs{},
+		kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
+		kvIOTokensAvailable:         metrics.KVIOTokensAvailable,
+		kvElasticIOTokensAvailable:  metrics.KVElasticIOTokensAvailable,
+		kvIOTotalTokensTaken:        metrics.KVIOTotalTokensTaken,
+		kvIOTotalTokensReturned:     metrics.KVIOTotalTokensReturned,
+		l0CompactedBytes:            metrics.L0CompactedBytes,
+		l0TokensProduced:            metrics.L0TokensProduced,
+		workQueueMetrics:            workQueueMetrics,
+		disableTickerForTesting:     true,
+		knobs:                       &TestingKnobs{},
 	}
 	var metricsProvider testMetricsProvider
 	metricsProvider.setMetricsForStores([]int32{1}, pebble.Metrics{})
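As an aside, the renamed counter pair is easy to sanity-check in isolation. A hypothetical fragment in the style of this package's tests (`TestTokenCounters` is not part of the patch; it assumes the package's unexported metadata vars and `util/metric` are in scope):

```go
// Hypothetical in-package test: taken minus returned should equal net usage.
func TestTokenCounters(t *testing.T) {
	taken := metric.NewCounter(kvIOTotalTokensTaken)
	returned := metric.NewCounter(kvIOTotalTokensReturned)
	taken.Inc(100)   // work admitted with 100 tokens
	returned.Inc(40) // 40 tokens handed back after reconciliation
	if net := taken.Count() - returned.Count(); net != 60 {
		t.Fatalf("expected net 60 tokens, got %d", net)
	}
}
```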
p.Printf(" due to L0 growth") case flushTokenKind: p.Printf(" due to memtable flush (multiplier %.3f)", res.flushUtilTargetFraction) diff --git a/pkg/util/admission/io_load_listener_test.go b/pkg/util/admission/io_load_listener_test.go index 334a14709ebc..a6bcf38a84eb 100644 --- a/pkg/util/admission/io_load_listener_test.go +++ b/pkg/util/admission/io_load_listener_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/echotest" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble" @@ -55,6 +56,8 @@ func TestIOLoadListener(t *testing.T) { kvRequester: req, perWorkTokenEstimator: makeStorePerWorkTokenEstimator(), diskBandwidthLimiter: makeDiskBandwidthLimiter(), + l0CompactedBytes: metric.NewCounter(l0CompactedBytes), + l0TokensProduced: metric.NewCounter(l0TokensProduced), } // The mutex is needed by ioLoadListener but is not useful in this // test -- the channels provide synchronization and prevent this @@ -214,8 +217,10 @@ func TestIOLoadListenerOverflow(t *testing.T) { ctx := context.Background() st := cluster.MakeTestingClusterSettings() ioll := ioLoadListener{ - settings: st, - kvRequester: req, + settings: st, + kvRequester: req, + l0CompactedBytes: metric.NewCounter(l0CompactedBytes), + l0TokensProduced: metric.NewCounter(l0TokensProduced), } ioll.kvGranter = kvGranter // Bug 1: overflow when totalNumByteTokens is too large. @@ -275,7 +280,12 @@ func TestAdjustTokensInnerAndLogging(t *testing.T) { var buf redact.StringBuilder for _, tt := range tests { buf.Printf("%s:\n", tt.name) - res := (*ioLoadListener)(nil).adjustTokensInner( + ioll := &ioLoadListener{ + settings: cluster.MakeTestingClusterSettings(), + l0CompactedBytes: metric.NewCounter(l0CompactedBytes), + l0TokensProduced: metric.NewCounter(l0TokensProduced), + } + res := ioll.adjustTokensInner( ctx, tt.prev, tt.l0Metrics, 12, pebble.ThroughputMetric{}, 100, 10, 0, 0.50) buf.Printf("%s\n", res) @@ -316,6 +326,8 @@ func TestBadIOLoadListenerStats(t *testing.T) { kvRequester: req, perWorkTokenEstimator: makeStorePerWorkTokenEstimator(), diskBandwidthLimiter: makeDiskBandwidthLimiter(), + l0CompactedBytes: metric.NewCounter(l0CompactedBytes), + l0TokensProduced: metric.NewCounter(l0TokensProduced), } ioll.kvGranter = kvGranter for i := 0; i < 100; i++ {