diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel index 189dfd08f6cf..c0446fd040ed 100644 --- a/pkg/kv/kvserver/kvadmission/BUILD.bazel +++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel @@ -8,6 +8,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/kv/kvpb", + "//pkg/kv/kvserver/raftlog", "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index d67e6b7d509a..d3d567402fc8 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -19,6 +19,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -120,7 +121,7 @@ type Controller interface { FollowerStoreWriteBytes(roachpb.StoreID, FollowerStoreWriteBytes) // AdmitRaftEntry informs admission control of a raft log entry being // written to storage. - AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry) + AdmitRaftEntry(context.Context, roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry) } // TenantWeightProvider can be periodically asked to provide the tenant @@ -244,7 +245,7 @@ func (n *controllerImpl) AdmitKVWork( if err != nil { return Handle{}, err } - admissionEnabled = storeWorkHandle.AdmissionEnabled() + admissionEnabled = storeWorkHandle.UseAdmittedWorkDone() if admissionEnabled { defer func() { if retErr != nil { @@ -409,9 +410,60 @@ func (n *controllerImpl) FollowerStoreWriteBytes( // AdmitRaftEntry implements the Controller interface. func (n *controllerImpl) AdmitRaftEntry( - roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry, + ctx context.Context, + tenantID roachpb.TenantID, + storeID roachpb.StoreID, + rangeID roachpb.RangeID, + entry raftpb.Entry, ) { - panic("unimplemented") + typ, err := raftlog.EncodingOf(entry) + if err != nil { + log.Errorf(ctx, "unable to determine raft command encoding: %v", err) + return + } + if !typ.UsesAdmissionControl() { + return // nothing to do + } + meta, err := raftlog.DecodeRaftAdmissionMeta(entry.Data) + if err != nil { + log.Errorf(ctx, "unable to decode raft command admission data: %v", err) + return + } + + storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID)) + if storeAdmissionQ == nil { + log.Errorf(ctx, "unable to find queue for store: %s", storeID) + return // nothing to do + } + + wi := admission.WorkInfo{ + TenantID: tenantID, + Priority: admissionpb.WorkPriority(meta.AdmissionPriority), + CreateTime: meta.AdmissionCreateTime, + BypassAdmission: false, + RequestedCount: int64(entry.Size()), + } + wi.ReplicatedWorkInfo = admission.ReplicatedWorkInfo{ + Enabled: true, + RangeID: rangeID, + Origin: meta.AdmissionOriginNode, + LogPosition: admission.LogPosition{ + Term: entry.Term, + Index: entry.Index, + }, + Ingested: typ.IsSideloaded(), + } + + handle, err := storeAdmissionQ.Admit(ctx, admission.StoreWriteWorkInfo{ + WorkInfo: wi, + }) + if err != nil { + log.Errorf(ctx, "error while admitting to store admission queue: %v", err) + return + } + if handle.UseAdmittedWorkDone() { + log.Fatalf(ctx, "unexpected handle.UseAdmittedWorkDone") + } } // FollowerStoreWriteBytes captures stats about writes done to a store by a diff --git a/pkg/kv/kvserver/kvflowcontrol/doc.go b/pkg/kv/kvserver/kvflowcontrol/doc.go index 60abec7421fe..51a2932ece82 100644 --- a/pkg/kv/kvserver/kvflowcontrol/doc.go +++ b/pkg/kv/kvserver/kvflowcontrol/doc.go @@ -327,6 +327,76 @@ package kvflowcontrol // it can transition into the mode described in I3a where we deduct/block for // flow tokens for subsequent quorum writes. // +// I12. How does this interact with epoch-LIFO? Or CreateTime ordering +// generally? +// - Background: Epoch-LIFO tries to reduce lower percentile admission queueing +// delay (at the expense of higher percentile delay) by switching between the +// standard CreateTime-based FIFO ordering to LIFO-within-an-epoch. Work +// submitted by a transaction with CreateTime T is slotted into the epoch +// number with time interval [E, E+100ms) where T is contained in this +// interval. The work will start executing after the epoch is closed, at +// ~E+100ms. This increases transaction latency by at most ~100ms. When the +// epoch closes, all nodes start executing the transactions in that epoch in +// LIFO order, and will implicitly have the same set of competing +// transactions[^10], a set that stays unchanged until the next epoch closes. +// And by the time the next epoch closes, and the current epoch's transactions +// are deprioritized, 100ms will have elapsed, which is selected to be long +// enough for most of these now-admitted to have finished their work. +// - We switch to LIFO-within-an-epoch once we start observing that the +// maximum queuing delay for work within a starts +// exceeding the ~100ms we'd add with epoch-LIFO. +// - For below-raft work queue ordering, we ignore CreateTime when ordering work +// within the same range. Within a given , admission +// takes place in raft log order (i.e. entries with lower terms get admitted +// first, or lower indexes within the same term). +// - NB: Regarding "admission takes place in raft log order", we can implement +// this differently. We introduced log-position based ordering to simplify +// the implementation of token returns where we release tokens by specifying +// the log position up-to-which we want to release held tokens[^11]. But +// with additional tracking in the below-raft work queues, if we know that +// work W2 with log position L2 got admitted, and corresponded to F flow +// token deductions at the origin, and we also know that work W1 with log +// position L1 is currently queued, also corresponding to F flow token +// deductions at the origin, we could inform the origin node to return flow +// tokens up to L1 and still get what we want -- a return of F flow tokens +// when each work gets admitted. +// - To operate within cluster-wide FIFO ordering, we order by CreateTime when +// comparing work across different ranges. Writes for a single range, as +// observed by a given store below-raft (follower or otherwise) travel along +// a single stream. Consider the case where a single store S3 receives +// replication traffic for two ranges R1 and R2, originating from two separate +// nodes N1 and N2. If N1 is issuing writes with strictly older CreateTimes, +// when returning flow tokens we should prefer N1. +// - What about CreateTime ordering within a ? Flow +// tokens deductions aren't tied to create times -- they're tied to work +// classes on the sender. So we still want priority-based ordering to +// release regular flow tokens before elastic ones, but releasing flow +// tokens for work with lower CreateTimes does not actually promote doing +// older work. Below-raft admission is all asynchronous. To get back to +// CreateTime ordering, we'd need to do it above-raft, by introducing a +// WorkQueue-like structure for requests waiting for flow tokens. +// - We could do the same for epoch-LIFO, but implementing it above-raft in +// WorkQueue-like structures where requests wait for flow tokens. +// Cluster-wide we'd allow work within a given epoch to start issuing +// replication traffic. +// - How does this interact with the CreateTime ordering below-raft when +// comparing work across different ranges? +// - When there's no epoch-LIFO happening, we have cluster-wide FIFO +// ordering within a pair as described above. +// - When epoch-LIFO is happening across all senders issuing replication +// traffic to a given receiver store, we're seeing work within the same +// epoch, but we'll be returning flow tokens to nodes issuing work with +// older CreateTimes. So defeating the LIFO in epoch-LIFO. +// - When epoch-LIFO is happening only on a subset of the senders issuing +// replication traffic, we'll again be returning flow tokens to nodes +// issuing work with older CreateTimes. Which is undefined. +// - Is it strange that different nodes can admit work from "different +// epochs"? What are we to do below-raft, when deciding where to return flow +// tokens back to, since it's all for the same ? Maybe we +// need to pass down whether the work was admitted at the sender with +// LIFO/FIFO, and return flow tokens in {LIFO,FIFO} order across all nodes +// that issued {LIFO,FIFO} work? +// // --- // // [^1]: kvserverpb.RaftMessageRequest is the unit of what's sent @@ -373,6 +443,9 @@ package kvflowcontrol // machine application get significantly behind due to local scheduling // reasons by using the same goroutine to do both async raft log writes // and state machine application. +// [^10]: This relies on partially synchronized clocks without requiring +// explicit coordination. +// [^11]: See UpToRaftLogPosition in AdmittedRaftLogEntries. // // TODO(irfansharif): These descriptions are too high-level, imprecise and // possibly wrong. Fix that. After implementing these interfaces and integrating diff --git a/pkg/server/node.go b/pkg/server/node.go index 2e9dbbbe3c40..c57b1ce4d467 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -907,7 +907,7 @@ func (n *Node) GetPebbleMetrics() []admission.StoreMetrics { diskStats = s } metrics = append(metrics, admission.StoreMetrics{ - StoreID: int32(store.StoreID()), + StoreID: store.StoreID(), Metrics: m.Metrics, WriteStallCount: m.WriteStallCount, DiskStats: diskStats}) diff --git a/pkg/util/admission/BUILD.bazel b/pkg/util/admission/BUILD.bazel index b288a3fe3b14..3ed968ba72fa 100644 --- a/pkg/util/admission/BUILD.bazel +++ b/pkg/util/admission/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "scheduler_latency_listener.go", "sql_cpu_overload_indicator.go", "store_token_estimation.go", + "testing_knobs.go", "tokens_linear_model.go", "work_queue.go", ], @@ -53,6 +54,7 @@ go_test( "elastic_cpu_work_queue_test.go", "granter_test.go", "io_load_listener_test.go", + "replicated_write_admission_test.go", "scheduler_latency_listener_test.go", "store_token_estimation_test.go", "tokens_linear_model_test.go", @@ -67,6 +69,7 @@ go_test( "//pkg/testutils/datapathutils", "//pkg/testutils/echotest", "//pkg/util/admission/admissionpb", + "//pkg/util/humanizeutil", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/metric", diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go index ccbf4a2de2bc..e59d16927952 100644 --- a/pkg/util/admission/admission.go +++ b/pkg/util/admission/admission.go @@ -522,8 +522,9 @@ func workKindString(workKind WorkKind) string { // all of these when StoreWorkQueue.AdmittedWorkDone is called, so that these // cumulative values are mutually consistent. type storeAdmissionStats struct { - // Total requests that called AdmittedWorkDone or BypassedWorkDone. - admittedCount uint64 + // Total requests that called {Admitted,Bypassed}WorkDone, or in the case of + // replicated writes, the requests that called Admit. + workCount uint64 // Sum of StoreWorkDoneInfo.WriteBytes. // // TODO(sumeer): writeAccountedBytes and ingestedAccountedBytes are not @@ -548,7 +549,7 @@ type storeAdmissionStats struct { // (e.g. for logging). aux struct { // These bypassed numbers are already included in the corresponding - // {admittedCount, writeAccountedBytes, ingestedAccountedBytes}. + // {workCount, writeAccountedBytes, ingestedAccountedBytes}. bypassedCount uint64 writeBypassedAccountedBytes uint64 ingestedBypassedAccountedBytes uint64 diff --git a/pkg/util/admission/elastic_cpu_work_queue.go b/pkg/util/admission/elastic_cpu_work_queue.go index 04e05f2a5158..3efcc20c178a 100644 --- a/pkg/util/admission/elastic_cpu_work_queue.go +++ b/pkg/util/admission/elastic_cpu_work_queue.go @@ -81,7 +81,7 @@ func (e *ElasticCPUWorkQueue) Admit( if duration > MaxElasticCPUDuration { duration = MaxElasticCPUDuration } - info.requestedCount = duration.Nanoseconds() + info.RequestedCount = duration.Nanoseconds() enabled, err := e.workQueue.Admit(ctx, info) if err != nil { return nil, err diff --git a/pkg/util/admission/elastic_cpu_work_queue_test.go b/pkg/util/admission/elastic_cpu_work_queue_test.go index f044f984aeb1..291791b43c13 100644 --- a/pkg/util/admission/elastic_cpu_work_queue_test.go +++ b/pkg/util/admission/elastic_cpu_work_queue_test.go @@ -161,7 +161,7 @@ func (t *testElasticCPUInternalWorkQueue) Admit( _ context.Context, info WorkInfo, ) (enabled bool, err error) { if !t.disabled { - t.buf.WriteString(fmt.Sprintf("admitted=%s ", time.Duration(info.requestedCount))) + t.buf.WriteString(fmt.Sprintf("admitted=%s ", time.Duration(info.RequestedCount))) } return !t.disabled, nil } diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go index 1a7a2005e60c..edf2b24b28e9 100644 --- a/pkg/util/admission/grant_coordinator.go +++ b/pkg/util/admission/grant_coordinator.go @@ -113,7 +113,7 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider( if unsafeGc, ok := sgc.gcMap.Load(int64(m.StoreID)); ok { gc := (*GrantCoordinator)(unsafeGc) gc.pebbleMetricsTick(ctx, m) - iotc.UpdateIOThreshold(roachpb.StoreID(m.StoreID), gc.ioLoadListener.ioThreshold) + iotc.UpdateIOThreshold(m.StoreID, gc.ioLoadListener.ioThreshold) } else { log.Warningf(ctx, "seeing metrics for unknown storeID %d", m.StoreID) @@ -135,7 +135,7 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider( }() } -func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoordinator { +func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) *GrantCoordinator { coord := &GrantCoordinator{ settings: sgc.settings, useGrantChains: false, @@ -168,7 +168,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoo }, } - storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, granters, sgc.settings, sgc.workQueueMetrics, opts) + storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, storeID, granters, sgc.settings, sgc.workQueueMetrics, opts, nil) coord.queues[KVWork] = storeReq requesters := storeReq.getRequesters() kvg.regularRequester = requesters[admissionpb.RegularWorkClass] @@ -336,8 +336,9 @@ type makeRequesterFunc func( metrics *WorkQueueMetrics, opts workQueueOptions) requester type makeStoreRequesterFunc func( - _ log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester + _ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, +) storeRequester // NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a // regular cluster node. Caller is responsible for: diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go index a14a2fb9e63d..8787020d0886 100644 --- a/pkg/util/admission/granter.go +++ b/pkg/util/admission/granter.go @@ -594,7 +594,7 @@ type IOThresholdConsumer interface { // StoreMetrics are the metrics for a store. type StoreMetrics struct { - StoreID int32 + StoreID roachpb.StoreID *pebble.Metrics WriteStallCount int64 // Optional. diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go index e5635fa2c3f1..c085950b12ce 100644 --- a/pkg/util/admission/granter_test.go +++ b/pkg/util/admission/granter_test.go @@ -109,8 +109,9 @@ func TestGranterBasic(t *testing.T) { storeCoordinators := &StoreGrantCoordinators{ settings: settings, makeStoreRequesterFunc: func( - ambientCtx log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester { + ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + ) storeRequester { makeTestRequester := func(wc admissionpb.WorkClass) *testRequester { req := &testRequester{ workKind: KVWork, @@ -273,8 +274,8 @@ func TestStoreCoordinators(t *testing.T) { opts := Options{ makeRequesterFunc: makeRequesterFunc, makeStoreRequesterFunc: func( - ctx log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester { + ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs) storeRequester { reqReg := makeRequesterFunc(ctx, KVWork, granters[admissionpb.RegularWorkClass], settings, metrics, opts) reqElastic := makeRequesterFunc(ctx, KVWork, granters[admissionpb.ElasticWorkClass], settings, metrics, opts) str := &storeTestRequester{} @@ -444,7 +445,7 @@ func (m *testMetricsProvider) setMetricsForStores(stores []int32, metrics pebble m.metrics = m.metrics[:0] for _, s := range stores { m.metrics = append(m.metrics, StoreMetrics{ - StoreID: s, + StoreID: roachpb.StoreID(s), Metrics: &metrics, }) } diff --git a/pkg/util/admission/io_load_listener.go b/pkg/util/admission/io_load_listener.go index 0c39b22c339c..f7b65d20ca21 100644 --- a/pkg/util/admission/io_load_listener.go +++ b/pkg/util/admission/io_load_listener.go @@ -15,6 +15,7 @@ import ( "math" "time" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" @@ -151,7 +152,7 @@ const l0SubLevelCountOverloadThreshold = 20 // storeWriteDone), the tokens are adjusted differently for the // flush/compaction L0 tokens and for the "disk bandwidth" tokens. type ioLoadListener struct { - storeID int32 + storeID roachpb.StoreID settings *cluster.Settings kvRequester storeRequester kvGranter granterWithIOTokens diff --git a/pkg/util/admission/io_load_listener_test.go b/pkg/util/admission/io_load_listener_test.go index 44a98b616bc0..b10507d9cd7d 100644 --- a/pkg/util/admission/io_load_listener_test.go +++ b/pkg/util/admission/io_load_listener_test.go @@ -67,11 +67,11 @@ func TestIOLoadListener(t *testing.T) { case "prep-admission-stats": req.stats = storeAdmissionStats{ - admittedCount: 0, + workCount: 0, writeAccountedBytes: 0, ingestedAccountedBytes: 0, } - d.ScanArgs(t, "admitted", &req.stats.admittedCount) + d.ScanArgs(t, "admitted", &req.stats.workCount) if d.HasArg("write-bytes") { d.ScanArgs(t, "write-bytes", &req.stats.writeAccountedBytes) } @@ -290,7 +290,7 @@ func TestBadIOLoadListenerStats(t *testing.T) { d.BytesRead = rand.Uint64() d.BytesWritten = rand.Uint64() d.ProvisionedBandwidth = 1 << 20 - req.stats.admittedCount = rand.Uint64() + req.stats.workCount = rand.Uint64() req.stats.writeAccountedBytes = rand.Uint64() req.stats.ingestedAccountedBytes = rand.Uint64() req.stats.statsToIgnore.Bytes = rand.Uint64() diff --git a/pkg/util/admission/replicated_write_admission_test.go b/pkg/util/admission/replicated_write_admission_test.go new file mode 100644 index 000000000000..6604f1a76910 --- /dev/null +++ b/pkg/util/admission/replicated_write_admission_test.go @@ -0,0 +1,450 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package admission + +import ( + "context" + "fmt" + "sort" + "strconv" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +// TestReplicatedWriteAdmission is a data-driven test for admission control of +// replicated writes. We provide the following syntax: +// +// - "init" +// Initializes the store work queues and test granters with 0B of +// {regular,elastic} tokens. +// +// - "admit" tenant=t pri= create-time= \ +// size= range=r log-position=/ origin=n \ +// [ingested=] +// Admit a replicated write request from the given tenant, of the given +// priority/size/create-time, writing to the given log position for the +// specified raft group. Also specified is the node where this request +// originated and whether it was ingested (i.e. as sstables). +// +// - "granter" [class={regular,elastic}] adjust-tokens={-,+} +// Adjust the available {regular,elastic} tokens. If no class is specified, +// the adjustment applies across both work classes. +// +// - "grant" [class={regular,elastic}] +// Grant waiting requests until tokens are depleted or there are no more +// requests waiting. If no class is specified, we grant admission across +// both classes. +// +// - "tenant-weights" [t=]... +// Set weights for each tenant. +// +// - "print" +// Pretty-print work queue internal state (waiting requests, consumed tokens +// per-tenant, physical admission statistics, tenant weights, etc). +// +// TODO(irfansharif): Add tests for epoch-LIFO after sorting out I12 from +// kvflowcontrol/doc.go. +func TestReplicatedWriteAdmission(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + dir := datapathutils.TestDataPath(t, "replicated_write_admission") + datadriven.Walk(t, dir, func(t *testing.T, path string) { + var ( + storeWorkQueue *StoreWorkQueue + buf builderWithMu + tg [admissionpb.NumWorkClasses]*testReplicatedWriteGranter + ) + defer func() { + if storeWorkQueue != nil { + storeWorkQueue.close() + } + }() + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + tg = [admissionpb.NumWorkClasses]*testReplicatedWriteGranter{ + admissionpb.RegularWorkClass: newTestReplicatedWriteGranter(t, admissionpb.RegularWorkClass, &buf), + admissionpb.ElasticWorkClass: newTestReplicatedWriteGranter(t, admissionpb.ElasticWorkClass, &buf), + } + registry := metric.NewRegistry() + metrics := makeWorkQueueMetrics("", registry) + opts := makeWorkQueueOptions(KVWork) + opts.usesTokens = true + opts.timeSource = timeutil.NewManualTime(tzero) + opts.disableEpochClosingGoroutine = true + knobs := &TestingKnobs{ + AdmittedReplicatedWorkInterceptor: func( + tenantID roachpb.TenantID, + pri admissionpb.WorkPriority, + rwi ReplicatedWorkInfo, + requestedTokens int64, + createTime int64, + ) { + ingested := "" + if rwi.Ingested { + ingested = " ingested" + } + buf.printf("admitted [tenant=t%d pri=%s create-time=%s size=%s range=r%s origin=n%s log-position=%s%s]", + tenantID.ToUint64(), pri, timeutil.FromUnixNanos(createTime).Sub(tzero), + printTrimmedBytes(requestedTokens), rwi.RangeID, rwi.Origin, rwi.LogPosition, ingested) + }, + } + storeWorkQueue = makeStoreWorkQueue( + log.MakeTestingAmbientContext(tracing.NewTracer()), + roachpb.StoreID(1), + [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ + tg[admissionpb.RegularWorkClass], + tg[admissionpb.ElasticWorkClass], + }, + st, metrics, opts, knobs, + ).(*StoreWorkQueue) + tg[admissionpb.RegularWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.RegularWorkClass] + tg[admissionpb.ElasticWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.ElasticWorkClass] + return printTestGranterTokens(tg) + + case "admit": + require.NotNilf(t, storeWorkQueue, "uninitialized store work queue (did you use 'init'?)") + var arg string + + // Parse tenant=t. + d.ScanArgs(t, "tenant", &arg) + ti, err := strconv.Atoi(strings.TrimPrefix(arg, "t")) + require.NoError(t, err) + tenantID := roachpb.MustMakeTenantID(uint64(ti)) + + // Parse pri=. + d.ScanArgs(t, "pri", &arg) + pri, found := reverseWorkPriorityDict[arg] + require.True(t, found) + + // Parse size=. + d.ScanArgs(t, "size", &arg) + bytes, err := humanizeutil.ParseBytes(arg) + require.NoError(t, err) + + // Parse range=r. + d.ScanArgs(t, "range", &arg) + ri, err := strconv.Atoi(strings.TrimPrefix(arg, "r")) + require.NoError(t, err) + rangeID := roachpb.RangeID(ri) + + // Parse origin=n. + d.ScanArgs(t, "origin", &arg) + ni, err := strconv.Atoi(strings.TrimPrefix(arg, "n")) + require.NoError(t, err) + nodeID := roachpb.NodeID(ni) + + // Parse log-position=/. + logPosition := parseLogPosition(t, d) + + // Parse create-time=. + d.ScanArgs(t, "create-time", &arg) + dur, err := time.ParseDuration(arg) + require.NoError(t, err) + createTime := tzero.Add(dur) + + // Parse ingested=. + var ingested bool + if d.HasArg("ingested") { + d.ScanArgs(t, "ingested", &arg) + ingested, err = strconv.ParseBool(arg) + require.NoError(t, err) + } + + info := StoreWriteWorkInfo{ + WorkInfo: WorkInfo{ + TenantID: tenantID, + Priority: pri, + CreateTime: createTime.UnixNano(), + RequestedCount: bytes, + ReplicatedWorkInfo: ReplicatedWorkInfo{ + Enabled: true, + RangeID: rangeID, + Origin: nodeID, + LogPosition: logPosition, + Ingested: ingested, + }, + }, + } + + handle, err := storeWorkQueue.Admit(ctx, info) + require.NoError(t, err) + require.False(t, handle.UseAdmittedWorkDone()) + return buf.stringAndReset() + + case "granter": + // Parse adjust-tokens={+,-}. + var arg string + d.ScanArgs(t, "adjust-tokens", &arg) + isPositive := strings.Contains(arg, "+") + arg = strings.TrimPrefix(arg, "+") + arg = strings.TrimPrefix(arg, "-") + delta, err := humanizeutil.ParseBytes(arg) + require.NoError(t, err) + if !isPositive { + delta = -delta + } + + var wcs []admissionpb.WorkClass + if d.HasArg("class") { + wcs = append(wcs, parseWorkClass(t, d)) + } else { + wcs = append(wcs, + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass) + } + for _, wc := range wcs { + tg[wc].tokens += delta + } + return printTestGranterTokens(tg) + + case "tenant-weights": + weightMap := make(map[uint64]uint32) + for _, cmdArg := range d.CmdArgs { + tenantID, err := strconv.Atoi(strings.TrimPrefix(cmdArg.Key, "t")) + require.NoError(t, err) + weight, err := strconv.Atoi(cmdArg.Vals[0]) + require.NoError(t, err) + weightMap[uint64(tenantID)] = uint32(weight) + } + storeWorkQueue.SetTenantWeights(weightMap) + return "" + + case "grant": + var wcs []admissionpb.WorkClass + if d.HasArg("class") { + wcs = append(wcs, parseWorkClass(t, d)) + } else { + wcs = append(wcs, + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass) + } + for _, wc := range wcs { + tg[wc].grant() + } + return buf.stringAndReset() + + case "print": + storeWorkQueue.mu.Lock() + defer storeWorkQueue.mu.Unlock() + return fmt.Sprintf("physical-stats: work-count=%d written-bytes=%s ingested-bytes=%s\n[regular work queue]: %s\n[elastic work queue]: %s\n", + storeWorkQueue.mu.stats.workCount, + printTrimmedBytes(int64(storeWorkQueue.mu.stats.writeAccountedBytes)), + printTrimmedBytes(int64(storeWorkQueue.mu.stats.ingestedAccountedBytes)), + printWorkQueue(&storeWorkQueue.q[admissionpb.RegularWorkClass]), + printWorkQueue(&storeWorkQueue.q[admissionpb.ElasticWorkClass]), + ) + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }) + }) +} + +func parseLogPosition(t *testing.T, d *datadriven.TestData) LogPosition { + // Parse log-position=/. + var arg string + d.ScanArgs(t, "log-position", &arg) + inner := strings.Split(arg, "/") + require.Len(t, inner, 2) + term, err := strconv.Atoi(inner[0]) + require.NoError(t, err) + index, err := strconv.Atoi(inner[1]) + require.NoError(t, err) + return LogPosition{ + Term: uint64(term), + Index: uint64(index), + } +} + +func parseWorkClass(t *testing.T, d *datadriven.TestData) admissionpb.WorkClass { + // Parse class={regular,elastic}. + var arg string + d.ScanArgs(t, "class", &arg) + var wc admissionpb.WorkClass + switch arg { + case "regular": + wc = admissionpb.RegularWorkClass + case "elastic": + wc = admissionpb.ElasticWorkClass + default: + t.Fatalf("unexpected class: %s", arg) + } + return wc +} + +func printTrimmedBytes(bytes int64) string { + return strings.ReplaceAll(string(humanizeutil.IBytes(bytes)), " ", "") +} + +func printTestGranterTokens(tg [admissionpb.NumWorkClasses]*testReplicatedWriteGranter) string { + var buf strings.Builder + for i, wc := range []admissionpb.WorkClass{ + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass, + } { + if i != 0 { + buf.WriteString("\n") + } + buf.WriteString(fmt.Sprintf("[%s] %s tokens available", wc, printTrimmedBytes(tg[wc].tokens))) + } + return buf.String() +} + +func printWorkQueue(q *WorkQueue) string { + var buf strings.Builder + q.mu.Lock() + defer q.mu.Unlock() + buf.WriteString(fmt.Sprintf("len(tenant-heap)=%d", len(q.mu.tenantHeap))) + if len(q.mu.tenantHeap) > 0 { + buf.WriteString(fmt.Sprintf(" top-tenant=t%d", q.mu.tenantHeap[0].id)) + } + var ids []uint64 + for id := range q.mu.tenants { + ids = append(ids, id) + } + sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) + for _, id := range ids { + tenant := q.mu.tenants[id] + buf.WriteString(fmt.Sprintf("\n tenant=t%d weight=%d fifo-threshold=%s used=%s", + tenant.id, + tenant.weight, + admissionpb.WorkPriority(tenant.fifoPriorityThreshold), + printTrimmedBytes(int64(tenant.used)), + )) + if len(tenant.waitingWorkHeap) > 0 { + buf.WriteString("\n") + + for i := range tenant.waitingWorkHeap { + w := tenant.waitingWorkHeap[i] + if i != 0 { + buf.WriteString("\n") + } + + ingested := "" + if w.replicated.Ingested { + ingested = " ingested " + } + + buf.WriteString(fmt.Sprintf(" [%d: pri=%s create-time=%s size=%s range=r%d origin=n%d log-position=%s%s]", i, + w.priority, + timeutil.FromUnixNanos(w.createTime).Sub(tzero), + printTrimmedBytes(w.requestedCount), + w.replicated.RangeID, + w.replicated.Origin, + w.replicated.LogPosition, + ingested, + )) + } + } + } + return buf.String() +} + +// tzero represents the t=0, the earliest possible time. All other +// create-time= is relative to this time. +var tzero = timeutil.Unix(0, 0) + +var reverseWorkPriorityDict map[string]admissionpb.WorkPriority + +func init() { + reverseWorkPriorityDict = make(map[string]admissionpb.WorkPriority) + for k, v := range admissionpb.WorkPriorityDict { + reverseWorkPriorityDict[v] = k + } +} + +type testReplicatedWriteGranter struct { + t *testing.T + wc admissionpb.WorkClass + buf *builderWithMu + r requester + + tokens int64 +} + +var _ granterWithStoreWriteDone = &testReplicatedWriteGranter{} + +func newTestReplicatedWriteGranter( + t *testing.T, wc admissionpb.WorkClass, buf *builderWithMu, +) *testReplicatedWriteGranter { + return &testReplicatedWriteGranter{ + t: t, + wc: wc, + buf: buf, + } +} +func (tg *testReplicatedWriteGranter) grantKind() grantKind { + return token +} + +func (tg *testReplicatedWriteGranter) tryGet(count int64) bool { + if count > tg.tokens { + tg.buf.printf("[%s] try-get=%s available=%s => insufficient tokens", + tg.wc, printTrimmedBytes(count), printTrimmedBytes(tg.tokens)) + return false + } + + tg.buf.printf("[%s] try-get=%s available=%s => sufficient tokens", + tg.wc, printTrimmedBytes(count), printTrimmedBytes(tg.tokens)) + tg.tokens -= count + return true +} + +func (tg *testReplicatedWriteGranter) returnGrant(count int64) { + tg.t.Fatalf("unimplemented") +} + +func (tg *testReplicatedWriteGranter) tookWithoutPermission(count int64) { + tg.t.Fatalf("unimplemented") +} + +func (tg *testReplicatedWriteGranter) continueGrantChain(grantChainID grantChainID) { + tg.t.Fatalf("unimplemented") +} + +func (tg *testReplicatedWriteGranter) grant() { + for { + if tg.tokens <= 0 { + return // nothing left to do + } + if !tg.r.hasWaitingRequests() { + return // nothing left to do + } + _ = tg.r.granted(0 /* unused */) + } +} + +func (tg *testReplicatedWriteGranter) storeWriteDone( + originalTokens int64, doneInfo StoreWorkDoneInfo, +) (additionalTokens int64) { + tg.tokens -= originalTokens + return 0 +} diff --git a/pkg/util/admission/store_token_estimation.go b/pkg/util/admission/store_token_estimation.go index be66b50452d1..2e2eca842b8e 100644 --- a/pkg/util/admission/store_token_estimation.go +++ b/pkg/util/admission/store_token_estimation.go @@ -177,8 +177,8 @@ func (e *storePerWorkTokenEstimator) updateEstimates( if adjustedIntL0IngestedBytes < 0 { adjustedIntL0IngestedBytes = 0 } - intWorkCount := int64(admissionStats.admittedCount) - - int64(e.cumStoreAdmissionStats.admittedCount) + intWorkCount := int64(admissionStats.workCount) - + int64(e.cumStoreAdmissionStats.workCount) intL0WriteAccountedBytes := int64(admissionStats.writeAccountedBytes) - int64(e.cumStoreAdmissionStats.writeAccountedBytes) // Note that these are not L0 ingested bytes, since we don't know how diff --git a/pkg/util/admission/store_token_estimation_test.go b/pkg/util/admission/store_token_estimation_test.go index faf2aa46836e..b2898f203b46 100644 --- a/pkg/util/admission/store_token_estimation_test.go +++ b/pkg/util/admission/store_token_estimation_test.go @@ -58,7 +58,7 @@ func TestStorePerWorkTokenEstimator(t *testing.T) { d.ScanArgs(t, "admitted", &admitted) d.ScanArgs(t, "write-accounted", &writeAccounted) d.ScanArgs(t, "ingested-accounted", &ingestedAccounted) - admissionStats.admittedCount += admitted + admissionStats.workCount += admitted admissionStats.writeAccountedBytes += writeAccounted admissionStats.ingestedAccountedBytes += ingestedAccounted if d.HasArg("bypassed-count") { diff --git a/pkg/util/admission/testdata/io_load_listener b/pkg/util/admission/testdata/io_load_listener index 1f64c463641e..5575a7fd5f55 100644 --- a/pkg/util/admission/testdata/io_load_listener +++ b/pkg/util/admission/testdata/io_load_listener @@ -6,7 +6,7 @@ init prep-admission-stats admitted=0 ---- -{admittedCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # Even though above the threshold, the first 60 ticks don't limit the tokens. set-state l0-bytes=10000 l0-added-write=1000 l0-files=21 l0-sublevels=21 @@ -76,7 +76,7 @@ tick: 59, setAvailableTokens: io-tokens=unlimited elastic-disk-bw-tokens=unlimit prep-admission-stats admitted=10000 write-bytes=40000 ---- -{admittedCount:10000 writeAccountedBytes:40000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:10000 writeAccountedBytes:40000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # Delta added is 100,000. The l0-bytes are the same, so compactions removed # 100,000 bytes. Smoothed removed by compactions is 50,000. Each admitted is @@ -151,7 +151,7 @@ tick: 59, setAvailableTokens: io-tokens=169 elastic-disk-bw-tokens=unlimited prep-admission-stats admitted=20000 write-bytes=80000 ---- -{admittedCount:20000 writeAccountedBytes:80000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:20000 writeAccountedBytes:80000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # Same delta as previous but smoothing bumps up the tokens to 25,000. set-state l0-bytes=10000 l0-added-write=201000 l0-files=21 l0-sublevels=21 @@ -232,7 +232,7 @@ setAvailableTokens: io-tokens=365 elastic-disk-bw-tokens=unlimited prep-admission-stats admitted=30000 write-bytes=120000 ---- -{admittedCount:30000 writeAccountedBytes:120000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:30000 writeAccountedBytes:120000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # l0-sublevels drops below threshold. We calculate the smoothed values, but # don't limit the tokens. @@ -250,7 +250,7 @@ init prep-admission-stats admitted=0 ---- -{admittedCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} set-state l0-bytes=1000 l0-added-write=1000 l0-added-ingested=0 l0-files=21 l0-sublevels=21 print-only-first-tick=true ---- @@ -262,7 +262,7 @@ tick: 0, setAvailableTokens: io-tokens=unlimited elastic-disk-bw-tokens=unlimite # the admitted requests. prep-admission-stats admitted=10 write-bytes=130000 ingested-bytes=20000 ---- -{admittedCount:10 writeAccountedBytes:130000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:10 writeAccountedBytes:130000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # The ingested model can be fit with a multiplier of ~1.5 for the interval, # but since the l0-ingest-lm model had a previous multiplier of 0.75 and the @@ -282,7 +282,7 @@ setAvailableTokens: io-tokens=417 elastic-disk-bw-tokens=unlimited # ingested model is decayed by a factor of 2. prep-admission-stats admitted=20 write-bytes=150000 ingested-bytes=20000 ---- -{admittedCount:20 writeAccountedBytes:150000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:20 writeAccountedBytes:150000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} set-state l0-bytes=1000 l0-added-write=191000 l0-added-ingested=30000 l0-files=21 l0-sublevels=21 print-only-first-tick=true ---- @@ -296,7 +296,7 @@ setAvailableTokens: io-tokens=459 elastic-disk-bw-tokens=unlimited # bytes to L0. We don't let unaccounted bytes become negative. prep-admission-stats admitted=30 write-bytes=250000 ingested-bytes=20000 ingested-into-l0=20000 ---- -{admittedCount:30 writeAccountedBytes:250000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:30 writeAccountedBytes:250000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} set-state l0-bytes=1000 l0-added-write=211000 l0-added-ingested=30000 l0-files=21 l0-sublevels=21 print-only-first-tick=true ---- @@ -312,7 +312,7 @@ init prep-admission-stats admitted=0 ---- -{admittedCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} set-state l0-bytes=10000 l0-added-write=1000 l0-files=1 l0-sublevels=1 print-only-first-tick=true ---- diff --git a/pkg/util/admission/testdata/replicated_write_admission/class_segmentation b/pkg/util/admission/testdata/replicated_write_admission/class_segmentation new file mode 100644 index 000000000000..714c5b395806 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/class_segmentation @@ -0,0 +1,63 @@ +# Verify that we'll admit requests based on the {regular,elastic} tokens being +# generated. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one at low-pri but lower log position and lower +# create-time. It gets classified as an elastic request. +admit tenant=t1 pri=low-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20 +---- +[elastic] try-get=1B available=0B => insufficient tokens + +# And one at high-pri but higher log position and higher create-time. It gets +# classified as a regular request. +admit tenant=t1 pri=high-pri create-time=2ms size=1B range=r1 origin=n1 log-position=4/21 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# Observe both waiting requests and physical admission stats. Note that the two +# requests sit within their own work queues (segmented by class). +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=high-pri create-time=2ms size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=low-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20] + +# Produce 1B worth of regular tokens and verify that it only admits work +# waiting in the regular work queue. +granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +grant +---- +admitted [tenant=t1 pri=high-pri create-time=2ms size=1B range=r1 origin=n1 log-position=4/21] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=0 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B +[elastic work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=low-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20] + +# Do the same for elastic tokens. +granter class=elastic adjust-tokens=+1B +---- +[regular] 0B tokens available +[elastic] 1B tokens available + +grant +---- +admitted [tenant=t1 pri=low-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20] + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/epoch_lifo b/pkg/util/admission/testdata/replicated_write_admission/epoch_lifo new file mode 100644 index 000000000000..6262fd963a6a --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/epoch_lifo @@ -0,0 +1,10 @@ +# XXX: Fill this out or delete entirely after discussing I12 from +# kvflowcontrol/doc.go. Consider disabling epoch-LIFO completely, or just for +# IO work queues. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range new file mode 100644 index 000000000000..99694e7fdbc7 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range @@ -0,0 +1,52 @@ +# Verify that we use create-time based ordering for replicated write admission +# when done so across different ranges. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one created at t=5ms with a lower log position on r1. +admit tenant=t1 pri=normal-pri create-time=5ms size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# And one created at t=1ms with a higher log position on r2. +admit tenant=t1 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=4/21 +---- + +# Observe both waiting requests and physical admission stats. Note how the +# request with the lower create time sorts first despite having the higher +# log position. This is because the requests are from different ranges, and we +# don't compare log positions. +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=4/21] + [1: pri=normal-pri create-time=5ms size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 1B worth of regular tokens. +granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +# Grant admission to requests. Since we have 1B worth of tokens, and 2 waiting +# requests wanting 1B each, we're only able to admit one. Verify that it's the +# request with the lower create-time. +grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=4/21] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=5ms size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range new file mode 100644 index 000000000000..fb6a9f7fc34c --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_same_range @@ -0,0 +1,51 @@ +# Verify that we ignore create-time based ordering for replicated write +# admission when writes happen within the same range. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one created at t=5ms but with a lower log position. +admit tenant=t1 pri=normal-pri create-time=5ms size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# And one created at t=1ms but but higher log position. +admit tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21 +---- + +# Observe both waiting requests and physical admission stats. Note how the +# request with the lower log position sorts first despite having the higher +# create-time. +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=5ms size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 1B worth of regular tokens. +granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +# Grant admission to requests. Since we have 1B worth of tokens, and 2 waiting +# requests wanting 1B each, we're only able to admit one. Verify that it's the +# request with the lower log position despite the higher create-time. +grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=5ms size=1B range=r1 origin=n1 log-position=4/20] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_pri_low_position b/pkg/util/admission/testdata/replicated_write_admission/high_pri_low_position new file mode 100644 index 000000000000..9255ef35c0b4 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/high_pri_low_position @@ -0,0 +1,50 @@ +# Verify that we'll admit in priority order, even if the higher priority work +# has a higher log position. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one at normal-pri but lower log position. +admit tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# And one at high-pri but higher log position. +admit tenant=t1 pri=high-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21 +---- + +# Observe both waiting requests and physical admission stats. Note that the +# high-pri request sorts first, despite having a higher log position. +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=high-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21] + [1: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 1B worth of regular tokens. +granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +# Grant admission to requests. Since we have 1B worth of tokens, and 2 waiting +# requests wanting 1B each, we're only able to admit one. Verify that it's the +# high-pri request that gets through. +grant class=regular +---- +admitted [tenant=t1 pri=high-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/overview b/pkg/util/admission/testdata/replicated_write_admission/overview new file mode 100644 index 000000000000..e330bc76225c --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/overview @@ -0,0 +1,70 @@ +# Walk through the basics of the datadriven syntax. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Try to admit two requests of 1B each, at incrementing log positions. The +# first requests tries the fast path and fails admission, and gets added to the +# work queue's internal heap. +admit tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# Observe that the physical stats show that the actual work was done, but the +# work is virtually enqueued in the work queue for deferred admission. +print +---- +physical-stats: work-count=1 written-bytes=1B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# Admit a second request. Since there's already a request waiting, we don't get +# the fast path. +admit tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21 ingested=true +---- + +# Observe both waiting requests. +print +---- +physical-stats: work-count=2 written-bytes=1B ingested-bytes=1B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21 ingested ] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 2B worth of regular tokens. +granter class=regular adjust-tokens=+2B +---- +[regular] 2B tokens available +[elastic] 0B tokens available + +# Grant admission requests. Since we have 2B worth of tokens, and 2 waiting +# requests wanting 1B each, we're able to admit both. We do so in log-position +# order. +grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20] +admitted [tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21 ingested] + +# Pretty print granter state to show no more available tokens. We've consumed +# the 2B above. +granter adjust-tokens=+0B +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Observe the empty tenant heaps (all work was admitted) and 2B worth of used +# tokens for t1. +print +---- +physical-stats: work-count=2 written-bytes=1B ingested-bytes=1B +[regular work queue]: len(tenant-heap)=0 + tenant=t1 weight=1 fifo-threshold=low-pri used=2B +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness b/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness new file mode 100644 index 000000000000..ed611681cff8 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness @@ -0,0 +1,68 @@ +# Observe how tokens are consumed fairly across tenants. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# For two tenants t1 and t2, try to admit two requests of 1B each at +# incrementing log positions. +admit tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +admit tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21 +---- + +admit tenant=t2 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/20 +---- + +admit tenant=t2 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/21 +---- + +# Observe all waiting requests. +print +---- +physical-stats: work-count=4 written-bytes=4B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=2 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21] + tenant=t2 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/20] + [1: pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/21] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 2B worth of regular tokens. +granter class=regular adjust-tokens=+2B +---- +[regular] 2B tokens available +[elastic] 0B tokens available + +# Grant admission requests. Since we have 2B worth of tokens, and 4 waiting +# requests wanting 1B each from 2 tenants, we're able to 1 request from each +# tenant. +grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20] +admitted [tenant=t2 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/20] + +# Pretty print granter state to show no more available tokens. We've consumed +# the 2B above. +granter adjust-tokens=+0B +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Observe that each tenant still has one waiting request. +print +---- +physical-stats: work-count=4 written-bytes=4B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=2 top-tenant=t2 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21] + tenant=t2 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/21] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/tenant_weights b/pkg/util/admission/testdata/replicated_write_admission/tenant_weights new file mode 100644 index 000000000000..201e9b865d69 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/tenant_weights @@ -0,0 +1,104 @@ +# Observe how tokens are consumed proportionally across tenants, as configured +# by tenant weights. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +tenant-weights t1=2 t2=5 +---- + +# For two tenants t1 and t2, try to admit 5 requests of 1B each at +# incrementing log positions. +admit tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +admit tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21 +---- + +admit tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/22 +---- + +admit tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/23 +---- + +admit tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/24 +---- + +admit tenant=t2 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/20 +---- + +admit tenant=t2 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/21 +---- + +admit tenant=t2 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/22 +---- + +admit tenant=t2 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/23 +---- + +admit tenant=t2 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/24 +---- + +# Observe all waiting requests. +print +---- +physical-stats: work-count=10 written-bytes=10B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=2 top-tenant=t1 + tenant=t1 weight=2 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21] + [2: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/22] + [3: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/23] + [4: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/24] + tenant=t2 weight=5 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/20] + [1: pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/21] + [2: pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/22] + [3: pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/23] + [4: pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/24] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 7B worth of regular tokens. +granter class=regular adjust-tokens=+7B +---- +[regular] 7B tokens available +[elastic] 0B tokens available + +# Grant admission requests. Since we have 7B worth of tokens, and 10 waiting +# requests wanting 1B each, we'll be able to admit 7 requests. We'll bias +# towards the tenant with the higher weight (t2). +grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/20] +admitted [tenant=t2 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/20] +admitted [tenant=t2 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/21] +admitted [tenant=t2 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/22] +admitted [tenant=t1 pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/21] +admitted [tenant=t2 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/23] +admitted [tenant=t2 pri=normal-pri create-time=1ms size=1B range=r2 origin=n1 log-position=5/24] + +# Pretty print granter state to show no more available tokens. We've consumed +# the 7B above. +granter adjust-tokens=+0B +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Observe that t2 has no waiting requests, but t1 still has 3. So we've +# processed 5 t2 requests for every 2 t1 requests, exactly what we'd expect for +# a 2:5 weight ratio between t1:t2. +print +---- +physical-stats: work-count=10 written-bytes=10B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=2 fifo-threshold=low-pri used=2B + [0: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/22] + [1: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/23] + [2: pri=normal-pri create-time=1ms size=1B range=r1 origin=n1 log-position=4/24] + tenant=t2 weight=5 fifo-threshold=low-pri used=5B +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/store_work_queue b/pkg/util/admission/testdata/store_work_queue index 26fba8dc46e4..ca4ebdfbccd6 100644 --- a/pkg/util/admission/testdata/store_work_queue +++ b/pkg/util/admission/testdata/store_work_queue @@ -5,7 +5,7 @@ print ---- regular workqueue: closed epoch: 0 tenantHeap len: 0 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:1} set-try-get-return-value v=true @@ -14,7 +14,7 @@ set-try-get-return-value v=true admit id=1 tenant=53 priority=0 create-time-millis=1 bypass=false ---- tryGet regular: returning true -id 1: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:1 workClass:0 admissionEnabled:true} +id 1: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:1 workClass:0 useAdmittedWorkDone:true} work-done id=1 ---- @@ -25,18 +25,18 @@ set-store-request-estimates write-tokens=100 regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 1, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:1 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:1 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} admit id=2 tenant=55 priority=0 create-time-millis=1 bypass=false ---- tryGet regular: returning true -id 2: admit succeeded with handle {tenantID:{InternalValue:55} writeTokens:100 workClass:0 admissionEnabled:true} +id 2: admit succeeded with handle {tenantID:{InternalValue:55} writeTokens:100 workClass:0 useAdmittedWorkDone:true} admit id=3 tenant=53 priority=0 create-time-millis=1 bypass=false ---- tryGet regular: returning true -id 3: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:100 workClass:0 admissionEnabled:true} +id 3: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:100 workClass:0 useAdmittedWorkDone:true} print ---- @@ -44,7 +44,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 101, w: 1, fifo: -128 tenant-id: 55 used: 100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:1 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:1 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} set-try-get-return-value v=false @@ -65,13 +65,13 @@ regular workqueue: closed epoch: 0 tenantHeap len: 1 top tenant: 57 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 0] elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:2 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:2 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} granted ---- continueGrantChain regular 0 -id 4: admit succeeded with handle {tenantID:{InternalValue:57} writeTokens:100 workClass:0 admissionEnabled:true} +id 4: admit succeeded with handle {tenantID:{InternalValue:57} writeTokens:100 workClass:0 useAdmittedWorkDone:true} granted regular: returned 100 print @@ -81,7 +81,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:2 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:2 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} work-done id=3 ingested-bytes=1000000 additional-tokens=50000 @@ -95,7 +95,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:3 writeAccountedBytes:0 ingestedAccountedBytes:1000000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:3 writeAccountedBytes:0 ingestedAccountedBytes:1000000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} set-store-request-estimates write-tokens=10000 @@ -105,7 +105,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:3 writeAccountedBytes:0 ingestedAccountedBytes:1000000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:3 writeAccountedBytes:0 ingestedAccountedBytes:1000000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:10000} work-done id=4 write-bytes=2000 ingested-bytes=1000 additional-tokens=2000 @@ -119,7 +119,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 2100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:4 writeAccountedBytes:2000 ingestedAccountedBytes:1001000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:4 writeAccountedBytes:2000 ingestedAccountedBytes:1001000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:10000} bypassed-work-done work-count=10 write-bytes=1000 ingested-bytes=1000000 @@ -133,7 +133,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 2100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} stats-to-ignore ingested-bytes=12000 ingested-into-L0-bytes=9000 @@ -143,7 +143,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 2100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} # Elastic work. @@ -160,7 +160,7 @@ granted regular: returned 0 granted elastic=true ---- continueGrantChain elastic 0 -id 5: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:10000 workClass:1 admissionEnabled:true} +id 5: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:10000 workClass:1 useAdmittedWorkDone:true} granted elastic: returned 10000 print @@ -171,7 +171,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 57 used: 2100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 10000, w: 1, fifo: -128 -stats:{admittedCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} set-try-get-return-value v=true elastic=true @@ -180,7 +180,7 @@ set-try-get-return-value v=true elastic=true admit id=6 tenant=54 priority=-40 create-time-millis=3 bypass=false ---- tryGet elastic: returning true -id 6: admit succeeded with handle {tenantID:{InternalValue:54} writeTokens:10000 workClass:1 admissionEnabled:true} +id 6: admit succeeded with handle {tenantID:{InternalValue:54} writeTokens:10000 workClass:1 useAdmittedWorkDone:true} print ---- @@ -191,7 +191,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 elastic workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 10000, w: 1, fifo: -128 tenant-id: 54 used: 10000, w: 1, fifo: -128 -stats:{admittedCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} work-done id=5 write-bytes=1000 additional-tokens=200 @@ -207,7 +207,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 elastic workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 10200, w: 1, fifo: -128 tenant-id: 54 used: 10000, w: 1, fifo: -128 -stats:{admittedCount:15 writeAccountedBytes:4000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:15 writeAccountedBytes:4000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} work-done id=6 ingested-bytes=500 additional-tokens=500 @@ -223,5 +223,5 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 elastic workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 10200, w: 1, fifo: -128 tenant-id: 54 used: 10500, w: 1, fifo: -128 -stats:{admittedCount:16 writeAccountedBytes:4000 ingestedAccountedBytes:2001500 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:16 writeAccountedBytes:4000 ingestedAccountedBytes:2001500 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} diff --git a/pkg/util/admission/testing_knobs.go b/pkg/util/admission/testing_knobs.go new file mode 100644 index 000000000000..be53baabdfdf --- /dev/null +++ b/pkg/util/admission/testing_knobs.go @@ -0,0 +1,36 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package admission + +import ( + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" +) + +// TestingKnobs provide fine-grained control over the various admission control +// components for testing. +type TestingKnobs struct { + // admittedReplicatedWorkInterceptor is invoked whenever replicated work is + // admitted. + AdmittedReplicatedWorkInterceptor func( + tenantID roachpb.TenantID, + pri admissionpb.WorkPriority, + rwi ReplicatedWorkInfo, + requestedTokens int64, + createTime int64, + ) +} + +// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. +func (t *TestingKnobs) ModuleTestingKnobs() {} + +var _ base.ModuleTestingKnobs = (*TestingKnobs)(nil) diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 3ec037c171be..4c052e28d5c9 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -156,9 +156,9 @@ var epochLIFOQueueDelayThresholdToSwitchToLIFO = settings.RegisterDurationSettin return nil }).WithPublic() -// WorkInfo provides information that is used to order work within an -// WorkQueue. The WorkKind is not included as a field since an WorkQueue deals -// with a single WorkKind. +// WorkInfo provides information that is used to order work within an WorkQueue. +// The WorkKind is not included as a field since an WorkQueue deals with a +// single WorkKind. type WorkInfo struct { // TenantID is the id of the tenant. For single-tenant clusters, this will // always be the SystemTenantID. @@ -177,22 +177,58 @@ type WorkInfo struct { // when KV work generates other KV work (to avoid deadlock). Ignored // otherwise. BypassAdmission bool + // RequestedCount is the requested number of tokens or slots. If unset: + // - For slot-based queues we treat it as an implicit request of 1; + // - For the store work queue, we use per-request estimates to deduct some + // number of tokens at-admit time. Note that this only applies to the + // legacy above-raft admission control. With admission control for + // replicated writes (done so asynchronously, below-raft; see + // ReplicatedWrite below), we do know the size of the write being + // admitted, so RequestedCount is set accordingly. + RequestedCount int64 + // ReplicatedWorkInfo groups everything needed to admit replicated writes, done + // so asynchronously below-raft as part of replication admission control. + ReplicatedWorkInfo ReplicatedWorkInfo +} + +// ReplicatedWorkInfo groups everything needed to admit replicated writes, done +// so asynchronously below-raft as part of replication admission control. +type ReplicatedWorkInfo struct { + // Enabled captures whether this work represents a replicated write, + // subject to below-raft asynchronous admission control. + Enabled bool + // RangeID identifies the raft group on behalf of which work is being + // admitted. + RangeID roachpb.RangeID + // Origin is the node at which this work originated. It's used for + // replication admission control to inform the origin of admitted work + // (after which flow tokens are released, permitted more replicated + // writes). + Origin roachpb.NodeID + // LogPosition is the point on the raft log where the write was replicated. + LogPosition LogPosition + // Ingested captures whether the write work corresponds to an ingest + // (for sstables, for example). This is used alongside RequestedCount to + // maintain accurate linear models for L0 growth due to ingests and + // regular write batches. + Ingested bool +} - // Optional information specified only for WorkQueues where the work is tied - // to a range. This allows queued work to return early as soon as the range - // is no longer in a relevant state at this node. Currently only KVWork is - // tied to a range. - // TODO(sumeer): use these in the WorkQueue implementation. +// LogPosition is a point on the raft log, identified by a term and an index. +type LogPosition struct { + Term uint64 + Index uint64 +} - // RangeID is the range at which this work must be performed. Optional (see - // comment above). - RangeID roachpb.RangeID - // RequiresLeaseholder is true iff the work requires the leaseholder. - // Optional (see comment above). - RequiresLeaseholder bool +func (r LogPosition) String() string { + return fmt.Sprintf("%d/%d", r.Term, r.Index) +} - // For internal use by wrapper classes. The requested tokens or slots. - requestedCount int64 +func (r LogPosition) Less(o LogPosition) bool { + if r.Term != o.Term { + return r.Term < o.Term + } + return r.Index < o.Index } // WorkQueue maintains a queue of work waiting to be admitted. Ordering of @@ -229,6 +265,8 @@ type WorkQueue struct { tiedToRange bool settings *cluster.Settings + onAdmittedReplicatedWork onAdmittedReplicatedWork + // Prevents more than one caller to be in Admit and calling tryGet or adding // to the queue. It allows WorkQueue to release mu before calling tryGet and // be assured that it is not competing with another Admit. @@ -492,17 +530,18 @@ func (q *WorkQueue) tryCloseEpoch(timeNow time.Time) { // admission control is enabled. AdmittedWorkDone must be called iff // enabled=true && err!=nil, and the WorkKind for this queue uses slots. func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err error) { - enabledSetting := admissionControlEnabledSettings[q.workKind] - if enabledSetting != nil && !enabledSetting.Get(&q.settings.SV) { - return false, nil + if !info.ReplicatedWorkInfo.Enabled { + enabledSetting := admissionControlEnabledSettings[q.workKind] + if enabledSetting != nil && !enabledSetting.Get(&q.settings.SV) { + return false, nil + } } - if info.requestedCount == 0 { - // Callers from outside the admission package don't set requestedCount -- - // these are implicitly requesting a count of 1. - info.requestedCount = 1 + if info.RequestedCount == 0 { + // We treat unset RequestCounts as an implicit request of 1. + info.RequestedCount = 1 } - if !q.usesTokens && info.requestedCount != 1 { - panic(errors.AssertionFailedf("unexpected requestedCount: %d", info.requestedCount)) + if !q.usesTokens && info.RequestedCount != 1 { + panic(errors.AssertionFailedf("unexpected RequestedCount: %d", info.RequestedCount)) } q.metrics.incRequested(info.Priority) tenantID := info.TenantID.ToUint64() @@ -518,14 +557,29 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err tenant = newTenantInfo(tenantID, q.getTenantWeightLocked(tenantID)) q.mu.tenants[tenantID] = tenant } + if info.ReplicatedWorkInfo.Enabled { + if info.BypassAdmission { + // TODO(irfansharif): "Admin" work (like splits, scatters, lease + // transfers, etc.), and work originating from AdmissionHeader_OTHER, + // don't use flow control tokens above-raft. So there's nothing to + // virtually enqueue below-raft, since we have nothing to return. That + // said, it might still be useful to physically admit these proposals + // for correct token modeling. To do that, we'd have to pass down + // information about it being bypassed above-raft. + panic("unexpected BypassAdmission bit set for below raft admission") + } + if !q.usesTokens { + panic("unexpected ReplicatedWrite.Enabled on slot-based queue") + } + } if info.BypassAdmission && roachpb.IsSystemTenantID(tenantID) && q.workKind == KVWork { - tenant.used += uint64(info.requestedCount) + tenant.used += uint64(info.RequestedCount) if isInTenantHeap(tenant) { q.mu.tenantHeap.fix(tenant) } q.mu.Unlock() q.admitMu.Unlock() - q.granter.tookWithoutPermission(info.requestedCount) + q.granter.tookWithoutPermission(info.RequestedCount) q.metrics.incAdmitted(info.Priority) return true, nil } @@ -539,11 +593,20 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err if len(q.mu.tenantHeap) == 0 { // Fast-path. Try to grab token/slot. // Optimistically update used to avoid locking again. - tenant.used += uint64(info.requestedCount) + tenant.used += uint64(info.RequestedCount) q.mu.Unlock() - if q.granter.tryGet(info.requestedCount) { + if q.granter.tryGet(info.RequestedCount) { q.admitMu.Unlock() q.metrics.incAdmitted(info.Priority) + if info.ReplicatedWorkInfo.Enabled { + q.onAdmittedReplicatedWork.admittedReplicatedWork( + roachpb.MustMakeTenantID(tenant.id), + info.Priority, + info.ReplicatedWorkInfo, + info.RequestedCount, + info.CreateTime, + ) + } return true, nil } // Did not get token/slot. @@ -571,11 +634,11 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err if !ok || prevTenant != tenant { panic("prev tenantInfo no longer in map") } - if tenant.used < uint64(info.requestedCount) { - panic(errors.AssertionFailedf("tenant.used %d < info.requestedCount %d", - tenant.used, info.requestedCount)) + if tenant.used < uint64(info.RequestedCount) { + panic(errors.AssertionFailedf("tenant.used %d < info.RequestedCount %d", + tenant.used, info.RequestedCount)) } - tenant.used -= uint64(info.requestedCount) + tenant.used -= uint64(info.RequestedCount) } else { if !ok { tenant = newTenantInfo(tenantID, q.getTenantWeightLocked(tenantID)) @@ -583,14 +646,17 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err } // Don't want to overflow tenant.used if it is already 0 because of // being reset to 0 by the GC goroutine. - if tenant.used >= uint64(info.requestedCount) { - tenant.used -= uint64(info.requestedCount) + if tenant.used >= uint64(info.RequestedCount) { + tenant.used -= uint64(info.RequestedCount) } } } // Check for cancellation. startTime := q.timeNow() if ctx.Err() != nil { + if info.ReplicatedWorkInfo.Enabled { + panic("not equipped to deal with cancelable contexts below raft") + } // Already canceled. More likely to happen if cpu starvation is // causing entering into the work queue to be delayed. q.mu.Unlock() @@ -606,7 +672,9 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err if int(info.Priority) < tenant.fifoPriorityThreshold { ordering = lifoWorkOrdering } - work := newWaitingWork(info.Priority, ordering, info.CreateTime, info.requestedCount, startTime, q.mu.epochLengthNanos) + work := newWaitingWork(info.Priority, ordering, info.CreateTime, info.RequestedCount, startTime, q.mu.epochLengthNanos) + work.replicated = info.ReplicatedWorkInfo + inTenantHeap := isInTenantHeap(tenant) if work.epoch <= q.mu.closedEpochThreshold || ordering == fifoWorkOrdering { heap.Push(&tenant.waitingWorkHeap, work) @@ -618,11 +686,16 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err } // Else already in tenantHeap. - // Release all locks and start waiting. + // Release all locks. q.mu.Unlock() q.admitMu.Unlock() q.metrics.recordStartWait(info.Priority) + if info.ReplicatedWorkInfo.Enabled { + return // return without waiting (admission is asynchronous) + } + + // Start waiting for admission. defer releaseWaitingWork(work) select { case <-ctx.Done(): @@ -638,16 +711,16 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err if work.heapIndex == -1 { // No longer in heap. Raced with token/slot grant. if !q.usesTokens { - if tenant.used < uint64(info.requestedCount) { - panic(errors.AssertionFailedf("tenant.used %d < info.requestedCount %d", - tenant.used, info.requestedCount)) + if tenant.used < uint64(info.RequestedCount) { + panic(errors.AssertionFailedf("tenant.used %d < info.RequestedCount %d", + tenant.used, info.RequestedCount)) } - tenant.used -= uint64(info.requestedCount) + tenant.used -= uint64(info.RequestedCount) } // Else, we don't decrement tenant.used since we don't want to race with // the gc goroutine that will set used=0. q.mu.Unlock() - q.granter.returnGrant(info.requestedCount) + q.granter.returnGrant(info.RequestedCount) // The channel is sent to after releasing mu, so we don't need to hold // mu when receiving from it. Additionally, we've already called // returnGrant so we're not holding back future grant chains if this one @@ -745,8 +818,30 @@ func (q *WorkQueue) granted(grantChainID grantChainID) int64 { // releaseWaitingWork to return item to the waitingWorkPool. requestedCount := item.requestedCount q.mu.Unlock() - // Reduce critical section by sending on channel after releasing mutex. - item.ch <- grantChainID + + if !item.replicated.Enabled { + // Reduce critical section by sending on channel after releasing mutex. + item.ch <- grantChainID + } else { + // NB: We don't use grant chains for store tokens, so they don't apply + // to replicated writes. + + defer releaseWaitingWork(item) + q.onAdmittedReplicatedWork.admittedReplicatedWork( + roachpb.MustMakeTenantID(tenant.id), + item.priority, + item.replicated, + item.requestedCount, + item.createTime, + ) + + q.metrics.incAdmitted(item.priority) + waitDur := q.timeNow().Sub(item.enqueueingTime) + q.metrics.recordFinishWait(item.priority, waitDur) + if item.heapIndex != -1 { + panic(errors.AssertionFailedf("grantee should be removed from heap")) + } + } return requestedCount } @@ -769,24 +864,24 @@ func (q *WorkQueue) gcTenantsAndResetTokens() { } // adjustTenantTokens is used internally by StoreWorkQueue. The -// additionalTokens count can be negative, in which case it is returning +// additionalTokensNeeded count can be negative, in which case it is returning // tokens. This is only for WorkQueue's own accounting -- it should not call // into granter. -func (q *WorkQueue) adjustTenantTokens(tenantID roachpb.TenantID, additionalTokens int64) { +func (q *WorkQueue) adjustTenantTokens(tenantID roachpb.TenantID, additionalTokensNeeded int64) { tid := tenantID.ToUint64() q.mu.Lock() defer q.mu.Unlock() tenant, ok := q.mu.tenants[tid] if ok { - if additionalTokens < 0 { - toReturn := uint64(-additionalTokens) + if additionalTokensNeeded < 0 { + toReturn := uint64(-additionalTokensNeeded) if tenant.used < toReturn { tenant.used = 0 } else { tenant.used -= toReturn } } else { - tenant.used += uint64(additionalTokens) + tenant.used += uint64(additionalTokensNeeded) } } } @@ -1243,6 +1338,7 @@ type waitingWork struct { // to false. inWaitingWorkHeap bool enqueueingTime time.Time + replicated ReplicatedWorkInfo } var waitingWorkPool = sync.Pool{ @@ -1387,11 +1483,29 @@ func (wwh *waitingWorkHeap) Less(i, j int) bool { if (*wwh)[i].arrivalTimeWorkOrdering == lifoWorkOrdering || (*wwh)[i].arrivalTimeWorkOrdering != (*wwh)[j].arrivalTimeWorkOrdering { // LIFO, and the epoch is closed, so can simply use createTime. - return (*wwh)[i].createTime > (*wwh)[j].createTime + + if (*wwh)[i].replicated.RangeID != (*wwh)[j].replicated.RangeID || + (!(*wwh)[i].replicated.Enabled || !(*wwh)[j].replicated.Enabled) { + // If rangeIDs are unequal, or we're not even using range IDs, + // sort by createTime in LIFO order. + return (*wwh)[i].createTime > (*wwh)[j].createTime + } + + // Ignore create time, so LIFO doesn't apply. Sort by log position. + return (*wwh)[i].replicated.LogPosition.Less((*wwh)[j].replicated.LogPosition) + } + + if (*wwh)[i].replicated.RangeID != (*wwh)[j].replicated.RangeID || + (!(*wwh)[i].replicated.Enabled || !(*wwh)[j].replicated.Enabled) { + // If rangeIDs are unequal, or we're not even using range IDs, + // sort by createTime in FIFO order. + return (*wwh)[i].createTime < (*wwh)[j].createTime } - // FIFO. - return (*wwh)[i].createTime < (*wwh)[j].createTime + + // Ignore create time, so FIFO doesn't apply. Sort by log position. + return (*wwh)[i].replicated.LogPosition.Less((*wwh)[j].replicated.LogPosition) } + return (*wwh)[i].priority > (*wwh)[j].priority } @@ -1636,68 +1750,97 @@ func makeWorkQueueMetricsSingle(name string) workQueueMetricsSingle { // seeking admission from a StoreWorkQueue. type StoreWriteWorkInfo struct { WorkInfo - // NB: no information about the size of the work is provided at admission - // time. The token subtraction at admission time is completely based on past - // estimates. This estimation is improved at work completion time via size - // information provided in StoreWorkDoneInfo. - // - // TODO(sumeer): in some cases, like AddSSTable requests, we do have size - // information at proposal time, and may be able to use it fruitfully. } // StoreWorkQueue is responsible for admission to a store. type StoreWorkQueue struct { - q [admissionpb.NumWorkClasses]WorkQueue + storeID roachpb.StoreID + q [admissionpb.NumWorkClasses]WorkQueue // Only calls storeWriteDone. The rest of the interface is used by // WorkQueue. granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone mu struct { syncutil.RWMutex + // estimates is used to determine how many tokens are deducted at-admit + // time for each request. It's not used for replication admission + // control (below-raft) where we do know the size of the write being + // admitted. estimates storeRequestEstimates - stats storeAdmissionStats + // stats are used to maintain L0 {write,ingest} linear models, modeling + // the relation between accounted for "physical" {write,ingest} bytes + // and observed L0 growth (which factors in state machine application). + stats storeAdmissionStats } + + knobs *TestingKnobs } // StoreWorkHandle is returned by StoreWorkQueue.Admit, and contains state -// needed by the caller (see StoreWorkHandle.AdmissionEnabled) and by +// needed by the caller (see StoreWorkHandle.UseAdmittedWorkDone) and by // StoreWorkQueue.AdmittedWorkDone. type StoreWorkHandle struct { tenantID roachpb.TenantID // The writeTokens acquired by this request. Must be > 0. - writeTokens int64 - workClass admissionpb.WorkClass - admissionEnabled bool + writeTokens int64 + workClass admissionpb.WorkClass + useAdmittedWorkDone bool } -// AdmissionEnabled indicates whether admission control is enabled. If it -// returns false, there is no need to call StoreWorkQueue.AdmittedWorkDone. -func (h StoreWorkHandle) AdmissionEnabled() bool { - return h.admissionEnabled +// UseAdmittedWorkDone indicates whether we need to invoke +// StoreWorkQueue.AdmittedWorkDone. It's false if AC is disabled or if we're +// using below-raft admission control. +func (h StoreWorkHandle) UseAdmittedWorkDone() bool { + return h.useAdmittedWorkDone } // Admit is called when requesting admission for store work. If err!=nil, the // request was not admitted, potentially due to a deadline being exceeded. If -// err=nil and handle.AdmissionEnabled() is true, AdmittedWorkDone must be +// err=nil and handle.UseAdmittedWorkDone() is true, AdmittedWorkDone must be // called when the admitted work is done. func (q *StoreWorkQueue) Admit( ctx context.Context, info StoreWriteWorkInfo, ) (handle StoreWorkHandle, err error) { - // For now, we compute a workClass based on priority. wc := admissionpb.WorkClassFromPri(info.Priority) - h := StoreWorkHandle{ - tenantID: info.TenantID, - workClass: wc, + if info.RequestedCount == 0 { + // We use a per-request estimate only when no requested count is + // provided. It's always provided for below-raft admission where we + // already know the size of the work being admitted. Since it's async, + // there's no token deduction upfront. Later when logically admitting, + // we're relying on there not have been upfront token deductions so we + // just deduct what was requested. + q.mu.RLock() + info.RequestedCount = q.mu.estimates.writeTokens + q.mu.RUnlock() } - q.mu.RLock() - estimates := q.mu.estimates - q.mu.RUnlock() - h.writeTokens = estimates.writeTokens - info.WorkInfo.requestedCount = h.writeTokens + enabled, err := q.q[wc].Admit(ctx, info.WorkInfo) if err != nil { return StoreWorkHandle{}, err } - h.admissionEnabled = enabled + + h := StoreWorkHandle{ + tenantID: info.TenantID, + workClass: wc, + writeTokens: info.RequestedCount, + useAdmittedWorkDone: enabled, + } + if !info.ReplicatedWorkInfo.Enabled { + return h, nil + } + + h.useAdmittedWorkDone = false + var storeWorkDoneInfo StoreWorkDoneInfo + if info.ReplicatedWorkInfo.Ingested { + storeWorkDoneInfo.IngestedBytes = info.RequestedCount + } else { + storeWorkDoneInfo.WriteBytes = info.RequestedCount + } + + // Update store admission stats, because the write is happening ~this + // point. These statistics are used to maintain the underlying linear + // models (modeling relation between physical log writes and total L0 + // growth, which includes the state machine application). + q.updateStoreStatsAfterWorkDone(1, storeWorkDoneInfo, false) return h, nil } @@ -1713,13 +1856,75 @@ type StoreWorkDoneInfo struct { IngestedBytes int64 } -// AdmittedWorkDone indicates to the queue that the admitted work has -// completed. +type onAdmittedReplicatedWork interface { + admittedReplicatedWork( + tenantID roachpb.TenantID, + pri admissionpb.WorkPriority, + rwi ReplicatedWorkInfo, + requestedTokens int64, + createTime int64, + ) +} + +var _ onAdmittedReplicatedWork = &StoreWorkQueue{} + +// admittedReplicatedWork indicates to the queue that replicated write work was +// admitted. +func (q *StoreWorkQueue) admittedReplicatedWork( + tenantID roachpb.TenantID, + pri admissionpb.WorkPriority, + rwi ReplicatedWorkInfo, + requestedTokens int64, + createTime int64, +) { + if !rwi.Enabled { + panic("unexpected call to admittedReplicatedWork for work that's not a replicated write") + } + if fn := q.knobs.AdmittedReplicatedWorkInterceptor; fn != nil { + fn(tenantID, pri, rwi, requestedTokens, createTime) + } + + wc := admissionpb.WorkClassFromPri(pri) + + var storeWorkDoneInfo StoreWorkDoneInfo + if rwi.Ingested { + storeWorkDoneInfo.IngestedBytes = requestedTokens + } else { + storeWorkDoneInfo.WriteBytes = requestedTokens + } + + // We use RequestedCount for replicated writes to deduct the right number of + // tokens (we know the size of the write already). Do the requisite token + // adjustments inline. + // + // TODO(irfansharif): Do we need to do this? We know the size of the write + // upfront, and deduct what should be the right number of tokens. So why the + // adjustment here? When deducting originally, how come we don't just apply + // the linear models? + additionalTokensNeeded := q.granters[wc].storeWriteDone(requestedTokens, storeWorkDoneInfo) + q.q[wc].adjustTenantTokens(tenantID, additionalTokensNeeded) + + // TODO(irfansharif): Dispatch flow token returns here. We want to + // inform (a) the origin node of writes at (b) a given priority, to + // (c) the given range, at (d) the given log position on (e) the + // local store. Part of #95563. + // + _ = rwi.Origin // (a) + _ = pri // (b) + _ = rwi.RangeID // (c) + _ = rwi.LogPosition // (d) + _ = q.storeID // (e) +} + +// AdmittedWorkDone indicates to the queue that the admitted work has completed. +// It's used for the legacy above-raft admission control where we Admit() +// upfront, with just an estimate of the write size, and after the write is +// done, invoke AdmittedWorkDone with the now-known size. func (q *StoreWorkQueue) AdmittedWorkDone(h StoreWorkHandle, doneInfo StoreWorkDoneInfo) error { - if !h.admissionEnabled { - return nil + if !h.UseAdmittedWorkDone() { + return nil // nothing to do } - q.updateStoreAdmissionStats(1, doneInfo, false) + q.updateStoreStatsAfterWorkDone(1, doneInfo, false) additionalTokens := q.granters[h.workClass].storeWriteDone(h.writeTokens, doneInfo) q.q[h.workClass].adjustTenantTokens(h.tenantID, additionalTokens) return nil @@ -1729,7 +1934,7 @@ func (q *StoreWorkQueue) AdmittedWorkDone(h StoreWorkHandle, doneInfo StoreWorkD // can (a) adjust remaining tokens, (b) account for this in the per-work token // estimation model. func (q *StoreWorkQueue) BypassedWorkDone(workCount int64, doneInfo StoreWorkDoneInfo) { - q.updateStoreAdmissionStats(uint64(workCount), doneInfo, true) + q.updateStoreStatsAfterWorkDone(uint64(workCount), doneInfo, true) // Since we have no control over such work, we choose to count it as // regularWorkClass. _ = q.granters[admissionpb.RegularWorkClass].storeWriteDone(0, doneInfo) @@ -1744,11 +1949,11 @@ func (q *StoreWorkQueue) StatsToIgnore(ingestStats pebble.IngestOperationStats) q.mu.Unlock() } -func (q *StoreWorkQueue) updateStoreAdmissionStats( +func (q *StoreWorkQueue) updateStoreStatsAfterWorkDone( workCount uint64, doneInfo StoreWorkDoneInfo, bypassed bool, ) { q.mu.Lock() - q.mu.stats.admittedCount += workCount + q.mu.stats.workCount += workCount q.mu.stats.writeAccountedBytes += uint64(doneInfo.WriteBytes) q.mu.stats.ingestedAccountedBytes += uint64(doneInfo.IngestedBytes) if bypassed { @@ -1795,16 +2000,24 @@ func (q *StoreWorkQueue) setStoreRequestEstimates(estimates storeRequestEstimate func makeStoreWorkQueue( ambientCtx log.AmbientContext, + storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, + knobs *TestingKnobs, ) storeRequester { + if knobs == nil { + knobs = &TestingKnobs{} + } q := &StoreWorkQueue{ + storeID: storeID, granters: granters, + knobs: knobs, } for i := range q.q { initWorkQueue(&q.q[i], ambientCtx, KVWork, granters[i], settings, metrics, opts) + q.q[i].onAdmittedReplicatedWork = q } // Arbitrary initial value. This will be replaced before any meaningful // token constraints are enforced. diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go index 0296c442a7ad..f4902930be32 100644 --- a/pkg/util/admission/work_queue_test.go +++ b/pkg/util/admission/work_queue_test.go @@ -57,9 +57,12 @@ func (b *builderWithMu) stringAndReset() string { } type testGranter struct { - name string - buf *builderWithMu - r requester + gk grantKind + name string + buf *builderWithMu + r requester + + // Configurable knobs for tests. returnValueFromTryGet bool additionalTokens int64 } @@ -67,21 +70,26 @@ type testGranter struct { var _ granterWithStoreWriteDone = &testGranter{} func (tg *testGranter) grantKind() grantKind { - return slot + return tg.gk } + func (tg *testGranter) tryGet(count int64) bool { tg.buf.printf("tryGet%s: returning %t", tg.name, tg.returnValueFromTryGet) return tg.returnValueFromTryGet } + func (tg *testGranter) returnGrant(count int64) { tg.buf.printf("returnGrant%s %d", tg.name, count) } + func (tg *testGranter) tookWithoutPermission(count int64) { tg.buf.printf("tookWithoutPermission%s %d", tg.name, count) } + func (tg *testGranter) continueGrantChain(grantChainID grantChainID) { tg.buf.printf("continueGrantChain%s %d", tg.name, grantChainID) } + func (tg *testGranter) grant(grantChainID grantChainID) { rv := tg.r.granted(grantChainID) if rv > 0 { @@ -93,6 +101,7 @@ func (tg *testGranter) grant(grantChainID grantChainID) { } tg.buf.printf("granted%s: returned %d", tg.name, rv) } + func (tg *testGranter) storeWriteDone( originalTokens int64, doneInfo StoreWorkDoneInfo, ) (additionalTokens int64) { @@ -186,7 +195,7 @@ func TestWorkQueueBasic(t *testing.T) { switch d.Cmd { case "init": closeFn() - tg = &testGranter{buf: &buf} + tg = &testGranter{gk: slot, buf: &buf} opts := makeWorkQueueOptions(KVWork) timeSource = timeutil.NewManualTime(initialTime) opts.timeSource = timeSource @@ -329,7 +338,7 @@ func TestWorkQueueTokenResetRace(t *testing.T) { defer log.Scope(t).Close(t) var buf builderWithMu - tg := &testGranter{buf: &buf} + tg := &testGranter{gk: slot, buf: &buf} st := cluster.MakeTestingClusterSettings() registry := metric.NewRegistry() metrics := makeWorkQueueMetrics("", registry) @@ -506,16 +515,16 @@ func TestStoreWorkQueueBasic(t *testing.T) { switch d.Cmd { case "init": closeFn() - tg[admissionpb.RegularWorkClass] = &testGranter{name: " regular", buf: &buf} - tg[admissionpb.ElasticWorkClass] = &testGranter{name: " elastic", buf: &buf} + tg[admissionpb.RegularWorkClass] = &testGranter{gk: token, name: " regular", buf: &buf} + tg[admissionpb.ElasticWorkClass] = &testGranter{gk: token, name: " elastic", buf: &buf} opts := makeWorkQueueOptions(KVWork) opts.usesTokens = true opts.timeSource = timeutil.NewManualTime(timeutil.FromUnixMicros(0)) opts.disableEpochClosingGoroutine = true st = cluster.MakeTestingClusterSettings() - q = makeStoreWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), + q = makeStoreWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), roachpb.StoreID(1), [admissionpb.NumWorkClasses]granterWithStoreWriteDone{tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass]}, - st, metrics, opts).(*StoreWorkQueue) + st, metrics, opts, nil /* testing knobs */).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = q.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = q.getRequesters()[admissionpb.ElasticWorkClass] wrkMap.resetMap() diff --git a/pkg/util/timeutil/time.go b/pkg/util/timeutil/time.go index 811f63b72bff..42f37be43486 100644 --- a/pkg/util/timeutil/time.go +++ b/pkg/util/timeutil/time.go @@ -90,12 +90,20 @@ func Until(t time.Time) time.Duration { var UnixEpoch = time.Unix(0, 0).UTC() // FromUnixMicros returns the UTC time.Time corresponding to the given Unix -// time, usec microseconds since UnixEpoch. In Go's current time.Time +// time, usec microseconds since UnixEpoch. +// In Go's current time.Time // implementation, all possible values for us can be represented as a time.Time. func FromUnixMicros(us int64) time.Time { return time.Unix(us/1e6, (us%1e6)*1e3).UTC() } +// FromUnixNanos returns the UTC time.Time corresponding to the given Unix +// time, ns nanoseconds since UnixEpoch. In Go's current time.Time +// implementation, all possible values for ns can be represented as a time.Time. +func FromUnixNanos(ns int64) time.Time { + return time.Unix(ns/1e9, ns%1e9).UTC() +} + // ToUnixMicros returns t as the number of microseconds elapsed since UnixEpoch. // Fractional microseconds are rounded, half up, using time.Round. Similar to // time.Time.UnixNano, the result is undefined if the Unix time in microseconds