Skip to content

Commit

Permalink
admission: add metric for bypassed IO admission work
Browse files Browse the repository at this point in the history
Part of cockroachdb#82743. We introduce an admission.granter.io_tokens_bypassed.kv
metric that tracks the total number of tokens taken by work bypassing
admission control (for example, follower writes without flow control).

Aside: cockroachdb#109640 ripped out a tokens-taken-without-permission metric that
was supposed to capture some of this, but even standard admission work
would routinely exercise that code path: when admitting work, we take
1 token, and later take the remaining tokens without permission.

Release note: None
  • Loading branch information
irfansharif committed Sep 6, 2023
1 parent f75f543 commit 6dba9cc
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 20 deletions.
27 changes: 16 additions & 11 deletions pkg/util/admission/grant_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ type StoreGrantCoordinators struct {
kvIOTokensExhaustedDuration *metric.Counter
kvIOTokensAvailable *metric.Gauge
kvElasticIOTokensAvailable *metric.Gauge
kvIOTotalTokensTaken *metric.Counter
kvIOTotalTokensReturned *metric.Counter
kvIOTokensTaken *metric.Counter
kvIOTokensReturned *metric.Counter
kvIOTokensBypassed *metric.Counter
l0CompactedBytes *metric.Counter
l0TokensProduced *metric.Counter

Expand Down Expand Up @@ -172,8 +173,8 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
ioTokensExhaustedDurationMetric: sgc.kvIOTokensExhaustedDuration,
availableTokensMetric: sgc.kvIOTokensAvailable,
availableElasticTokensMetric: sgc.kvElasticIOTokensAvailable,
tokensTakenMetric: sgc.kvIOTotalTokensTaken,
tokensReturnedMetric: sgc.kvIOTotalTokensReturned,
tokensTakenMetric: sgc.kvIOTokensTaken,
tokensReturnedMetric: sgc.kvIOTokensReturned,
}
kvg.coordMu.availableIOTokens = unlimitedTokens / unloadedDuration.ticksInAdjustmentInterval()
kvg.coordMu.availableElasticIOTokens = kvg.coordMu.availableIOTokens
Expand Down Expand Up @@ -203,6 +204,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID)
opts,
sgc.knobs,
sgc.onLogEntryAdmitted,
sgc.kvIOTokensBypassed,
&coord.mu.Mutex,
)
coord.queues[KVWork] = storeReq
Expand Down Expand Up @@ -380,7 +382,7 @@ type makeRequesterFunc func(
type makeStoreRequesterFunc func(
_ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted,
settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs,
onLogEntryAdmitted OnLogEntryAdmitted, coordMu *syncutil.Mutex,
onLogEntryAdmitted OnLogEntryAdmitted, ioTokensBypassedMetric *metric.Counter, coordMu *syncutil.Mutex,
) storeRequester

// NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a
Expand Down Expand Up @@ -470,8 +472,9 @@ func makeStoresGrantCoordinators(
settings: st,
makeStoreRequesterFunc: makeStoreRequester,
kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
kvIOTotalTokensTaken: metrics.KVIOTotalTokensTaken,
kvIOTotalTokensReturned: metrics.KVIOTotalTokensReturned,
kvIOTokensTaken: metrics.KVIOTokensTaken,
kvIOTokensReturned: metrics.KVIOTokensReturned,
kvIOTokensBypassed: metrics.KVIOTokensBypassed,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
l0CompactedBytes: metrics.L0CompactedBytes,
Expand Down Expand Up @@ -1019,8 +1022,9 @@ type GrantCoordinatorMetrics struct {
KVSlotAdjusterDecrements *metric.Counter
// TODO(banabrick): Make these metrics per store.
KVIOTokensExhaustedDuration *metric.Counter
KVIOTotalTokensTaken *metric.Counter
KVIOTotalTokensReturned *metric.Counter
KVIOTokensTaken *metric.Counter
KVIOTokensReturned *metric.Counter
KVIOTokensBypassed *metric.Counter
KVIOTokensAvailable *metric.Gauge
KVElasticIOTokensAvailable *metric.Gauge
L0CompactedBytes *metric.Counter
Expand All @@ -1044,8 +1048,9 @@ func makeGrantCoordinatorMetrics() GrantCoordinatorMetrics {
KVIOTokensExhaustedDuration: metric.NewCounter(kvIOTokensExhaustedDuration),
SQLLeafStartUsedSlots: metric.NewGauge(addName(workKindString(SQLStatementLeafStartWork), usedSlots)),
SQLRootStartUsedSlots: metric.NewGauge(addName(workKindString(SQLStatementRootStartWork), usedSlots)),
KVIOTotalTokensTaken: metric.NewCounter(kvIOTotalTokensTaken),
KVIOTotalTokensReturned: metric.NewCounter(kvIOTotalTokensReturned),
KVIOTokensTaken: metric.NewCounter(kvIOTokensTaken),
KVIOTokensReturned: metric.NewCounter(kvIOTokensReturned),
KVIOTokensBypassed: metric.NewCounter(kvIOTokensBypassed),
KVIOTokensAvailable: metric.NewGauge(kvIOTokensAvailable),
KVElasticIOTokensAvailable: metric.NewGauge(kvElasticIOTokensAvailable),
L0CompactedBytes: metric.NewCounter(l0CompactedBytes),
Expand Down
10 changes: 8 additions & 2 deletions pkg/util/admission/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,18 +739,24 @@ var (
Measurement: "Microseconds",
Unit: metric.Unit_COUNT,
}
kvIOTotalTokensTaken = metric.Metadata{
kvIOTokensTaken = metric.Metadata{
Name: "admission.granter.io_tokens_taken.kv",
Help: "Total number of tokens taken",
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
kvIOTotalTokensReturned = metric.Metadata{
kvIOTokensReturned = metric.Metadata{
Name: "admission.granter.io_tokens_returned.kv",
Help: "Total number of tokens returned",
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
kvIOTokensBypassed = metric.Metadata{
Name: "admission.granter.io_tokens_bypassed.kv",
Help: "Total number of tokens taken by work bypassing admission control (for example, follower writes without flow control)",
Measurement: "Tokens",
Unit: metric.Unit_COUNT,
}
kvIOTokensAvailable = metric.Metadata{
Name: "admission.granter.io_tokens_available.kv",
Help: "Number of tokens available",
Expand Down
10 changes: 6 additions & 4 deletions pkg/util/admission/granter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestGranterBasic(t *testing.T) {
makeStoreRequesterFunc: func(
ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted,
settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs,
_ OnLogEntryAdmitted, _ *syncutil.Mutex,
_ OnLogEntryAdmitted, _ *metric.Counter, _ *syncutil.Mutex,
) storeRequester {
makeTestRequester := func(wc admissionpb.WorkClass) *testRequester {
req := &testRequester{
Expand Down Expand Up @@ -140,8 +140,9 @@ func TestGranterBasic(t *testing.T) {
kvIOTokensExhaustedDuration: metrics.KVIOTokensExhaustedDuration,
kvIOTokensAvailable: metrics.KVIOTokensAvailable,
kvElasticIOTokensAvailable: metrics.KVElasticIOTokensAvailable,
kvIOTotalTokensTaken: metrics.KVIOTotalTokensTaken,
kvIOTotalTokensReturned: metrics.KVIOTotalTokensReturned,
kvIOTokensTaken: metrics.KVIOTokensTaken,
kvIOTokensReturned: metrics.KVIOTokensReturned,
kvIOTokensBypassed: metrics.KVIOTokensBypassed,
l0CompactedBytes: metrics.L0CompactedBytes,
l0TokensProduced: metrics.L0TokensProduced,
workQueueMetrics: workQueueMetrics,
Expand Down Expand Up @@ -324,7 +325,8 @@ func TestStoreCoordinators(t *testing.T) {
makeRequesterFunc: makeRequesterFunc,
makeStoreRequesterFunc: func(
ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreReplicatedWorkAdmitted,
settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs, _ OnLogEntryAdmitted, _ *syncutil.Mutex) storeRequester {
settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs, _ OnLogEntryAdmitted,
_ *metric.Counter, _ *syncutil.Mutex) storeRequester {
reqReg := makeRequesterFunc(ctx, KVWork, granters[admissionpb.RegularWorkClass], settings, metrics, opts)
reqElastic := makeRequesterFunc(ctx, KVWork, granters[admissionpb.ElasticWorkClass], settings, metrics, opts)
str := &storeTestRequester{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/admission/replicated_write_admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestReplicatedWriteAdmission(t *testing.T) {
tg[admissionpb.RegularWorkClass],
tg[admissionpb.ElasticWorkClass],
},
st, metrics, opts, knobs, &noopOnLogEntryAdmitted{}, &mockCoordMu,
st, metrics, opts, knobs, &noopOnLogEntryAdmitted{}, metric.NewCounter(metric.Metadata{}), &mockCoordMu,
).(*StoreWorkQueue)
tg[admissionpb.RegularWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.RegularWorkClass]
tg[admissionpb.ElasticWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.ElasticWorkClass]
Expand Down
7 changes: 6 additions & 1 deletion pkg/util/admission/work_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,8 @@ type StoreWorkQueue struct {
settings *cluster.Settings
onLogEntryAdmitted OnLogEntryAdmitted

ioTokensBypassed *metric.Counter

knobs *TestingKnobs
}

Expand Down Expand Up @@ -2072,7 +2074,8 @@ func (q *StoreWorkQueue) BypassedWorkDone(workCount int64, doneInfo StoreWorkDon
q.updateStoreStatsAfterWorkDone(uint64(workCount), doneInfo, true)
// Since we have no control over such work, we choose to count it as
// regularWorkClass.
_ = q.granters[admissionpb.RegularWorkClass].storeWriteDone(0, doneInfo)
additionalTokensTaken := q.granters[admissionpb.RegularWorkClass].storeWriteDone(0 /* originalTokens */, doneInfo)
q.ioTokensBypassed.Inc(additionalTokensTaken)
}

// StatsToIgnore is called for range snapshot ingestion -- see the comment in
Expand Down Expand Up @@ -2143,6 +2146,7 @@ func makeStoreWorkQueue(
opts workQueueOptions,
knobs *TestingKnobs,
onLogEntryAdmitted OnLogEntryAdmitted,
ioTokensBypassedMetric *metric.Counter,
coordMu *syncutil.Mutex,
) storeRequester {
if knobs == nil {
Expand All @@ -2160,6 +2164,7 @@ func makeStoreWorkQueue(
timeSource: opts.timeSource,
settings: settings,
onLogEntryAdmitted: onLogEntryAdmitted,
ioTokensBypassed: ioTokensBypassedMetric,
}

opts.usesAsyncAdmit = true
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/admission/work_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func TestStoreWorkQueueBasic(t *testing.T) {
tg[admissionpb.RegularWorkClass],
tg[admissionpb.ElasticWorkClass],
},
st, metrics, opts, nil /* testing knobs */, &noopOnLogEntryAdmitted{}, &mockCoordMu).(*StoreWorkQueue)
st, metrics, opts, nil /* testing knobs */, &noopOnLogEntryAdmitted{}, metric.NewCounter(metric.Metadata{}), &mockCoordMu).(*StoreWorkQueue)
tg[admissionpb.RegularWorkClass].r = q.getRequesters()[admissionpb.RegularWorkClass]
tg[admissionpb.ElasticWorkClass].r = q.getRequesters()[admissionpb.ElasticWorkClass]
wrkMap.resetMap()
Expand Down

0 comments on commit 6dba9cc

Please sign in to comment.