From a8747c9120bf2729e1cfd474ec83ac565b0da9da Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Fri, 17 Feb 2023 03:23:24 +0400 Subject: [PATCH] admission: support non-blocking {Store,}WorkQueue.Admit() Part of #95563. For end-to-end flow control of replicated writes, we want to enable below-raft admission control through the following API on kvadmission.Controller: // AdmitRaftEntry informs admission control of a raft log entry being // written to storage (for the given tenant, the specific range, and // on the named store). AdmitRaftEntry( context.Context, roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry. ) This serves as the integration point for log entries received below raft right as they're being written to stable storage. It's a non-blocking interface since we're below-raft and in the raft.Ready() loop. What it effectively does is enqueues a "virtual" work item in the underlying StoreWorkQueue mediating all store IO. This virtual work item is what later gets dequeued once the IO granter informs the work queue of newly available IO tokens. When enqueueing the virtual work item, we still update the StoreWorkQueue's physically-accounted-for bytes since the actual write is not deferred, and timely statistic updates improves accuracy for the underlying linear models that map between accounted-for writes and observed L0 growth (using it to inform IO token generation rates). For each of the arguments above: - The roachpb.TenantID is plumbed to find the right tenant heap to queue it under (for inter-tenant isolation). - The roachpb.StoreID to find the right store work queue on multi-store nodes. We'll also use the StoreID when informing the origin node of log entries being admitted[^1]. - We pass in the roachpb.RangeID on behalf of which work is being admitted. This, along side the raftpb.Entry.{Term,Index} for the replicated write uniquely identifies where the write is to end up. We use these identifiers to return flow tokens on the origin node[^1][^2]. - For standard work queue ordering, our work item needs to include the CreateTime and AdmissionPriority, details that are passed down using dedicated raft log entry encodings[^3][^4] as part of the raftpb.Entry parameter above. - Since the raftpb.Entry encodes within it its origin node[^4], it will be used post-admission to dispatch flow tokens to the right node. This integration is left to future PRs. We use the above to populate the following fields on a per-(replicated write)work basis: // ReplicatedWorkInfo groups everything needed to admit replicated // writes, done so asynchronously below-raft as part of replication // admission control. type ReplicatedWorkInfo struct { RangeID roachpb.RangeID Origin roachpb.NodeID LogPosition LogPosition Ingested bool } Since admission is happening below-raft where the size of the write is known, we no longer need per-work estimates for upfront IO token deductions. Since admission is asynchronous, we also don't use the AdmittedWorkDone interface which was to make token adjustments (without blocking) given the upfront estimates. We still want to intercept the exact point when some write work gets admitted in order to inform the origin node so it can release flow tokens. We do so through the following interface satisfied by the StoreWorkQueue: // onAdmittedReplicatedWork is used to intercept the // point-of-admission for replicated writes. type onAdmittedReplicatedWork interface { admittedReplicatedWork( tenantID roachpb.TenantID, pri admissionpb.WorkPriority, rwi ReplicatedWorkInfo, requestedTokens int64, ) } [^1]: See kvflowcontrolpb.AdmittedRaftLogEntries introduced in #95637. [^2]: See kvflowcontrol.Handle.{ReturnTokensUpto,DeductTokensFor} introduced in #95637. Token deductions and returns are tied to raft log positions. [^3]: See raftlog.EntryEncoding{Standard,Sideloaded}WithAC introduced in #95748. [^4]: See kvflowcontrolpb.RaftAdmissionMeta introduced in #95637. message RaftAdmissionMeta { int32 admission_priority = ...; int64 admission_create_time = ...; int32 admission_origin_node = ...; } Release note: None --- pkg/BUILD.bazel | 4 + pkg/kv/kvserver/kvadmission/BUILD.bazel | 1 + pkg/kv/kvserver/kvadmission/kvadmission.go | 63 ++- pkg/kv/kvserver/kvflowcontrol/doc.go | 113 +++++ .../kvserver/kvflowcontrol/kvflowcontrol.go | 14 +- .../kvflowcontrol/kvflowhandle/BUILD.bazel | 2 + .../kvflowhandle/kvflowhandle.go | 23 +- .../kvflowhandle/kvflowhandle_test.go | 67 ++- .../kvflowcontrol/kvflowsequencer/BUILD.bazel | 32 ++ .../kvflowsequencer/sequencer.go | 55 +++ .../kvflowsequencer/sequencer_test.go | 89 ++++ .../kvflowsequencer/testdata/sequencer | 104 ++++ .../kvflowsimulator/simulation_test.go | 2 +- pkg/server/node.go | 2 +- pkg/util/admission/BUILD.bazel | 3 + pkg/util/admission/admission.go | 7 +- pkg/util/admission/elastic_cpu_work_queue.go | 2 +- .../admission/elastic_cpu_work_queue_test.go | 2 +- pkg/util/admission/grant_coordinator.go | 11 +- pkg/util/admission/granter.go | 2 +- pkg/util/admission/granter_test.go | 11 +- pkg/util/admission/io_load_listener.go | 3 +- pkg/util/admission/io_load_listener_test.go | 6 +- .../replicated_write_admission_test.go | 447 ++++++++++++++++++ pkg/util/admission/store_token_estimation.go | 4 +- .../admission/store_token_estimation_test.go | 2 +- pkg/util/admission/testdata/io_load_listener | 18 +- .../class_segmentation | 63 +++ ...h_create_time_low_position_different_range | 53 +++ .../high_pri_low_position | 51 ++ .../replicated_write_admission/overview | 70 +++ .../tenant_fairness | 70 +++ .../replicated_write_admission/tenant_weights | 107 +++++ pkg/util/admission/testdata/store_work_queue | 40 +- pkg/util/admission/testing_knobs.go | 36 ++ pkg/util/admission/work_queue.go | 421 +++++++++++++---- pkg/util/admission/work_queue_test.go | 29 +- pkg/util/timeutil/time.go | 7 + 38 files changed, 1861 insertions(+), 175 deletions(-) create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer create mode 100644 pkg/util/admission/replicated_write_admission_test.go create mode 100644 pkg/util/admission/testdata/replicated_write_admission/class_segmentation create mode 100644 pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range create mode 100644 pkg/util/admission/testdata/replicated_write_admission/high_pri_low_position create mode 100644 pkg/util/admission/testdata/replicated_write_admission/overview create mode 100644 pkg/util/admission/testdata/replicated_write_admission/tenant_fairness create mode 100644 pkg/util/admission/testdata/replicated_write_admission/tenant_weights create mode 100644 pkg/util/admission/testing_knobs.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 78f4a514ebec..ea1759dd0370 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -217,6 +217,7 @@ ALL_TESTS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test", + "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test", "//pkg/kv/kvserver/kvstorage:kvstorage_test", @@ -1264,6 +1265,8 @@ GO_TARGETS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:kvflowdispatch_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test", + "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer", + "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:kvflowsequencer_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test", @@ -2688,6 +2691,7 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data", + "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data", "//pkg/kv/kvserver/kvserverbase:get_x_data", diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel index 189dfd08f6cf..c0446fd040ed 100644 --- a/pkg/kv/kvserver/kvadmission/BUILD.bazel +++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel @@ -8,6 +8,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/kv/kvpb", + "//pkg/kv/kvserver/raftlog", "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index d67e6b7d509a..def2d71c9507 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -19,6 +19,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -120,7 +121,7 @@ type Controller interface { FollowerStoreWriteBytes(roachpb.StoreID, FollowerStoreWriteBytes) // AdmitRaftEntry informs admission control of a raft log entry being // written to storage. - AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry) + AdmitRaftEntry(context.Context, roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry) } // TenantWeightProvider can be periodically asked to provide the tenant @@ -244,7 +245,7 @@ func (n *controllerImpl) AdmitKVWork( if err != nil { return Handle{}, err } - admissionEnabled = storeWorkHandle.AdmissionEnabled() + admissionEnabled = storeWorkHandle.UseAdmittedWorkDone() if admissionEnabled { defer func() { if retErr != nil { @@ -409,9 +410,63 @@ func (n *controllerImpl) FollowerStoreWriteBytes( // AdmitRaftEntry implements the Controller interface. func (n *controllerImpl) AdmitRaftEntry( - roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry, + ctx context.Context, + tenantID roachpb.TenantID, + storeID roachpb.StoreID, + rangeID roachpb.RangeID, + entry raftpb.Entry, ) { - panic("unimplemented") + typ, err := raftlog.EncodingOf(entry) + if err != nil { + log.Errorf(ctx, "unable to determine raft command encoding: %v", err) + return + } + if !typ.UsesAdmissionControl() { + return // nothing to do + } + meta, err := raftlog.DecodeRaftAdmissionMeta(entry.Data) + if err != nil { + log.Errorf(ctx, "unable to decode raft command admission data: %v", err) + return + } + + storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID)) + if storeAdmissionQ == nil { + log.Errorf(ctx, "unable to find queue for store: %s", storeID) + return // nothing to do + } + + if len(entry.Data) == 0 { + log.Fatal(ctx, "found (unexpected) empty raft command for below-raft admission") + } + wi := admission.WorkInfo{ + TenantID: tenantID, + Priority: admissionpb.WorkPriority(meta.AdmissionPriority), + CreateTime: meta.AdmissionCreateTime, + BypassAdmission: false, + RequestedCount: int64(len(entry.Data)), + } + wi.ReplicatedWorkInfo = admission.ReplicatedWorkInfo{ + Enabled: true, + RangeID: rangeID, + Origin: meta.AdmissionOriginNode, + LogPosition: admission.LogPosition{ + Term: entry.Term, + Index: entry.Index, + }, + Ingested: typ.IsSideloaded(), + } + + handle, err := storeAdmissionQ.Admit(ctx, admission.StoreWriteWorkInfo{ + WorkInfo: wi, + }) + if err != nil { + log.Errorf(ctx, "error while admitting to store admission queue: %v", err) + return + } + if handle.UseAdmittedWorkDone() { + log.Fatalf(ctx, "unexpected handle.UseAdmittedWorkDone") + } } // FollowerStoreWriteBytes captures stats about writes done to a store by a diff --git a/pkg/kv/kvserver/kvflowcontrol/doc.go b/pkg/kv/kvserver/kvflowcontrol/doc.go index 60abec7421fe..ec784c1e1cd7 100644 --- a/pkg/kv/kvserver/kvflowcontrol/doc.go +++ b/pkg/kv/kvserver/kvflowcontrol/doc.go @@ -327,6 +327,105 @@ package kvflowcontrol // it can transition into the mode described in I3a where we deduct/block for // flow tokens for subsequent quorum writes. // +// I12. How does this interact with epoch-LIFO? Or CreateTime ordering +// generally? +// - Background: Epoch-LIFO tries to reduce lower percentile admission queueing +// delay (at the expense of higher percentile delay) by switching between the +// standard CreateTime-based FIFO ordering to LIFO-within-an-epoch. Work +// submitted by a transaction with CreateTime T is slotted into the epoch +// number with time interval [E, E+100ms) where T is contained in this +// interval. The work will start executing after the epoch is closed, at +// ~E+100ms. This increases transaction latency by at most ~100ms. When the +// epoch closes, all nodes start executing the transactions in that epoch in +// LIFO order, and will implicitly have the same set of competing +// transactions[^10], a set that stays unchanged until the next epoch closes. +// And by the time the next epoch closes, and the current epoch's transactions +// are deprioritized, 100ms will have elapsed, which is selected to be long +// enough for most of these now-admitted to have finished their work. +// - We switch to LIFO-within-an-epoch once we start observing that the +// maximum queuing delay for work within a starts +// exceeding the ~100ms we'd add with epoch-LIFO. +// - The idea is we have bottleneck resources that cause delays without +// bound with if work keeps accumulating, and other kinds of bottlenecks +// where delays aren't increasing without bound. We're also relying on work +// bottlenecked on epoch-LIFO not being able to issue more work. +// - For below-raft work queue ordering, we effectively ignore the "true" +// CreateTime when ordering work. Within a given , +// we instead want admission takes place in raft log order (i.e. entries with +// lower terms to get admitted first, or lower indexes within the same term). +// This lets us simplifies token returns which happen by specifying a prefix +// up to which we want to release flow tokens for a given priority[^11]. +// - NB: Regarding "admission takes place in raft log order", we could +// implement this differently. We introduced log-position based ordering to +// simplify the implementation of token returns where we release tokens by +// specifying the log position up-to-which we want to release held +// tokens[^12]. But with additional tracking in the below-raft work queues, +// if we know that work W2 with log position L2 got admitted, and +// corresponded to F flow token deductions at the origin, and we also know +// that work W1 with log position L1 is currently queued, also corresponding +// to F flow token deductions at the origin, we could inform the origin node +// to return flow tokens up to L1 and still get what we want -- a return of +// F flow tokens when each work gets admitted. +// - We effectively ignore "true" CreateTime since flow token deductions at +// the sender aren't tied to CreateTime in the way they're tied to the +// issuing tenant or the work class. So while we still want priority-based +// ordering to release regular flow tokens before elastic ones, releasing +// flow tokens for work with lower CreateTimes does not actually promote +// doing older work first since the physical work below-raft is already done +// before (asynchronous) admission, and the token returns don't unblock work +// from some given epoch. Below-raft ordering by "true" CreateTime is moot. +// - Note that for WorkQueue orderings, we have (i) fair sharing through +// tenantID+weight, (ii) strict prioritization, (iii) and sequencing of work +// within a , using CreateTime. For below-raft work +// queue ordering where we want to admit in roughly log position order, we +// then (ab)use the CreateTime sequencing by combining each work's true +// CreateTime with its log position[^13], to get a monotonic "sequencing +// timestamp" that tracks observed log positions. This sequencing timestamp is +// kept close to the maximum observed CreateTime within a replication stream, +// which also lets us generate cluster-wide FIFO ordering as follows. +// - We re-assign CreateTime in a way that, with a high probability, matches +// log position order. We can be imprecise/forgetful about this tracking +// since at worst we might over-admit slightly. +// - To operate within cluster-wide FIFO ordering, we want to order by +// "true" CreateTime when comparing work across different ranges. Writes for +// a single range, as observed by a given store below-raft (follower or +// otherwise) travel along a single stream. Consider the case where a single +// store S3 receives replication traffic for two ranges R1 and R2, +// originating from two separate nodes N1 and N2. If N1 is issuing writes +// with strictly older CreateTimes, when returning flow tokens we should +// prefer N1. By keeping these sequence numbers close to "true" CreateTimes, +// we'd be favoring N1 without introducing bias for replication streams with +// shorter/longer raft logs[^12][^14]. +// - We could improve cluster-wide FIFO properties by introducing a +// WorkQueue-like data structure that simply orders by "true" CreateTime when +// acquiring flow tokens above raft. +// - Could we then use this above-raft ordering to implement epoch-LIFO? +// Cluster-wide we want to admit work within a given epoch, which here +// entails issuing replication traffic for work slotted in a given epoch. +// - Fan-in effects to consider for epoch-LIFO, assuming below-raft orderings +// are as described above (i.e. log-position based). +// - When there's no epoch-LIFO happening, we have cluster-wide FIFO +// ordering within a pair as described above. +// - When epoch-LIFO is happening across all senders issuing replication +// traffic to a given receiver store, we're seeing work within the same +// epoch, but we'll be returning flow tokens to nodes issuing work with +// older CreateTimes. So defeating the LIFO in epoch-LIFO, but we're still +// completing work slotted into some given epoch. +// - When epoch-LIFO is happening only on a subset of the senders issuing +// replication traffic, we'll again be returning flow tokens to nodes +// issuing work with older CreateTimes. This is undefined. +// - It's strange that different nodes can admit work from "different +// epochs"[^10]. What are we to do below-raft, when deciding where to +// return flow tokens back to, since it's all for the same +// ? Maybe we need to pass down whether the work was +// admitted at the sender with LIFO/FIFO, and return flow tokens in +// {LIFO,FIFO} order across all nodes that issued {LIFO,FIFO} work? Or +// also pass down the enqueuing timestamp, so we have a good sense +// below-raft on whether this work is past the epoch expiration and +// should be deprioritized. +// - Because the fan-in effects of epoch-LIFO are not well understood (by this +// author at least), we just disable it below-raft. +// // --- // // [^1]: kvserverpb.RaftMessageRequest is the unit of what's sent @@ -373,6 +472,20 @@ package kvflowcontrol // machine application get significantly behind due to local scheduling // reasons by using the same goroutine to do both async raft log writes // and state machine application. +// [^10]: This relies on partially synchronized clocks without requiring +// explicit coordination. +// - Isn't the decision to start using epoch-LIFO a node-local one, based +// on node-local max queuing latency for a ? So what +// happens when work for a transaction is operating across multiple +// nodes, some using epoch-LIFO and some not? +// Is this why we use the max queuing latency as the trigger to switch +// into epoch-LIFO? All queued work for an epoch E across all nodes, if +// still queued after ~100ms, will trigger epoch-LIFO everywhere. +// [^11]: See the implementation for kvflowcontrol.Dispatch. +// [^12]: See UpToRaftLogPosition in AdmittedRaftLogEntries. +// [^13]: See kvflowsequencer.Sequencer and its use in kvflowhandle.Handle. +// [^14]: See the high_create_time_low_position_different_range test case for +// TestReplicatedWriteAdmission. // // TODO(irfansharif): These descriptions are too high-level, imprecise and // possibly wrong. Fix that. After implementing these interfaces and integrating diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index 3f0a51ef3831..e55cb2acc4c7 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -98,8 +98,13 @@ type Handle interface { // work with given priority along connected streams. The deduction is // tracked with respect to the specific raft log position it's expecting it // to end up in, log positions that monotonically increase. Requests are - // assumed to have been Admit()-ed first. - DeductTokensFor(context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens) + // assumed to have been Admit()-ed first. The returned time.Time parameter + // is to be used as the work item's CreateTime when enqueueing in IO + // admission queues. + DeductTokensFor( + context.Context, admissionpb.WorkPriority, time.Time, + kvflowcontrolpb.RaftLogPosition, Tokens, + ) time.Time // ReturnTokensUpto returns all previously deducted tokens of a given // priority for all log positions less than or equal to the one specified. // It does for the specific stream. Once returned, subsequent attempts to @@ -110,7 +115,10 @@ type Handle interface { // the same raft term -- we want to both free up tokens from when we lost // the lease, and also ensure we discard attempts to return them (on hearing // about AdmittedRaftLogEntries replicated under the earlier lease). - ReturnTokensUpto(context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Stream) + ReturnTokensUpto( + context.Context, admissionpb.WorkPriority, + kvflowcontrolpb.RaftLogPosition, Stream, + ) // ConnectStream connects a stream (typically pointing to an active member // of the raft group) to the handle. Subsequent calls to Admit() will block // until flow tokens are available for the stream, or for it to be diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel index 0b4379c2a72b..ff07b3cc5958 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//pkg/base", "//pkg/kv/kvserver/kvflowcontrol", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + "//pkg/kv/kvserver/kvflowcontrol/kvflowsequencer", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker", "//pkg/util/admission/admissionpb", "//pkg/util/hlc", @@ -39,6 +40,7 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/metric", + "//pkg/util/timeutil", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go index 0ff5ebf18a10..1c43ffeb2ab7 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -40,6 +41,7 @@ type Handle struct { // (identified by their log positions) have been admitted below-raft, // streams disconnect, or the handle closed entirely. perStreamTokenTracker map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker + sequencer *kvflowsequencer.Sequencer closed bool } } @@ -52,6 +54,7 @@ func New(controller kvflowcontrol.Controller, metrics *Metrics, clock *hlc.Clock clock: clock, } h.mu.perStreamTokenTracker = map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker{} + h.mu.sequencer = kvflowsequencer.New() return h } @@ -101,28 +104,31 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim func (h *Handle) DeductTokensFor( ctx context.Context, pri admissionpb.WorkPriority, + ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) { +) time.Time { if h == nil { // TODO(irfansharif): See TODO around nil receiver check in Admit(). - return + return ct } - _ = h.deductTokensForInner(ctx, pri, pos, tokens) + ct, _ = h.deductTokensForInner(ctx, pri, ct, pos, tokens) + return ct } func (h *Handle) deductTokensForInner( ctx context.Context, pri admissionpb.WorkPriority, + ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) (streams []kvflowcontrol.Stream) { +) (sequence time.Time, streams []kvflowcontrol.Stream) { h.mu.Lock() defer h.mu.Unlock() if h.mu.closed { log.Errorf(ctx, "operating on a closed handle") - return nil // unused return value in production code + return ct, nil // unused return value in production code } for _, c := range h.mu.connections { @@ -130,7 +136,7 @@ func (h *Handle) deductTokensForInner( h.mu.perStreamTokenTracker[c.Stream()].Track(ctx, pri, tokens, pos) streams = append(streams, c.Stream()) } - return streams + return h.mu.sequencer.Sequence(ct), streams } // ReturnTokensUpto is part of the kvflowcontrol.Handle interface. @@ -316,8 +322,9 @@ func (h *Handle) TestingNonBlockingAdmit( func (h *Handle) TestingDeductTokensForInner( ctx context.Context, pri admissionpb.WorkPriority, + ct time.Time, pos kvflowcontrolpb.RaftLogPosition, tokens kvflowcontrol.Tokens, -) []kvflowcontrol.Stream { - return h.deductTokensForInner(ctx, pri, pos, tokens) +) (time.Time, []kvflowcontrol.Stream) { + return h.deductTokensForInner(ctx, pri, ct, pos, tokens) } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go index a4f1182ac19f..72f5d7e05724 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) @@ -79,7 +80,7 @@ func TestHandleAdmit(t *testing.T) { // Connect a single stream at pos=0 and deplete all 16MiB of regular // tokens at pos=1. handle.ConnectStream(ctx, pos(0), stream) - handle.DeductTokensFor(ctx, admissionpb.NormalPri, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */)) + handle.DeductTokensFor(ctx, admissionpb.NormalPri, time.Time{}, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */)) // Invoke .Admit() in a separate goroutine, and test below whether // the goroutine is blocked. @@ -105,3 +106,67 @@ func TestHandleAdmit(t *testing.T) { }) } } + +// TestHandleSequencing tests the sequencing behavior of +// Handle.DeductTokensFor(), namely that we: +// - advance sequencing timestamps when the create-time advances; +// - advance sequencing timestamps when the log position advances. +func TestHandleSequencing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // tzero represents the t=0, the earliest possible time. All other + // create-time= is relative to this time. + var tzero = timeutil.Unix(0, 0) + + ctx := context.Background() + stream := kvflowcontrol.Stream{ + TenantID: roachpb.MustMakeTenantID(42), + StoreID: roachpb.StoreID(42), + } + pos := func(t, i uint64) kvflowcontrolpb.RaftLogPosition { + return kvflowcontrolpb.RaftLogPosition{Term: t, Index: i} + } + ct := func(d int64) time.Time { + return tzero.Add(time.Nanosecond * time.Duration(d)) + } + + const tokens = kvflowcontrol.Tokens(1 << 20 /* MiB */) + const normal = admissionpb.NormalPri + + registry := metric.NewRegistry() + clock := hlc.NewClockForTesting(nil) + controller := kvflowcontroller.New(registry, cluster.MakeTestingClusterSettings(), clock) + handle := kvflowhandle.New(controller, kvflowhandle.NewMetrics(registry), clock) + + // Test setup: handle is connected to a single stream at pos=1/0 and has + // deducted 1MiB of regular tokens at pos=1 ct=1. + handle.ConnectStream(ctx, pos(1, 0), stream) + seq0 := handle.DeductTokensFor(ctx, normal, ct(1), pos(1, 1), tokens) + + // If create-time advances, so does the sequencing timestamp. + seq1 := handle.DeductTokensFor(ctx, normal, ct(2), pos(1, 1), tokens) + require.Greater(t, seq1, seq0) + + // If stays static, the sequencing timestamp + // still advances. + seq2 := handle.DeductTokensFor(ctx, normal, ct(2), pos(1, 1), tokens) + require.Greater(t, seq2, seq1) + + // If the log index advances, so does the sequencing timestamp. + seq3 := handle.DeductTokensFor(ctx, normal, ct(3), pos(1, 2), tokens) + require.Greater(t, seq3, seq2) + + // If the log term advances, so does the sequencing timestamp. + seq4 := handle.DeductTokensFor(ctx, normal, ct(3), pos(2, 2), tokens) + require.Greater(t, seq4, seq3) + + // If both the create-time and log-position advance, so does the sequencing + // timestamp. + seq5 := handle.DeductTokensFor(ctx, normal, ct(1000), pos(4, 20), tokens) + require.Greater(t, seq5, seq4) + + // Verify that the sequencing timestamp is kept close to the maximum + // observed create-time. + require.LessOrEqual(t, seq5.Sub(ct(1000)), time.Nanosecond) +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel new file mode 100644 index 000000000000..fb93bbde5d4e --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/BUILD.bazel @@ -0,0 +1,32 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "kvflowsequencer", + srcs = ["sequencer.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + "//pkg/util/timeutil", + ], +) + +go_test( + name = "kvflowsequencer_test", + srcs = ["sequencer_test.go"], + args = ["-test.timeout=295s"], + data = glob(["testdata/**"]), + embed = [":kvflowsequencer"], + deps = [ + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + "//pkg/testutils/datapathutils", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/timeutil", + "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go b/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go new file mode 100644 index 000000000000..9cc0271f4257 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer.go @@ -0,0 +1,55 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowsequencer + +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// Sequencer issues monotonic sequencing timestamps derived from observed +// CreateTimes. This is a purpose-built data structure for replication admission +// control where we want to assign each AC-queued work below-raft a "sequence +// number" for FIFO ordering within a . We ensure timestamps +// are roughly monotonic with respect to log positions of replicated work[1] by +// sequencing work in log position order at the caller[2]. We also want it to +// track actual CreateTimes (similar to an HLC) for the global-FIFO ordering +// reasons explained in [1]. +// +// It's not safe for concurrent access. +// +// [1]: See I12 from kvflowcontrol/doc.go. +// [2]: See kvflowhandle.Handle. +type Sequencer struct { + // maxCreateTime ratchets to the highest observed CreateTime. If sequencing + // work with lower CreateTimes, we continue generating monotonic sequence + // numbers by incrementing it for every such sequencing attempt. Provided + // work is sequenced in log position order, the sequencing timestamps + // generated are also roughly monotonic with respect to log positions. + maxCreateTime int64 +} + +// New returns a new Sequencer. +func New() *Sequencer { + return &Sequencer{} +} + +// Sequence returns a monotonically increasing timestamps derived from the +// provided CreateTime. +func (s *Sequencer) Sequence(ct time.Time) time.Time { + createTime := ct.UnixNano() + if createTime <= s.maxCreateTime { + createTime = s.maxCreateTime + 1 + } + s.maxCreateTime = createTime + return timeutil.FromUnixNanos(createTime) +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go new file mode 100644 index 000000000000..cc97d1a86047 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/sequencer_test.go @@ -0,0 +1,89 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowsequencer + +import ( + "fmt" + "strconv" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +func TestSequencer(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + var sequencer *Sequencer + var lastSeqNum int64 + datadriven.RunTest(t, datapathutils.TestDataPath(t, "sequencer"), + func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + sequencer = New() + return "" + + case "sequence": + var arg, movement string + + // Parse create-time=. + d.ScanArgs(t, "create-time", &arg) + dur, err := time.ParseDuration(arg) + require.NoError(t, err) + + // Parse log-position=/. + logPosition := parseLogPosition(t, d) + _ = logPosition + sequenceNum := sequencer.Sequence(tzero.Add(dur)).UnixNano() + if lastSeqNum < sequenceNum { + movement = " (advanced)" + } + lastSeqNum = sequenceNum + return fmt.Sprintf("seq=%d ≈%s%s", + sequenceNum, + timeutil.FromUnixNanos(sequenceNum).Sub(tzero), + movement, + ) + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }, + ) +} + +// tzero represents the t=0, the earliest possible time. All other +// create-time= is relative to this time. +var tzero = timeutil.Unix(0, 0) + +func parseLogPosition(t *testing.T, d *datadriven.TestData) kvflowcontrolpb.RaftLogPosition { + // Parse log-position=/. + var arg string + d.ScanArgs(t, "log-position", &arg) + inner := strings.Split(arg, "/") + require.Len(t, inner, 2) + term, err := strconv.Atoi(inner[0]) + require.NoError(t, err) + index, err := strconv.Atoi(inner[1]) + require.NoError(t, err) + return kvflowcontrolpb.RaftLogPosition{ + Term: uint64(term), + Index: uint64(index), + } +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer b/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer new file mode 100644 index 000000000000..ea335538f940 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowsequencer/testdata/sequencer @@ -0,0 +1,104 @@ +# Walk through the basics of how the per-handle sequencer works. The +# log-position= parameter is not actually used in the implementation, but in +# typical usage we'd be sequencing work in log position order, and it's +# instructive to understand how sequencing timestamps are generated. +# +# ----------------------------------------------------------------------------- +# 1. Observe how the sequence numbers change with changing log positions (and +# static create-times). +init +---- + +sequence create-time=0ms log-position=1/0 +---- +seq=1 ≈1ns (advanced) + +# If the log index is incremented, so does the sequence number. So we're +# issuing "create times" that are higher than ones issued before for lower log +# indexes. +sequence create-time=0ms log-position=1/1 +---- +seq=2 ≈2ns (advanced) + +# Bump the log index by 19, and observe a higher sequence number. Since the +# create-time is static (0ms now and earlier), and we try to stay as close to +# the max observed create-time as possible, we increment the sequence number by +# the smallest possible amount -- 1ns. +sequence create-time=0ms log-position=1/20 +---- +seq=3 ≈3ns (advanced) + +# Regressions in log indexes (indicating buggy usage) doesn't cause regressions +# in the sequence number. As before, we simply issue monotonically increasing +# timestamps. With such buggy use, at worst we'd be over-admitting work when +# releasing tokens for higher log positions but lower sequencing timestamps. +sequence create-time=0ms log-position=1/10 +---- +seq=4 ≈4ns (advanced) + +# Try again with a large bump in the log index. +sequence create-time=0ms log-position=1/500 +---- +seq=5 ≈5ns (advanced) + +# Increases in the raft term also increases the sequence number. +sequence create-time=0ms log-position=2/0 +---- +seq=6 ≈6ns (advanced) + +# Regressions in the raft term (indicating buggy usage) doesn't cause +# regressions in the sequence number. This is the same as the buggy use case +# above. +sequence create-time=0ms log-position=1/0 +---- +seq=7 ≈7ns (advanced) + +# Try another (large) bump in the raft term, observing a delta in the sequence +# number. +sequence create-time=0ms log-position=5/0 +---- +seq=8 ≈8ns (advanced) + + +# ----------------------------------------------------------------------------- +# 2. Observe how the sequence numbers change with changing create-times and/or +# log positions. +init +---- + +sequence create-time=0ms log-position=1/1 +---- +seq=1 ≈1ns + +# Create time advancing to 1us also advances the sequence numbers -- they're +# kept closely tied to the largest observed create time. +sequence create-time=1us log-position=1/1 +---- +seq=1000 ≈1µs (advanced) + +# Ditto for subsequent increases in max observed create times. +sequence create-time=2us log-position=1/1 +---- +seq=2000 ≈2µs (advanced) + +# Regressions in create-time don't cause regressions in the sequence number. +# We'll still produce sequencing timestamps that monotonically increase and +# close to the maximum observed create-time. +sequence create-time=1us log-position=1/1 +---- +seq=2001 ≈2.001µs (advanced) + + +# Advance both the create-time and log position. We should see the sequence +# number ratchet up accordingly. +sequence create-time=3us log-position=1/2 +---- +seq=3000 ≈3µs (advanced) + +# Advance just the log position. We should see the sequence number ratchet up +# accordingly. +sequence create-time=3us log-position=1/3 +---- +seq=3001 ≈3.001µs (advanced) + +# vim:ft=sh diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go index d898c39ce43a..6cdc943cfc32 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go @@ -630,7 +630,7 @@ func (h *replicaHandle) deductTokens( // Increment the quorum log position -- all token deductions are bound to // incrementing log positions. h.quorumLogPosition.Index += 1 - streams := h.handle.TestingDeductTokensForInner(ctx, pri, h.quorumLogPosition, tokens) + _, streams := h.handle.TestingDeductTokensForInner(ctx, pri, time.Time{}, h.quorumLogPosition, tokens) for _, stream := range streams { h.deductionTracker[stream].Track(ctx, pri, tokens, h.quorumLogPosition) } diff --git a/pkg/server/node.go b/pkg/server/node.go index f0df2e141f5c..2c05eed96d0c 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -919,7 +919,7 @@ func (n *Node) GetPebbleMetrics() []admission.StoreMetrics { diskStats = s } metrics = append(metrics, admission.StoreMetrics{ - StoreID: int32(store.StoreID()), + StoreID: store.StoreID(), Metrics: m.Metrics, WriteStallCount: m.WriteStallCount, DiskStats: diskStats}) diff --git a/pkg/util/admission/BUILD.bazel b/pkg/util/admission/BUILD.bazel index b288a3fe3b14..3ed968ba72fa 100644 --- a/pkg/util/admission/BUILD.bazel +++ b/pkg/util/admission/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "scheduler_latency_listener.go", "sql_cpu_overload_indicator.go", "store_token_estimation.go", + "testing_knobs.go", "tokens_linear_model.go", "work_queue.go", ], @@ -53,6 +54,7 @@ go_test( "elastic_cpu_work_queue_test.go", "granter_test.go", "io_load_listener_test.go", + "replicated_write_admission_test.go", "scheduler_latency_listener_test.go", "store_token_estimation_test.go", "tokens_linear_model_test.go", @@ -67,6 +69,7 @@ go_test( "//pkg/testutils/datapathutils", "//pkg/testutils/echotest", "//pkg/util/admission/admissionpb", + "//pkg/util/humanizeutil", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/metric", diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go index ccbf4a2de2bc..e59d16927952 100644 --- a/pkg/util/admission/admission.go +++ b/pkg/util/admission/admission.go @@ -522,8 +522,9 @@ func workKindString(workKind WorkKind) string { // all of these when StoreWorkQueue.AdmittedWorkDone is called, so that these // cumulative values are mutually consistent. type storeAdmissionStats struct { - // Total requests that called AdmittedWorkDone or BypassedWorkDone. - admittedCount uint64 + // Total requests that called {Admitted,Bypassed}WorkDone, or in the case of + // replicated writes, the requests that called Admit. + workCount uint64 // Sum of StoreWorkDoneInfo.WriteBytes. // // TODO(sumeer): writeAccountedBytes and ingestedAccountedBytes are not @@ -548,7 +549,7 @@ type storeAdmissionStats struct { // (e.g. for logging). aux struct { // These bypassed numbers are already included in the corresponding - // {admittedCount, writeAccountedBytes, ingestedAccountedBytes}. + // {workCount, writeAccountedBytes, ingestedAccountedBytes}. bypassedCount uint64 writeBypassedAccountedBytes uint64 ingestedBypassedAccountedBytes uint64 diff --git a/pkg/util/admission/elastic_cpu_work_queue.go b/pkg/util/admission/elastic_cpu_work_queue.go index 04e05f2a5158..3efcc20c178a 100644 --- a/pkg/util/admission/elastic_cpu_work_queue.go +++ b/pkg/util/admission/elastic_cpu_work_queue.go @@ -81,7 +81,7 @@ func (e *ElasticCPUWorkQueue) Admit( if duration > MaxElasticCPUDuration { duration = MaxElasticCPUDuration } - info.requestedCount = duration.Nanoseconds() + info.RequestedCount = duration.Nanoseconds() enabled, err := e.workQueue.Admit(ctx, info) if err != nil { return nil, err diff --git a/pkg/util/admission/elastic_cpu_work_queue_test.go b/pkg/util/admission/elastic_cpu_work_queue_test.go index f044f984aeb1..291791b43c13 100644 --- a/pkg/util/admission/elastic_cpu_work_queue_test.go +++ b/pkg/util/admission/elastic_cpu_work_queue_test.go @@ -161,7 +161,7 @@ func (t *testElasticCPUInternalWorkQueue) Admit( _ context.Context, info WorkInfo, ) (enabled bool, err error) { if !t.disabled { - t.buf.WriteString(fmt.Sprintf("admitted=%s ", time.Duration(info.requestedCount))) + t.buf.WriteString(fmt.Sprintf("admitted=%s ", time.Duration(info.RequestedCount))) } return !t.disabled, nil } diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go index 1a7a2005e60c..edf2b24b28e9 100644 --- a/pkg/util/admission/grant_coordinator.go +++ b/pkg/util/admission/grant_coordinator.go @@ -113,7 +113,7 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider( if unsafeGc, ok := sgc.gcMap.Load(int64(m.StoreID)); ok { gc := (*GrantCoordinator)(unsafeGc) gc.pebbleMetricsTick(ctx, m) - iotc.UpdateIOThreshold(roachpb.StoreID(m.StoreID), gc.ioLoadListener.ioThreshold) + iotc.UpdateIOThreshold(m.StoreID, gc.ioLoadListener.ioThreshold) } else { log.Warningf(ctx, "seeing metrics for unknown storeID %d", m.StoreID) @@ -135,7 +135,7 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider( }() } -func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoordinator { +func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID roachpb.StoreID) *GrantCoordinator { coord := &GrantCoordinator{ settings: sgc.settings, useGrantChains: false, @@ -168,7 +168,7 @@ func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoo }, } - storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, granters, sgc.settings, sgc.workQueueMetrics, opts) + storeReq := sgc.makeStoreRequesterFunc(sgc.ambientCtx, storeID, granters, sgc.settings, sgc.workQueueMetrics, opts, nil) coord.queues[KVWork] = storeReq requesters := storeReq.getRequesters() kvg.regularRequester = requesters[admissionpb.RegularWorkClass] @@ -336,8 +336,9 @@ type makeRequesterFunc func( metrics *WorkQueueMetrics, opts workQueueOptions) requester type makeStoreRequesterFunc func( - _ log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester + _ log.AmbientContext, storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, +) storeRequester // NewGrantCoordinators constructs GrantCoordinators and WorkQueues for a // regular cluster node. Caller is responsible for: diff --git a/pkg/util/admission/granter.go b/pkg/util/admission/granter.go index a14a2fb9e63d..8787020d0886 100644 --- a/pkg/util/admission/granter.go +++ b/pkg/util/admission/granter.go @@ -594,7 +594,7 @@ type IOThresholdConsumer interface { // StoreMetrics are the metrics for a store. type StoreMetrics struct { - StoreID int32 + StoreID roachpb.StoreID *pebble.Metrics WriteStallCount int64 // Optional. diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go index e5635fa2c3f1..c085950b12ce 100644 --- a/pkg/util/admission/granter_test.go +++ b/pkg/util/admission/granter_test.go @@ -109,8 +109,9 @@ func TestGranterBasic(t *testing.T) { storeCoordinators := &StoreGrantCoordinators{ settings: settings, makeStoreRequesterFunc: func( - ambientCtx log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester { + ambientCtx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, knobs *TestingKnobs, + ) storeRequester { makeTestRequester := func(wc admissionpb.WorkClass) *testRequester { req := &testRequester{ workKind: KVWork, @@ -273,8 +274,8 @@ func TestStoreCoordinators(t *testing.T) { opts := Options{ makeRequesterFunc: makeRequesterFunc, makeStoreRequesterFunc: func( - ctx log.AmbientContext, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, - settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions) storeRequester { + ctx log.AmbientContext, _ roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, + settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, _ *TestingKnobs) storeRequester { reqReg := makeRequesterFunc(ctx, KVWork, granters[admissionpb.RegularWorkClass], settings, metrics, opts) reqElastic := makeRequesterFunc(ctx, KVWork, granters[admissionpb.ElasticWorkClass], settings, metrics, opts) str := &storeTestRequester{} @@ -444,7 +445,7 @@ func (m *testMetricsProvider) setMetricsForStores(stores []int32, metrics pebble m.metrics = m.metrics[:0] for _, s := range stores { m.metrics = append(m.metrics, StoreMetrics{ - StoreID: s, + StoreID: roachpb.StoreID(s), Metrics: &metrics, }) } diff --git a/pkg/util/admission/io_load_listener.go b/pkg/util/admission/io_load_listener.go index 0c39b22c339c..f7b65d20ca21 100644 --- a/pkg/util/admission/io_load_listener.go +++ b/pkg/util/admission/io_load_listener.go @@ -15,6 +15,7 @@ import ( "math" "time" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" @@ -151,7 +152,7 @@ const l0SubLevelCountOverloadThreshold = 20 // storeWriteDone), the tokens are adjusted differently for the // flush/compaction L0 tokens and for the "disk bandwidth" tokens. type ioLoadListener struct { - storeID int32 + storeID roachpb.StoreID settings *cluster.Settings kvRequester storeRequester kvGranter granterWithIOTokens diff --git a/pkg/util/admission/io_load_listener_test.go b/pkg/util/admission/io_load_listener_test.go index 44a98b616bc0..b10507d9cd7d 100644 --- a/pkg/util/admission/io_load_listener_test.go +++ b/pkg/util/admission/io_load_listener_test.go @@ -67,11 +67,11 @@ func TestIOLoadListener(t *testing.T) { case "prep-admission-stats": req.stats = storeAdmissionStats{ - admittedCount: 0, + workCount: 0, writeAccountedBytes: 0, ingestedAccountedBytes: 0, } - d.ScanArgs(t, "admitted", &req.stats.admittedCount) + d.ScanArgs(t, "admitted", &req.stats.workCount) if d.HasArg("write-bytes") { d.ScanArgs(t, "write-bytes", &req.stats.writeAccountedBytes) } @@ -290,7 +290,7 @@ func TestBadIOLoadListenerStats(t *testing.T) { d.BytesRead = rand.Uint64() d.BytesWritten = rand.Uint64() d.ProvisionedBandwidth = 1 << 20 - req.stats.admittedCount = rand.Uint64() + req.stats.workCount = rand.Uint64() req.stats.writeAccountedBytes = rand.Uint64() req.stats.ingestedAccountedBytes = rand.Uint64() req.stats.statsToIgnore.Bytes = rand.Uint64() diff --git a/pkg/util/admission/replicated_write_admission_test.go b/pkg/util/admission/replicated_write_admission_test.go new file mode 100644 index 000000000000..d25adb9895bb --- /dev/null +++ b/pkg/util/admission/replicated_write_admission_test.go @@ -0,0 +1,447 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package admission + +import ( + "context" + "fmt" + "sort" + "strconv" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +// TestReplicatedWriteAdmission is a data-driven test for admission control of +// replicated writes. We provide the following syntax: +// +// - "init" +// Initializes the store work queues and test granters with 0B of +// {regular,elastic} tokens. +// +// - "admit" tenant=t pri= create-time= \ +// size= range=r log-position=/ origin=n \ +// [ingested=] +// Admit a replicated write request from the given tenant, of the given +// priority/size/create-time, writing to the given log position for the +// specified raft group. Also specified is the node where this request +// originated and whether it was ingested (i.e. as sstables). +// +// - "granter" [class={regular,elastic}] adjust-tokens={-,+} +// Adjust the available {regular,elastic} tokens. If no class is specified, +// the adjustment applies across both work classes. +// +// - "grant" [class={regular,elastic}] +// Grant waiting requests until tokens are depleted or there are no more +// requests waiting. If no class is specified, we grant admission across +// both classes. +// +// - "tenant-weights" [t=]... +// Set weights for each tenant. +// +// - "print" +// Pretty-print work queue internal state (waiting requests, consumed tokens +// per-tenant, physical admission statistics, tenant weights, etc). +func TestReplicatedWriteAdmission(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + dir := datapathutils.TestDataPath(t, "replicated_write_admission") + datadriven.Walk(t, dir, func(t *testing.T, path string) { + var ( + storeWorkQueue *StoreWorkQueue + buf builderWithMu + tg [admissionpb.NumWorkClasses]*testReplicatedWriteGranter + ) + defer func() { + if storeWorkQueue != nil { + storeWorkQueue.close() + } + }() + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + switch d.Cmd { + case "init": + tg = [admissionpb.NumWorkClasses]*testReplicatedWriteGranter{ + admissionpb.RegularWorkClass: newTestReplicatedWriteGranter(t, admissionpb.RegularWorkClass, &buf), + admissionpb.ElasticWorkClass: newTestReplicatedWriteGranter(t, admissionpb.ElasticWorkClass, &buf), + } + registry := metric.NewRegistry() + metrics := makeWorkQueueMetrics("", registry) + opts := makeWorkQueueOptions(KVWork) + opts.usesTokens = true + opts.timeSource = timeutil.NewManualTime(tzero) + opts.disableEpochClosingGoroutine = true + knobs := &TestingKnobs{ + AdmittedReplicatedWorkInterceptor: func( + tenantID roachpb.TenantID, + pri admissionpb.WorkPriority, + rwi ReplicatedWorkInfo, + originalTokens int64, + createTime int64, + ) { + ingested := "" + if rwi.Ingested { + ingested = " ingested" + } + buf.printf("admitted [tenant=t%d pri=%s create-time=%s size=%s range=r%s origin=n%s log-position=%s%s]", + tenantID.ToUint64(), pri, timeutil.FromUnixNanos(createTime).Sub(tzero), + printTrimmedBytes(originalTokens), rwi.RangeID, rwi.Origin, rwi.LogPosition, ingested) + }, + } + storeWorkQueue = makeStoreWorkQueue( + log.MakeTestingAmbientContext(tracing.NewTracer()), + roachpb.StoreID(1), + [admissionpb.NumWorkClasses]granterWithStoreWriteDone{ + tg[admissionpb.RegularWorkClass], + tg[admissionpb.ElasticWorkClass], + }, + st, metrics, opts, knobs, + ).(*StoreWorkQueue) + tg[admissionpb.RegularWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.RegularWorkClass] + tg[admissionpb.ElasticWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.ElasticWorkClass] + return printTestGranterTokens(tg) + + case "admit": + require.NotNilf(t, storeWorkQueue, "uninitialized store work queue (did you use 'init'?)") + var arg string + + // Parse tenant=t. + d.ScanArgs(t, "tenant", &arg) + ti, err := strconv.Atoi(strings.TrimPrefix(arg, "t")) + require.NoError(t, err) + tenantID := roachpb.MustMakeTenantID(uint64(ti)) + + // Parse pri=. + d.ScanArgs(t, "pri", &arg) + pri, found := reverseWorkPriorityDict[arg] + require.True(t, found) + + // Parse size=. + d.ScanArgs(t, "size", &arg) + bytes, err := humanizeutil.ParseBytes(arg) + require.NoError(t, err) + + // Parse range=r. + d.ScanArgs(t, "range", &arg) + ri, err := strconv.Atoi(strings.TrimPrefix(arg, "r")) + require.NoError(t, err) + rangeID := roachpb.RangeID(ri) + + // Parse origin=n. + d.ScanArgs(t, "origin", &arg) + ni, err := strconv.Atoi(strings.TrimPrefix(arg, "n")) + require.NoError(t, err) + nodeID := roachpb.NodeID(ni) + + // Parse log-position=/. + logPosition := parseLogPosition(t, d) + + // Parse create-time=. + d.ScanArgs(t, "create-time", &arg) + dur, err := time.ParseDuration(arg) + require.NoError(t, err) + createTime := tzero.Add(dur) + + // Parse ingested=. + var ingested bool + if d.HasArg("ingested") { + d.ScanArgs(t, "ingested", &arg) + ingested, err = strconv.ParseBool(arg) + require.NoError(t, err) + } + + info := StoreWriteWorkInfo{ + WorkInfo: WorkInfo{ + TenantID: tenantID, + Priority: pri, + CreateTime: createTime.UnixNano(), + RequestedCount: bytes, + ReplicatedWorkInfo: ReplicatedWorkInfo{ + Enabled: true, + RangeID: rangeID, + Origin: nodeID, + LogPosition: logPosition, + Ingested: ingested, + }, + }, + } + + handle, err := storeWorkQueue.Admit(ctx, info) + require.NoError(t, err) + require.False(t, handle.UseAdmittedWorkDone()) + return buf.stringAndReset() + + case "granter": + // Parse adjust-tokens={+,-}. + var arg string + d.ScanArgs(t, "adjust-tokens", &arg) + isPositive := strings.Contains(arg, "+") + arg = strings.TrimPrefix(arg, "+") + arg = strings.TrimPrefix(arg, "-") + delta, err := humanizeutil.ParseBytes(arg) + require.NoError(t, err) + if !isPositive { + delta = -delta + } + + var wcs []admissionpb.WorkClass + if d.HasArg("class") { + wcs = append(wcs, parseWorkClass(t, d)) + } else { + wcs = append(wcs, + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass) + } + for _, wc := range wcs { + tg[wc].tokens += delta + } + return printTestGranterTokens(tg) + + case "tenant-weights": + weightMap := make(map[uint64]uint32) + for _, cmdArg := range d.CmdArgs { + tenantID, err := strconv.Atoi(strings.TrimPrefix(cmdArg.Key, "t")) + require.NoError(t, err) + weight, err := strconv.Atoi(cmdArg.Vals[0]) + require.NoError(t, err) + weightMap[uint64(tenantID)] = uint32(weight) + } + storeWorkQueue.SetTenantWeights(weightMap) + return "" + + case "grant": + var wcs []admissionpb.WorkClass + if d.HasArg("class") { + wcs = append(wcs, parseWorkClass(t, d)) + } else { + wcs = append(wcs, + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass) + } + for _, wc := range wcs { + tg[wc].grant() + } + return buf.stringAndReset() + + case "print": + storeWorkQueue.mu.Lock() + defer storeWorkQueue.mu.Unlock() + return fmt.Sprintf("physical-stats: work-count=%d written-bytes=%s ingested-bytes=%s\n[regular work queue]: %s\n[elastic work queue]: %s\n", + storeWorkQueue.mu.stats.workCount, + printTrimmedBytes(int64(storeWorkQueue.mu.stats.writeAccountedBytes)), + printTrimmedBytes(int64(storeWorkQueue.mu.stats.ingestedAccountedBytes)), + printWorkQueue(&storeWorkQueue.q[admissionpb.RegularWorkClass]), + printWorkQueue(&storeWorkQueue.q[admissionpb.ElasticWorkClass]), + ) + + default: + return fmt.Sprintf("unknown command: %s", d.Cmd) + } + }) + }) +} + +func parseLogPosition(t *testing.T, d *datadriven.TestData) LogPosition { + // Parse log-position=/. + var arg string + d.ScanArgs(t, "log-position", &arg) + inner := strings.Split(arg, "/") + require.Len(t, inner, 2) + term, err := strconv.Atoi(inner[0]) + require.NoError(t, err) + index, err := strconv.Atoi(inner[1]) + require.NoError(t, err) + return LogPosition{ + Term: uint64(term), + Index: uint64(index), + } +} + +func parseWorkClass(t *testing.T, d *datadriven.TestData) admissionpb.WorkClass { + // Parse class={regular,elastic}. + var arg string + d.ScanArgs(t, "class", &arg) + var wc admissionpb.WorkClass + switch arg { + case "regular": + wc = admissionpb.RegularWorkClass + case "elastic": + wc = admissionpb.ElasticWorkClass + default: + t.Fatalf("unexpected class: %s", arg) + } + return wc +} + +func printTrimmedBytes(bytes int64) string { + return strings.ReplaceAll(string(humanizeutil.IBytes(bytes)), " ", "") +} + +func printTestGranterTokens(tg [admissionpb.NumWorkClasses]*testReplicatedWriteGranter) string { + var buf strings.Builder + for i, wc := range []admissionpb.WorkClass{ + admissionpb.RegularWorkClass, + admissionpb.ElasticWorkClass, + } { + if i != 0 { + buf.WriteString("\n") + } + buf.WriteString(fmt.Sprintf("[%s] %s tokens available", wc, printTrimmedBytes(tg[wc].tokens))) + } + return buf.String() +} + +func printWorkQueue(q *WorkQueue) string { + var buf strings.Builder + q.mu.Lock() + defer q.mu.Unlock() + buf.WriteString(fmt.Sprintf("len(tenant-heap)=%d", len(q.mu.tenantHeap))) + if len(q.mu.tenantHeap) > 0 { + buf.WriteString(fmt.Sprintf(" top-tenant=t%d", q.mu.tenantHeap[0].id)) + } + var ids []uint64 + for id := range q.mu.tenants { + ids = append(ids, id) + } + sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) + for _, id := range ids { + tenant := q.mu.tenants[id] + buf.WriteString(fmt.Sprintf("\n tenant=t%d weight=%d fifo-threshold=%s used=%s", + tenant.id, + tenant.weight, + admissionpb.WorkPriority(tenant.fifoPriorityThreshold), + printTrimmedBytes(int64(tenant.used)), + )) + if len(tenant.waitingWorkHeap) > 0 { + buf.WriteString("\n") + + for i := range tenant.waitingWorkHeap { + w := tenant.waitingWorkHeap[i] + if i != 0 { + buf.WriteString("\n") + } + + ingested := "" + if w.replicated.Ingested { + ingested = " ingested " + } + + buf.WriteString(fmt.Sprintf(" [%d: pri=%s create-time=%s size=%s range=r%d origin=n%d log-position=%s%s]", i, + w.priority, + timeutil.FromUnixNanos(w.createTime).Sub(tzero), + printTrimmedBytes(w.requestedCount), + w.replicated.RangeID, + w.replicated.Origin, + w.replicated.LogPosition, + ingested, + )) + } + } + } + return buf.String() +} + +// tzero represents the t=0, the earliest possible time. All other +// create-time= is relative to this time. +var tzero = timeutil.Unix(0, 0) + +var reverseWorkPriorityDict map[string]admissionpb.WorkPriority + +func init() { + reverseWorkPriorityDict = make(map[string]admissionpb.WorkPriority) + for k, v := range admissionpb.WorkPriorityDict { + reverseWorkPriorityDict[v] = k + } +} + +type testReplicatedWriteGranter struct { + t *testing.T + wc admissionpb.WorkClass + buf *builderWithMu + r requester + + tokens int64 +} + +var _ granterWithStoreWriteDone = &testReplicatedWriteGranter{} + +func newTestReplicatedWriteGranter( + t *testing.T, wc admissionpb.WorkClass, buf *builderWithMu, +) *testReplicatedWriteGranter { + return &testReplicatedWriteGranter{ + t: t, + wc: wc, + buf: buf, + } +} +func (tg *testReplicatedWriteGranter) grantKind() grantKind { + return token +} + +func (tg *testReplicatedWriteGranter) tryGet(count int64) bool { + if count > tg.tokens { + tg.buf.printf("[%s] try-get=%s available=%s => insufficient tokens", + tg.wc, printTrimmedBytes(count), printTrimmedBytes(tg.tokens)) + return false + } + + tg.buf.printf("[%s] try-get=%s available=%s => sufficient tokens", + tg.wc, printTrimmedBytes(count), printTrimmedBytes(tg.tokens)) + tg.tokens -= count + return true +} + +func (tg *testReplicatedWriteGranter) returnGrant(count int64) { + tg.t.Fatalf("unimplemented") +} + +func (tg *testReplicatedWriteGranter) tookWithoutPermission(count int64) { + tg.t.Fatalf("unimplemented") +} + +func (tg *testReplicatedWriteGranter) continueGrantChain(grantChainID grantChainID) { + tg.t.Fatalf("unimplemented") +} + +func (tg *testReplicatedWriteGranter) grant() { + for { + if tg.tokens <= 0 { + return // nothing left to do + } + if !tg.r.hasWaitingRequests() { + return // nothing left to do + } + _ = tg.r.granted(0 /* unused */) + } +} + +func (tg *testReplicatedWriteGranter) storeWriteDone( + originalTokens int64, doneInfo StoreWorkDoneInfo, +) (additionalTokens int64) { + tg.tokens -= originalTokens + return 0 +} diff --git a/pkg/util/admission/store_token_estimation.go b/pkg/util/admission/store_token_estimation.go index be66b50452d1..2e2eca842b8e 100644 --- a/pkg/util/admission/store_token_estimation.go +++ b/pkg/util/admission/store_token_estimation.go @@ -177,8 +177,8 @@ func (e *storePerWorkTokenEstimator) updateEstimates( if adjustedIntL0IngestedBytes < 0 { adjustedIntL0IngestedBytes = 0 } - intWorkCount := int64(admissionStats.admittedCount) - - int64(e.cumStoreAdmissionStats.admittedCount) + intWorkCount := int64(admissionStats.workCount) - + int64(e.cumStoreAdmissionStats.workCount) intL0WriteAccountedBytes := int64(admissionStats.writeAccountedBytes) - int64(e.cumStoreAdmissionStats.writeAccountedBytes) // Note that these are not L0 ingested bytes, since we don't know how diff --git a/pkg/util/admission/store_token_estimation_test.go b/pkg/util/admission/store_token_estimation_test.go index faf2aa46836e..b2898f203b46 100644 --- a/pkg/util/admission/store_token_estimation_test.go +++ b/pkg/util/admission/store_token_estimation_test.go @@ -58,7 +58,7 @@ func TestStorePerWorkTokenEstimator(t *testing.T) { d.ScanArgs(t, "admitted", &admitted) d.ScanArgs(t, "write-accounted", &writeAccounted) d.ScanArgs(t, "ingested-accounted", &ingestedAccounted) - admissionStats.admittedCount += admitted + admissionStats.workCount += admitted admissionStats.writeAccountedBytes += writeAccounted admissionStats.ingestedAccountedBytes += ingestedAccounted if d.HasArg("bypassed-count") { diff --git a/pkg/util/admission/testdata/io_load_listener b/pkg/util/admission/testdata/io_load_listener index 1f64c463641e..5575a7fd5f55 100644 --- a/pkg/util/admission/testdata/io_load_listener +++ b/pkg/util/admission/testdata/io_load_listener @@ -6,7 +6,7 @@ init prep-admission-stats admitted=0 ---- -{admittedCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # Even though above the threshold, the first 60 ticks don't limit the tokens. set-state l0-bytes=10000 l0-added-write=1000 l0-files=21 l0-sublevels=21 @@ -76,7 +76,7 @@ tick: 59, setAvailableTokens: io-tokens=unlimited elastic-disk-bw-tokens=unlimit prep-admission-stats admitted=10000 write-bytes=40000 ---- -{admittedCount:10000 writeAccountedBytes:40000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:10000 writeAccountedBytes:40000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # Delta added is 100,000. The l0-bytes are the same, so compactions removed # 100,000 bytes. Smoothed removed by compactions is 50,000. Each admitted is @@ -151,7 +151,7 @@ tick: 59, setAvailableTokens: io-tokens=169 elastic-disk-bw-tokens=unlimited prep-admission-stats admitted=20000 write-bytes=80000 ---- -{admittedCount:20000 writeAccountedBytes:80000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:20000 writeAccountedBytes:80000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # Same delta as previous but smoothing bumps up the tokens to 25,000. set-state l0-bytes=10000 l0-added-write=201000 l0-files=21 l0-sublevels=21 @@ -232,7 +232,7 @@ setAvailableTokens: io-tokens=365 elastic-disk-bw-tokens=unlimited prep-admission-stats admitted=30000 write-bytes=120000 ---- -{admittedCount:30000 writeAccountedBytes:120000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:30000 writeAccountedBytes:120000 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # l0-sublevels drops below threshold. We calculate the smoothed values, but # don't limit the tokens. @@ -250,7 +250,7 @@ init prep-admission-stats admitted=0 ---- -{admittedCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} set-state l0-bytes=1000 l0-added-write=1000 l0-added-ingested=0 l0-files=21 l0-sublevels=21 print-only-first-tick=true ---- @@ -262,7 +262,7 @@ tick: 0, setAvailableTokens: io-tokens=unlimited elastic-disk-bw-tokens=unlimite # the admitted requests. prep-admission-stats admitted=10 write-bytes=130000 ingested-bytes=20000 ---- -{admittedCount:10 writeAccountedBytes:130000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:10 writeAccountedBytes:130000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} # The ingested model can be fit with a multiplier of ~1.5 for the interval, # but since the l0-ingest-lm model had a previous multiplier of 0.75 and the @@ -282,7 +282,7 @@ setAvailableTokens: io-tokens=417 elastic-disk-bw-tokens=unlimited # ingested model is decayed by a factor of 2. prep-admission-stats admitted=20 write-bytes=150000 ingested-bytes=20000 ---- -{admittedCount:20 writeAccountedBytes:150000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:20 writeAccountedBytes:150000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} set-state l0-bytes=1000 l0-added-write=191000 l0-added-ingested=30000 l0-files=21 l0-sublevels=21 print-only-first-tick=true ---- @@ -296,7 +296,7 @@ setAvailableTokens: io-tokens=459 elastic-disk-bw-tokens=unlimited # bytes to L0. We don't let unaccounted bytes become negative. prep-admission-stats admitted=30 write-bytes=250000 ingested-bytes=20000 ingested-into-l0=20000 ---- -{admittedCount:30 writeAccountedBytes:250000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:30 writeAccountedBytes:250000 ingestedAccountedBytes:20000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} set-state l0-bytes=1000 l0-added-write=211000 l0-added-ingested=30000 l0-files=21 l0-sublevels=21 print-only-first-tick=true ---- @@ -312,7 +312,7 @@ init prep-admission-stats admitted=0 ---- -{admittedCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +{workCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} set-state l0-bytes=10000 l0-added-write=1000 l0-files=1 l0-sublevels=1 print-only-first-tick=true ---- diff --git a/pkg/util/admission/testdata/replicated_write_admission/class_segmentation b/pkg/util/admission/testdata/replicated_write_admission/class_segmentation new file mode 100644 index 000000000000..816f1a5184ff --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/class_segmentation @@ -0,0 +1,63 @@ +# Verify that we'll admit {regular,elastic} requests based on work-class +# specific tokens being generated. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one at low-pri but lower log position and lower +# create-time. It gets classified as an elastic request. +admit tenant=t1 pri=low-pri create-time=1us size=1B range=r1 origin=n1 log-position=4/20 +---- +[elastic] try-get=1B available=0B => insufficient tokens + +# And one at high-pri but higher log position and higher create-time. It gets +# classified as a regular request. +admit tenant=t1 pri=high-pri create-time=2us size=1B range=r1 origin=n1 log-position=4/21 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# Observe both waiting requests and physical admission stats. Note that the two +# requests sit within their own work queues (segmented by class). +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=high-pri create-time=2µs size=1B range=r1 origin=n1 log-position=4/21] +[elastic work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=low-pri create-time=1µs size=1B range=r1 origin=n1 log-position=4/20] + +# Produce 1B worth of regular tokens and verify that it only admits work +# waiting in the regular work queue. +granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +grant +---- +admitted [tenant=t1 pri=high-pri create-time=2µs size=1B range=r1 origin=n1 log-position=4/21] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=0 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B +[elastic work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=low-pri create-time=1µs size=1B range=r1 origin=n1 log-position=4/20] + +# Do the same for elastic tokens. +granter class=elastic adjust-tokens=+1B +---- +[regular] 0B tokens available +[elastic] 1B tokens available + +grant +---- +admitted [tenant=t1 pri=low-pri create-time=1µs size=1B range=r1 origin=n1 log-position=4/20] + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range new file mode 100644 index 000000000000..d36a0455822e --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/high_create_time_low_position_different_range @@ -0,0 +1,53 @@ +# Verify that we use create-time based ordering for replicated write admission +# when done so across different ranges. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one created at t=5us with a lower log position on r1. +admit tenant=t1 pri=normal-pri create-time=5us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# And one created at t=1us with a higher log position on r2. +admit tenant=t1 pri=normal-pri create-time=1us size=1B range=r2 origin=n1 log-position=4/21 +---- + +# Observe both waiting requests and physical admission stats. Note that the +# request with the lower create time sorts first despite having the higher log +# position. Admission work queues order work based entirely on create-times, +# and the assignment of monotonic create-times (WRT log positions) happens only +# within a range and by higher-level components -- kvflowcontrol.Handle. +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1µs size=1B range=r2 origin=n1 log-position=4/21] + [1: pri=normal-pri create-time=5µs size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 1B worth of regular tokens. +granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +# Grant admission to requests. Since we have 1B worth of tokens, and 2 waiting +# requests wanting 1B each, we're only able to admit one. Verify that it's the +# request with the lower create-time. +grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=1µs size=1B range=r2 origin=n1 log-position=4/21] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=5µs size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/high_pri_low_position b/pkg/util/admission/testdata/replicated_write_admission/high_pri_low_position new file mode 100644 index 000000000000..5908633ae4b0 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/high_pri_low_position @@ -0,0 +1,51 @@ +# Verify that we'll admit in priority order, even if the higher priority work +# has a higher {create time, log position}. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Admit two requests, one at normal-pri but lower {create time, log position}. +admit tenant=t1 pri=normal-pri create-time=1.001us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# And one at high-pri but higher {create time, log position} +admit tenant=t1 pri=high-pri create-time=1.002us size=1B range=r1 origin=n1 log-position=4/21 +---- + +# Observe both waiting requests and physical admission stats. Note that the +# high-pri request sorts first, despite having a higher {create time, log +# position}. +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=high-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21] + [1: pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 1B worth of regular tokens. +granter class=regular adjust-tokens=+1B +---- +[regular] 1B tokens available +[elastic] 0B tokens available + +# Grant admission to requests. Since we have 1B worth of tokens, and 2 waiting +# requests wanting 1B each, we're only able to admit one. Verify that it's the +# high-pri request that gets through. +grant class=regular +---- +admitted [tenant=t1 pri=high-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21] + +print +---- +physical-stats: work-count=2 written-bytes=2B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/overview b/pkg/util/admission/testdata/replicated_write_admission/overview new file mode 100644 index 000000000000..f79f423fa583 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/overview @@ -0,0 +1,70 @@ +# Walk through the basics of the datadriven syntax. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Try to admit two requests of 1B each, at incrementing log positions. The +# first requests tries the fast path and fails admission, and gets added to the +# work queue's internal heap. +admit tenant=t1 pri=normal-pri create-time=1.001us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +# Observe that the physical stats show that the actual work was done, but the +# work is virtually enqueued in the work queue for deferred admission. +print +---- +physical-stats: work-count=1 written-bytes=1B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] +[elastic work queue]: len(tenant-heap)=0 + +# Admit a second request. Since there's already a request waiting, we don't get +# the fast path. +admit tenant=t1 pri=normal-pri create-time=1.002us size=1B range=r1 origin=n1 log-position=4/21 ingested=true +---- + +# Observe both waiting requests. +print +---- +physical-stats: work-count=2 written-bytes=1B ingested-bytes=1B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21 ingested ] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 2B worth of regular tokens. +granter class=regular adjust-tokens=+2B +---- +[regular] 2B tokens available +[elastic] 0B tokens available + +# Grant admission requests. Since we have 2B worth of tokens, and 2 waiting +# requests wanting 1B each, we're able to admit both. We do so in log-position +# order. +grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] +admitted [tenant=t1 pri=normal-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21 ingested] + +# Pretty print granter state to show no more available tokens. We've consumed +# the 2B above. +granter adjust-tokens=+0B +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Observe the empty tenant heaps (all work was admitted) and 2B worth of used +# tokens for t1. +print +---- +physical-stats: work-count=2 written-bytes=1B ingested-bytes=1B +[regular work queue]: len(tenant-heap)=0 + tenant=t1 weight=1 fifo-threshold=low-pri used=2B +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness b/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness new file mode 100644 index 000000000000..c00240b461f7 --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/tenant_fairness @@ -0,0 +1,70 @@ +# Observe how tokens are consumed fairly across tenants. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# For two tenants t1 and t2, try to admit two requests of 1B each at +# incrementing log positions. We specify create-times in log-position order for +# work within a given range, similar to what we do at the issuing client +# (kvflowcontrol.Handle). +admit tenant=t1 pri=normal-pri create-time=1.001us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +admit tenant=t1 pri=normal-pri create-time=1.002us size=1B range=r1 origin=n1 log-position=4/21 +---- + +admit tenant=t2 pri=normal-pri create-time=1.001us size=1B range=r2 origin=n1 log-position=5/20 +---- + +admit tenant=t2 pri=normal-pri create-time=1.002us size=1B range=r2 origin=n1 log-position=5/21 +---- + +# Observe all waiting requests. +print +---- +physical-stats: work-count=4 written-bytes=4B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=2 top-tenant=t1 + tenant=t1 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21] + tenant=t2 weight=1 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1.001µs size=1B range=r2 origin=n1 log-position=5/20] + [1: pri=normal-pri create-time=1.002µs size=1B range=r2 origin=n1 log-position=5/21] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 2B worth of regular tokens. +granter class=regular adjust-tokens=+2B +---- +[regular] 2B tokens available +[elastic] 0B tokens available + +# Grant admission requests. Since we have 2B worth of tokens, and 4 waiting +# requests wanting 1B each from 2 tenants, we're able to 1 request from each +# tenant. +grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] +admitted [tenant=t2 pri=normal-pri create-time=1.001µs size=1B range=r2 origin=n1 log-position=5/20] + +# Pretty print granter state to show no more available tokens. We've consumed +# the 2B above. +granter adjust-tokens=+0B +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Observe that each tenant still has one waiting request. +print +---- +physical-stats: work-count=4 written-bytes=4B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=2 top-tenant=t2 + tenant=t1 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21] + tenant=t2 weight=1 fifo-threshold=low-pri used=1B + [0: pri=normal-pri create-time=1.002µs size=1B range=r2 origin=n1 log-position=5/21] +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/replicated_write_admission/tenant_weights b/pkg/util/admission/testdata/replicated_write_admission/tenant_weights new file mode 100644 index 000000000000..6de47b20539c --- /dev/null +++ b/pkg/util/admission/testdata/replicated_write_admission/tenant_weights @@ -0,0 +1,107 @@ +# Observe how tokens are consumed proportionally across tenants, as configured +# by tenant weights. + +init +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +tenant-weights t1=2 t2=5 +---- + +# For two tenants t1 and t2, try to admit 5 requests of 1B each at +# incrementing log positions. The first attempt tries to go through the fast +# path but fails. The create-times increment with incrementing log positions, +# to mimic the kind of explicit sequencing we introduce at +# kvflowcontrol.Handle. +admit tenant=t1 pri=normal-pri create-time=1.001us size=1B range=r1 origin=n1 log-position=4/20 +---- +[regular] try-get=1B available=0B => insufficient tokens + +admit tenant=t1 pri=normal-pri create-time=1.002us size=1B range=r1 origin=n1 log-position=4/21 +---- + +admit tenant=t1 pri=normal-pri create-time=1.003us size=1B range=r1 origin=n1 log-position=4/22 +---- + +admit tenant=t1 pri=normal-pri create-time=1.004us size=1B range=r1 origin=n1 log-position=4/23 +---- + +admit tenant=t1 pri=normal-pri create-time=1.005us size=1B range=r1 origin=n1 log-position=4/24 +---- + +admit tenant=t2 pri=normal-pri create-time=1.001us size=1B range=r2 origin=n1 log-position=5/20 +---- + +admit tenant=t2 pri=normal-pri create-time=1.002us size=1B range=r2 origin=n1 log-position=5/21 +---- + +admit tenant=t2 pri=normal-pri create-time=1.003us size=1B range=r2 origin=n1 log-position=5/22 +---- + +admit tenant=t2 pri=normal-pri create-time=1.004us size=1B range=r2 origin=n1 log-position=5/23 +---- + +admit tenant=t2 pri=normal-pri create-time=1.005us size=1B range=r2 origin=n1 log-position=5/24 +---- + +# Observe all waiting requests. +print +---- +physical-stats: work-count=10 written-bytes=10B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=2 top-tenant=t1 + tenant=t1 weight=2 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] + [1: pri=normal-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21] + [2: pri=normal-pri create-time=1.003µs size=1B range=r1 origin=n1 log-position=4/22] + [3: pri=normal-pri create-time=1.004µs size=1B range=r1 origin=n1 log-position=4/23] + [4: pri=normal-pri create-time=1.005µs size=1B range=r1 origin=n1 log-position=4/24] + tenant=t2 weight=5 fifo-threshold=low-pri used=0B + [0: pri=normal-pri create-time=1.001µs size=1B range=r2 origin=n1 log-position=5/20] + [1: pri=normal-pri create-time=1.002µs size=1B range=r2 origin=n1 log-position=5/21] + [2: pri=normal-pri create-time=1.003µs size=1B range=r2 origin=n1 log-position=5/22] + [3: pri=normal-pri create-time=1.004µs size=1B range=r2 origin=n1 log-position=5/23] + [4: pri=normal-pri create-time=1.005µs size=1B range=r2 origin=n1 log-position=5/24] +[elastic work queue]: len(tenant-heap)=0 + +# Produce 7B worth of regular tokens. +granter class=regular adjust-tokens=+7B +---- +[regular] 7B tokens available +[elastic] 0B tokens available + +# Grant admission requests. Since we have 7B worth of tokens, and 10 waiting +# requests wanting 1B each, we'll be able to admit 7 requests. We'll bias +# towards the tenant with the higher weight (t2). +grant class=regular +---- +admitted [tenant=t1 pri=normal-pri create-time=1.001µs size=1B range=r1 origin=n1 log-position=4/20] +admitted [tenant=t2 pri=normal-pri create-time=1.001µs size=1B range=r2 origin=n1 log-position=5/20] +admitted [tenant=t2 pri=normal-pri create-time=1.002µs size=1B range=r2 origin=n1 log-position=5/21] +admitted [tenant=t2 pri=normal-pri create-time=1.003µs size=1B range=r2 origin=n1 log-position=5/22] +admitted [tenant=t1 pri=normal-pri create-time=1.002µs size=1B range=r1 origin=n1 log-position=4/21] +admitted [tenant=t2 pri=normal-pri create-time=1.004µs size=1B range=r2 origin=n1 log-position=5/23] +admitted [tenant=t2 pri=normal-pri create-time=1.005µs size=1B range=r2 origin=n1 log-position=5/24] + +# Pretty print granter state to show no more available tokens. We've consumed +# the 7B above. +granter adjust-tokens=+0B +---- +[regular] 0B tokens available +[elastic] 0B tokens available + +# Observe that t2 has no waiting requests, but t1 still has 3. So we've +# processed 5 t2 requests for every 2 t1 requests, exactly what we'd expect for +# a 2:5 weight ratio between t1:t2. +print +---- +physical-stats: work-count=10 written-bytes=10B ingested-bytes=0B +[regular work queue]: len(tenant-heap)=1 top-tenant=t1 + tenant=t1 weight=2 fifo-threshold=low-pri used=2B + [0: pri=normal-pri create-time=1.003µs size=1B range=r1 origin=n1 log-position=4/22] + [1: pri=normal-pri create-time=1.004µs size=1B range=r1 origin=n1 log-position=4/23] + [2: pri=normal-pri create-time=1.005µs size=1B range=r1 origin=n1 log-position=4/24] + tenant=t2 weight=5 fifo-threshold=low-pri used=5B +[elastic work queue]: len(tenant-heap)=0 + +# vim:ft=sh diff --git a/pkg/util/admission/testdata/store_work_queue b/pkg/util/admission/testdata/store_work_queue index 26fba8dc46e4..ca4ebdfbccd6 100644 --- a/pkg/util/admission/testdata/store_work_queue +++ b/pkg/util/admission/testdata/store_work_queue @@ -5,7 +5,7 @@ print ---- regular workqueue: closed epoch: 0 tenantHeap len: 0 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:0 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:1} set-try-get-return-value v=true @@ -14,7 +14,7 @@ set-try-get-return-value v=true admit id=1 tenant=53 priority=0 create-time-millis=1 bypass=false ---- tryGet regular: returning true -id 1: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:1 workClass:0 admissionEnabled:true} +id 1: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:1 workClass:0 useAdmittedWorkDone:true} work-done id=1 ---- @@ -25,18 +25,18 @@ set-store-request-estimates write-tokens=100 regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 1, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:1 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:1 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} admit id=2 tenant=55 priority=0 create-time-millis=1 bypass=false ---- tryGet regular: returning true -id 2: admit succeeded with handle {tenantID:{InternalValue:55} writeTokens:100 workClass:0 admissionEnabled:true} +id 2: admit succeeded with handle {tenantID:{InternalValue:55} writeTokens:100 workClass:0 useAdmittedWorkDone:true} admit id=3 tenant=53 priority=0 create-time-millis=1 bypass=false ---- tryGet regular: returning true -id 3: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:100 workClass:0 admissionEnabled:true} +id 3: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:100 workClass:0 useAdmittedWorkDone:true} print ---- @@ -44,7 +44,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 101, w: 1, fifo: -128 tenant-id: 55 used: 100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:1 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:1 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} set-try-get-return-value v=false @@ -65,13 +65,13 @@ regular workqueue: closed epoch: 0 tenantHeap len: 1 top tenant: 57 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 0, w: 1, fifo: -128 waiting work heap: [0: pri: normal-pri, ct: 1, epoch: 0, qt: 0] elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:2 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:2 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} granted ---- continueGrantChain regular 0 -id 4: admit succeeded with handle {tenantID:{InternalValue:57} writeTokens:100 workClass:0 admissionEnabled:true} +id 4: admit succeeded with handle {tenantID:{InternalValue:57} writeTokens:100 workClass:0 useAdmittedWorkDone:true} granted regular: returned 100 print @@ -81,7 +81,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:2 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:2 writeAccountedBytes:0 ingestedAccountedBytes:0 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} work-done id=3 ingested-bytes=1000000 additional-tokens=50000 @@ -95,7 +95,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:3 writeAccountedBytes:0 ingestedAccountedBytes:1000000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:3 writeAccountedBytes:0 ingestedAccountedBytes:1000000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:100} set-store-request-estimates write-tokens=10000 @@ -105,7 +105,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:3 writeAccountedBytes:0 ingestedAccountedBytes:1000000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:3 writeAccountedBytes:0 ingestedAccountedBytes:1000000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:10000} work-done id=4 write-bytes=2000 ingested-bytes=1000 additional-tokens=2000 @@ -119,7 +119,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 2100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:4 writeAccountedBytes:2000 ingestedAccountedBytes:1001000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} +stats:{workCount:4 writeAccountedBytes:2000 ingestedAccountedBytes:1001000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:0 writeBypassedAccountedBytes:0 ingestedBypassedAccountedBytes:0}} estimates:{writeTokens:10000} bypassed-work-done work-count=10 write-bytes=1000 ingested-bytes=1000000 @@ -133,7 +133,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 2100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:0 ApproxIngestedIntoL0Bytes:0}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} stats-to-ignore ingested-bytes=12000 ingested-into-L0-bytes=9000 @@ -143,7 +143,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 55 used: 600, w: 1, fifo: -128 tenant-id: 57 used: 2100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 -stats:{admittedCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} # Elastic work. @@ -160,7 +160,7 @@ granted regular: returned 0 granted elastic=true ---- continueGrantChain elastic 0 -id 5: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:10000 workClass:1 admissionEnabled:true} +id 5: admit succeeded with handle {tenantID:{InternalValue:53} writeTokens:10000 workClass:1 useAdmittedWorkDone:true} granted elastic: returned 10000 print @@ -171,7 +171,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 57 used: 2100, w: 1, fifo: -128 elastic workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 10000, w: 1, fifo: -128 -stats:{admittedCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} set-try-get-return-value v=true elastic=true @@ -180,7 +180,7 @@ set-try-get-return-value v=true elastic=true admit id=6 tenant=54 priority=-40 create-time-millis=3 bypass=false ---- tryGet elastic: returning true -id 6: admit succeeded with handle {tenantID:{InternalValue:54} writeTokens:10000 workClass:1 admissionEnabled:true} +id 6: admit succeeded with handle {tenantID:{InternalValue:54} writeTokens:10000 workClass:1 useAdmittedWorkDone:true} print ---- @@ -191,7 +191,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 elastic workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 10000, w: 1, fifo: -128 tenant-id: 54 used: 10000, w: 1, fifo: -128 -stats:{admittedCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:14 writeAccountedBytes:3000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} work-done id=5 write-bytes=1000 additional-tokens=200 @@ -207,7 +207,7 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 elastic workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 10200, w: 1, fifo: -128 tenant-id: 54 used: 10000, w: 1, fifo: -128 -stats:{admittedCount:15 writeAccountedBytes:4000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:15 writeAccountedBytes:4000 ingestedAccountedBytes:2001000 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} work-done id=6 ingested-bytes=500 additional-tokens=500 @@ -223,5 +223,5 @@ regular workqueue: closed epoch: 0 tenantHeap len: 0 elastic workqueue: closed epoch: 0 tenantHeap len: 0 tenant-id: 53 used: 10200, w: 1, fifo: -128 tenant-id: 54 used: 10500, w: 1, fifo: -128 -stats:{admittedCount:16 writeAccountedBytes:4000 ingestedAccountedBytes:2001500 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} +stats:{workCount:16 writeAccountedBytes:4000 ingestedAccountedBytes:2001500 statsToIgnore:{IngestOperationStats:{Bytes:12000 ApproxIngestedIntoL0Bytes:9000}} aux:{bypassedCount:10 writeBypassedAccountedBytes:1000 ingestedBypassedAccountedBytes:1000000}} estimates:{writeTokens:10000} diff --git a/pkg/util/admission/testing_knobs.go b/pkg/util/admission/testing_knobs.go new file mode 100644 index 000000000000..659552c84dc5 --- /dev/null +++ b/pkg/util/admission/testing_knobs.go @@ -0,0 +1,36 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package admission + +import ( + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" +) + +// TestingKnobs provide fine-grained control over the various admission control +// components for testing. +type TestingKnobs struct { + // AdmittedReplicatedWorkInterceptor is invoked whenever replicated work is + // admitted. + AdmittedReplicatedWorkInterceptor func( + tenantID roachpb.TenantID, + pri admissionpb.WorkPriority, + rwi ReplicatedWorkInfo, + originalTokens int64, + createTime int64, + ) +} + +// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. +func (t *TestingKnobs) ModuleTestingKnobs() {} + +var _ base.ModuleTestingKnobs = (*TestingKnobs)(nil) diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go index 3ec037c171be..d6a274224434 100644 --- a/pkg/util/admission/work_queue.go +++ b/pkg/util/admission/work_queue.go @@ -156,9 +156,9 @@ var epochLIFOQueueDelayThresholdToSwitchToLIFO = settings.RegisterDurationSettin return nil }).WithPublic() -// WorkInfo provides information that is used to order work within an -// WorkQueue. The WorkKind is not included as a field since an WorkQueue deals -// with a single WorkKind. +// WorkInfo provides information that is used to order work within an WorkQueue. +// The WorkKind is not included as a field since an WorkQueue deals with a +// single WorkKind. type WorkInfo struct { // TenantID is the id of the tenant. For single-tenant clusters, this will // always be the SystemTenantID. @@ -177,22 +177,58 @@ type WorkInfo struct { // when KV work generates other KV work (to avoid deadlock). Ignored // otherwise. BypassAdmission bool + // RequestedCount is the requested number of tokens or slots. If unset: + // - For slot-based queues we treat it as an implicit request of 1; + // - For the store work queue, we use per-request estimates to deduct some + // number of tokens at-admit time. Note that this only applies to the + // legacy above-raft admission control. With admission control for + // replicated writes (done so asynchronously, below-raft; see + // ReplicatedWrite below), we do know the size of the write being + // admitted, so RequestedCount is set accordingly. + RequestedCount int64 + // ReplicatedWorkInfo groups everything needed to admit replicated writes, done + // so asynchronously below-raft as part of replication admission control. + ReplicatedWorkInfo ReplicatedWorkInfo +} + +// ReplicatedWorkInfo groups everything needed to admit replicated writes, done +// so asynchronously below-raft as part of replication admission control. +type ReplicatedWorkInfo struct { + // Enabled captures whether this work represents a replicated write, + // subject to below-raft asynchronous admission control. + Enabled bool + // RangeID identifies the raft group on behalf of which work is being + // admitted. + RangeID roachpb.RangeID + // Origin is the node at which this work originated. It's used for + // replication admission control to inform the origin of admitted work + // (after which flow tokens are released, permitting more replicated + // writes). + Origin roachpb.NodeID + // LogPosition is the point on the raft log where the write was replicated. + LogPosition LogPosition + // Ingested captures whether the write work corresponds to an ingest + // (for sstables, for example). This is used alongside RequestedCount to + // maintain accurate linear models for L0 growth due to ingests and + // regular write batches. + Ingested bool +} - // Optional information specified only for WorkQueues where the work is tied - // to a range. This allows queued work to return early as soon as the range - // is no longer in a relevant state at this node. Currently only KVWork is - // tied to a range. - // TODO(sumeer): use these in the WorkQueue implementation. +// LogPosition is a point on the raft log, identified by a term and an index. +type LogPosition struct { + Term uint64 + Index uint64 +} - // RangeID is the range at which this work must be performed. Optional (see - // comment above). - RangeID roachpb.RangeID - // RequiresLeaseholder is true iff the work requires the leaseholder. - // Optional (see comment above). - RequiresLeaseholder bool +func (r LogPosition) String() string { + return fmt.Sprintf("%d/%d", r.Term, r.Index) +} - // For internal use by wrapper classes. The requested tokens or slots. - requestedCount int64 +func (r LogPosition) Less(o LogPosition) bool { + if r.Term != o.Term { + return r.Term < o.Term + } + return r.Index < o.Index } // WorkQueue maintains a queue of work waiting to be admitted. Ordering of @@ -222,12 +258,15 @@ type WorkInfo struct { // kvQueue.AdmittedWorkDone(tid) // } type WorkQueue struct { - ambientCtx context.Context - workKind WorkKind - granter granter - usesTokens bool - tiedToRange bool - settings *cluster.Settings + ambientCtx context.Context + workKind WorkKind + granter granter + usesTokens bool + tiedToRange bool + usesAsyncAdmit bool + settings *cluster.Settings + + onAdmittedReplicatedWork onAdmittedReplicatedWork // Prevents more than one caller to be in Admit and calling tryGet or adding // to the queue. It allows WorkQueue to release mu before calling tryGet and @@ -269,8 +308,9 @@ type WorkQueue struct { var _ requester = &WorkQueue{} type workQueueOptions struct { - usesTokens bool - tiedToRange bool + usesTokens bool + tiedToRange bool + usesAsyncAdmit bool // timeSource can be set to non-nil for tests. If nil, // the timeutil.DefaultTimeSource will be used. @@ -329,6 +369,7 @@ func initWorkQueue( q.granter = granter q.usesTokens = opts.usesTokens q.tiedToRange = opts.tiedToRange + q.usesAsyncAdmit = opts.usesAsyncAdmit q.settings = settings q.logThreshold = log.Every(5 * time.Minute) q.metrics = metrics @@ -369,7 +410,9 @@ func (q *WorkQueue) timeNow() time.Time { } func (q *WorkQueue) epochLIFOEnabled() bool { - return EpochLIFOEnabled.Get(&q.settings.SV) + // We don't use epoch LIFO for below-raft admission control. See I12 from + // kvflowcontrol/doc.go. + return EpochLIFOEnabled.Get(&q.settings.SV) && !q.usesAsyncAdmit } // Samples the latest cluster settings for epoch-LIFO. @@ -492,17 +535,24 @@ func (q *WorkQueue) tryCloseEpoch(timeNow time.Time) { // admission control is enabled. AdmittedWorkDone must be called iff // enabled=true && err!=nil, and the WorkKind for this queue uses slots. func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err error) { - enabledSetting := admissionControlEnabledSettings[q.workKind] - if enabledSetting != nil && !enabledSetting.Get(&q.settings.SV) { - return false, nil + if !info.ReplicatedWorkInfo.Enabled { + enabledSetting := admissionControlEnabledSettings[q.workKind] + if enabledSetting != nil && !enabledSetting.Get(&q.settings.SV) { + return false, nil + } } - if info.requestedCount == 0 { - // Callers from outside the admission package don't set requestedCount -- - // these are implicitly requesting a count of 1. - info.requestedCount = 1 + + // TODO(irfansharif): When enabling replication admission control for + // regular writes with arbitrary concurrency (part of #95563), measure + // the memory overhead of enqueueing each raft command to see whether we + // need to do some coalescing at this level. + + if info.RequestedCount == 0 { + // We treat unset RequestCounts as an implicit request of 1. + info.RequestedCount = 1 } - if !q.usesTokens && info.requestedCount != 1 { - panic(errors.AssertionFailedf("unexpected requestedCount: %d", info.requestedCount)) + if !q.usesTokens && info.RequestedCount != 1 { + panic(errors.AssertionFailedf("unexpected RequestedCount: %d", info.RequestedCount)) } q.metrics.incRequested(info.Priority) tenantID := info.TenantID.ToUint64() @@ -518,14 +568,29 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err tenant = newTenantInfo(tenantID, q.getTenantWeightLocked(tenantID)) q.mu.tenants[tenantID] = tenant } + if info.ReplicatedWorkInfo.Enabled { + if info.BypassAdmission { + // TODO(irfansharif): "Admin" work (like splits, scatters, lease + // transfers, etc.), and work originating from AdmissionHeader_OTHER, + // don't use flow control tokens above-raft. So there's nothing to + // virtually enqueue below-raft, since we have nothing to return. That + // said, it might still be useful to physically admit these proposals + // for correct token modeling. To do that, we'd have to pass down + // information about it being bypassed above-raft. + panic("unexpected BypassAdmission bit set for below raft admission") + } + if !q.usesTokens { + panic("unexpected ReplicatedWrite.Enabled on slot-based queue") + } + } if info.BypassAdmission && roachpb.IsSystemTenantID(tenantID) && q.workKind == KVWork { - tenant.used += uint64(info.requestedCount) + tenant.used += uint64(info.RequestedCount) if isInTenantHeap(tenant) { q.mu.tenantHeap.fix(tenant) } q.mu.Unlock() q.admitMu.Unlock() - q.granter.tookWithoutPermission(info.requestedCount) + q.granter.tookWithoutPermission(info.RequestedCount) q.metrics.incAdmitted(info.Priority) return true, nil } @@ -539,11 +604,29 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err if len(q.mu.tenantHeap) == 0 { // Fast-path. Try to grab token/slot. // Optimistically update used to avoid locking again. - tenant.used += uint64(info.requestedCount) + tenant.used += uint64(info.RequestedCount) q.mu.Unlock() - if q.granter.tryGet(info.requestedCount) { + if q.granter.tryGet(info.RequestedCount) { q.admitMu.Unlock() q.metrics.incAdmitted(info.Priority) + if info.ReplicatedWorkInfo.Enabled { + // TODO(irfansharif): There's a race here, and could lead to + // over-admission. It's possible that there are enqueued work + // items with lower log positions than the request that just got + // through using the fast-path, and since we're returning flow + // tokens by specifying a log prefix, we'd be returning more + // flow tokens than actually admitted. Fix it as part of #95563, + // by either adding more synchronization, getting rid of this + // fast path, or swapping this entry from the top-most one in + // the waiting heap (and fixing the heap). + q.onAdmittedReplicatedWork.admittedReplicatedWork( + roachpb.MustMakeTenantID(tenant.id), + info.Priority, + info.ReplicatedWorkInfo, + info.RequestedCount, + info.CreateTime, + ) + } return true, nil } // Did not get token/slot. @@ -571,11 +654,11 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err if !ok || prevTenant != tenant { panic("prev tenantInfo no longer in map") } - if tenant.used < uint64(info.requestedCount) { - panic(errors.AssertionFailedf("tenant.used %d < info.requestedCount %d", - tenant.used, info.requestedCount)) + if tenant.used < uint64(info.RequestedCount) { + panic(errors.AssertionFailedf("tenant.used %d < info.RequestedCount %d", + tenant.used, info.RequestedCount)) } - tenant.used -= uint64(info.requestedCount) + tenant.used -= uint64(info.RequestedCount) } else { if !ok { tenant = newTenantInfo(tenantID, q.getTenantWeightLocked(tenantID)) @@ -583,14 +666,17 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err } // Don't want to overflow tenant.used if it is already 0 because of // being reset to 0 by the GC goroutine. - if tenant.used >= uint64(info.requestedCount) { - tenant.used -= uint64(info.requestedCount) + if tenant.used >= uint64(info.RequestedCount) { + tenant.used -= uint64(info.RequestedCount) } } } // Check for cancellation. startTime := q.timeNow() if ctx.Err() != nil { + if info.ReplicatedWorkInfo.Enabled { + panic("not equipped to deal with cancelable contexts below raft") + } // Already canceled. More likely to happen if cpu starvation is // causing entering into the work queue to be delayed. q.mu.Unlock() @@ -606,7 +692,9 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err if int(info.Priority) < tenant.fifoPriorityThreshold { ordering = lifoWorkOrdering } - work := newWaitingWork(info.Priority, ordering, info.CreateTime, info.requestedCount, startTime, q.mu.epochLengthNanos) + work := newWaitingWork(info.Priority, ordering, info.CreateTime, info.RequestedCount, startTime, q.mu.epochLengthNanos) + work.replicated = info.ReplicatedWorkInfo + inTenantHeap := isInTenantHeap(tenant) if work.epoch <= q.mu.closedEpochThreshold || ordering == fifoWorkOrdering { heap.Push(&tenant.waitingWorkHeap, work) @@ -618,11 +706,16 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err } // Else already in tenantHeap. - // Release all locks and start waiting. + // Release all locks. q.mu.Unlock() q.admitMu.Unlock() q.metrics.recordStartWait(info.Priority) + if info.ReplicatedWorkInfo.Enabled { + return // return without waiting (admission is asynchronous) + } + + // Start waiting for admission. defer releaseWaitingWork(work) select { case <-ctx.Done(): @@ -638,16 +731,16 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err if work.heapIndex == -1 { // No longer in heap. Raced with token/slot grant. if !q.usesTokens { - if tenant.used < uint64(info.requestedCount) { - panic(errors.AssertionFailedf("tenant.used %d < info.requestedCount %d", - tenant.used, info.requestedCount)) + if tenant.used < uint64(info.RequestedCount) { + panic(errors.AssertionFailedf("tenant.used %d < info.RequestedCount %d", + tenant.used, info.RequestedCount)) } - tenant.used -= uint64(info.requestedCount) + tenant.used -= uint64(info.RequestedCount) } // Else, we don't decrement tenant.used since we don't want to race with // the gc goroutine that will set used=0. q.mu.Unlock() - q.granter.returnGrant(info.requestedCount) + q.granter.returnGrant(info.RequestedCount) // The channel is sent to after releasing mu, so we don't need to hold // mu when receiving from it. Additionally, we've already called // returnGrant so we're not holding back future grant chains if this one @@ -745,8 +838,30 @@ func (q *WorkQueue) granted(grantChainID grantChainID) int64 { // releaseWaitingWork to return item to the waitingWorkPool. requestedCount := item.requestedCount q.mu.Unlock() - // Reduce critical section by sending on channel after releasing mutex. - item.ch <- grantChainID + + if !item.replicated.Enabled { + // Reduce critical section by sending on channel after releasing mutex. + item.ch <- grantChainID + } else { + // NB: We don't use grant chains for store tokens, so they don't apply + // to replicated writes. + + defer releaseWaitingWork(item) + q.onAdmittedReplicatedWork.admittedReplicatedWork( + roachpb.MustMakeTenantID(tenant.id), + item.priority, + item.replicated, + item.requestedCount, + item.createTime, + ) + + q.metrics.incAdmitted(item.priority) + waitDur := q.timeNow().Sub(item.enqueueingTime) + q.metrics.recordFinishWait(item.priority, waitDur) + if item.heapIndex != -1 { + panic(errors.AssertionFailedf("grantee should be removed from heap")) + } + } return requestedCount } @@ -769,24 +884,24 @@ func (q *WorkQueue) gcTenantsAndResetTokens() { } // adjustTenantTokens is used internally by StoreWorkQueue. The -// additionalTokens count can be negative, in which case it is returning +// additionalTokensNeeded count can be negative, in which case it is returning // tokens. This is only for WorkQueue's own accounting -- it should not call // into granter. -func (q *WorkQueue) adjustTenantTokens(tenantID roachpb.TenantID, additionalTokens int64) { +func (q *WorkQueue) adjustTenantTokens(tenantID roachpb.TenantID, additionalTokensNeeded int64) { tid := tenantID.ToUint64() q.mu.Lock() defer q.mu.Unlock() tenant, ok := q.mu.tenants[tid] if ok { - if additionalTokens < 0 { - toReturn := uint64(-additionalTokens) + if additionalTokensNeeded < 0 { + toReturn := uint64(-additionalTokensNeeded) if tenant.used < toReturn { tenant.used = 0 } else { tenant.used -= toReturn } } else { - tenant.used += uint64(additionalTokens) + tenant.used += uint64(additionalTokensNeeded) } } } @@ -1243,6 +1358,7 @@ type waitingWork struct { // to false. inWaitingWorkHeap bool enqueueingTime time.Time + replicated ReplicatedWorkInfo } var waitingWorkPool = sync.Pool{ @@ -1636,68 +1752,107 @@ func makeWorkQueueMetricsSingle(name string) workQueueMetricsSingle { // seeking admission from a StoreWorkQueue. type StoreWriteWorkInfo struct { WorkInfo - // NB: no information about the size of the work is provided at admission - // time. The token subtraction at admission time is completely based on past - // estimates. This estimation is improved at work completion time via size - // information provided in StoreWorkDoneInfo. - // - // TODO(sumeer): in some cases, like AddSSTable requests, we do have size - // information at proposal time, and may be able to use it fruitfully. } // StoreWorkQueue is responsible for admission to a store. type StoreWorkQueue struct { - q [admissionpb.NumWorkClasses]WorkQueue + storeID roachpb.StoreID + q [admissionpb.NumWorkClasses]WorkQueue // Only calls storeWriteDone. The rest of the interface is used by // WorkQueue. granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone mu struct { syncutil.RWMutex + // estimates is used to determine how many tokens are deducted at-admit + // time for each request. It's not used for replication admission + // control (below-raft) where we do know the size of the write being + // admitted. estimates storeRequestEstimates - stats storeAdmissionStats + // stats are used to maintain L0 {write,ingest} linear models, modeling + // the relation between accounted for "physical" {write,ingest} bytes + // and observed L0 growth (which factors in state machine application). + stats storeAdmissionStats } + stopCh chan struct{} + timeSource timeutil.TimeSource + settings *cluster.Settings + + knobs *TestingKnobs } // StoreWorkHandle is returned by StoreWorkQueue.Admit, and contains state -// needed by the caller (see StoreWorkHandle.AdmissionEnabled) and by +// needed by the caller (see StoreWorkHandle.UseAdmittedWorkDone) and by // StoreWorkQueue.AdmittedWorkDone. type StoreWorkHandle struct { tenantID roachpb.TenantID // The writeTokens acquired by this request. Must be > 0. - writeTokens int64 - workClass admissionpb.WorkClass - admissionEnabled bool + writeTokens int64 + workClass admissionpb.WorkClass + useAdmittedWorkDone bool } -// AdmissionEnabled indicates whether admission control is enabled. If it -// returns false, there is no need to call StoreWorkQueue.AdmittedWorkDone. -func (h StoreWorkHandle) AdmissionEnabled() bool { - return h.admissionEnabled +// UseAdmittedWorkDone indicates whether we need to invoke +// StoreWorkQueue.AdmittedWorkDone. It's false if AC is disabled or if we're +// using below-raft admission control. +func (h StoreWorkHandle) UseAdmittedWorkDone() bool { + return h.useAdmittedWorkDone } // Admit is called when requesting admission for store work. If err!=nil, the // request was not admitted, potentially due to a deadline being exceeded. If -// err=nil and handle.AdmissionEnabled() is true, AdmittedWorkDone must be +// err=nil and handle.UseAdmittedWorkDone() is true, AdmittedWorkDone must be // called when the admitted work is done. func (q *StoreWorkQueue) Admit( ctx context.Context, info StoreWriteWorkInfo, ) (handle StoreWorkHandle, err error) { - // For now, we compute a workClass based on priority. wc := admissionpb.WorkClassFromPri(info.Priority) - h := StoreWorkHandle{ - tenantID: info.TenantID, - workClass: wc, + if info.RequestedCount == 0 { + // We use a per-request estimate only when no requested count is + // provided. It's always provided for below-raft admission where we know + // the size of the work being admitted. For below-raft admission when + // work is admitted[1], we first deduct the requested number of tokens. + // This just corresponds to the known size of the write/ingest, but + // could be insufficient since we haven't applied the granter's linear + // models. This is accounted for in + // StoreWorkQueue.admittedReplicatedWork(), which is invoked right after + // admission. There is no risk of over-admission since this adjustment + // is being done in the same goroutine that did the granting. + // + // [1]: This happens asynchronously -- i.e. we may have already returned + // from StoreWorkQueue.Admit(). + q.mu.RLock() + info.RequestedCount = q.mu.estimates.writeTokens + q.mu.RUnlock() } - q.mu.RLock() - estimates := q.mu.estimates - q.mu.RUnlock() - h.writeTokens = estimates.writeTokens - info.WorkInfo.requestedCount = h.writeTokens + enabled, err := q.q[wc].Admit(ctx, info.WorkInfo) if err != nil { return StoreWorkHandle{}, err } - h.admissionEnabled = enabled + + h := StoreWorkHandle{ + tenantID: info.TenantID, + workClass: wc, + writeTokens: info.RequestedCount, + useAdmittedWorkDone: enabled, + } + if !info.ReplicatedWorkInfo.Enabled { + return h, nil + } + + h.useAdmittedWorkDone = false + var storeWorkDoneInfo StoreWorkDoneInfo + if info.ReplicatedWorkInfo.Ingested { + storeWorkDoneInfo.IngestedBytes = info.RequestedCount + } else { + storeWorkDoneInfo.WriteBytes = info.RequestedCount + } + + // Update store admission stats, because the write is happening ~this + // point. These statistics are used to maintain the underlying linear + // models (modeling relation between physical log writes and total L0 + // growth, which includes the state machine application). + q.updateStoreStatsAfterWorkDone(1, storeWorkDoneInfo, false) return h, nil } @@ -1713,13 +1868,72 @@ type StoreWorkDoneInfo struct { IngestedBytes int64 } -// AdmittedWorkDone indicates to the queue that the admitted work has -// completed. +type onAdmittedReplicatedWork interface { + admittedReplicatedWork( + tenantID roachpb.TenantID, + pri admissionpb.WorkPriority, + rwi ReplicatedWorkInfo, + requestedTokens int64, + createTime int64, + ) +} + +var _ onAdmittedReplicatedWork = &StoreWorkQueue{} + +// admittedReplicatedWork indicates to the queue that replicated write work was +// admitted. +func (q *StoreWorkQueue) admittedReplicatedWork( + tenantID roachpb.TenantID, + pri admissionpb.WorkPriority, + rwi ReplicatedWorkInfo, + originalTokens int64, + createTime int64, // only used in tests +) { + if !rwi.Enabled { + panic("unexpected call to admittedReplicatedWork for work that's not a replicated write") + } + if fn := q.knobs.AdmittedReplicatedWorkInterceptor; fn != nil { + fn(tenantID, pri, rwi, originalTokens, createTime) + } + + var storeWorkDoneInfo StoreWorkDoneInfo + if rwi.Ingested { + storeWorkDoneInfo.IngestedBytes = originalTokens + } else { + storeWorkDoneInfo.WriteBytes = originalTokens + } + + // We've already used RequestedCount for replicated writes to deduct tokens + // in the granter. RequestedCount corresponded to the size of the + // write/ingest, which we knew when enqueuing the write in the WorkQueue for + // (asynchronous) admission. That token deduction however did not use the + // underlying linear models, and we may have under-deducted -- we account + // for this below. + wc := admissionpb.WorkClassFromPri(pri) + additionalTokensNeeded := q.granters[wc].storeWriteDone(originalTokens, storeWorkDoneInfo) + q.q[wc].adjustTenantTokens(tenantID, additionalTokensNeeded) + + // TODO(irfansharif): Dispatch flow token returns here. We want to + // inform (a) the origin node of writes at (b) a given priority, to + // (c) the given range, at (d) the given log position on (e) the + // local store. Part of #95563. + // + _ = rwi.Origin // (a) + _ = pri // (b) + _ = rwi.RangeID // (c) + _ = rwi.LogPosition // (d) + _ = q.storeID // (e) +} + +// AdmittedWorkDone indicates to the queue that the admitted work has completed. +// It's used for the legacy above-raft admission control where we Admit() +// upfront, with just an estimate of the write size, and after the write is +// done, invoke AdmittedWorkDone with the now-known size. func (q *StoreWorkQueue) AdmittedWorkDone(h StoreWorkHandle, doneInfo StoreWorkDoneInfo) error { - if !h.admissionEnabled { - return nil + if !h.UseAdmittedWorkDone() { + return nil // nothing to do } - q.updateStoreAdmissionStats(1, doneInfo, false) + q.updateStoreStatsAfterWorkDone(1, doneInfo, false) additionalTokens := q.granters[h.workClass].storeWriteDone(h.writeTokens, doneInfo) q.q[h.workClass].adjustTenantTokens(h.tenantID, additionalTokens) return nil @@ -1729,7 +1943,7 @@ func (q *StoreWorkQueue) AdmittedWorkDone(h StoreWorkHandle, doneInfo StoreWorkD // can (a) adjust remaining tokens, (b) account for this in the per-work token // estimation model. func (q *StoreWorkQueue) BypassedWorkDone(workCount int64, doneInfo StoreWorkDoneInfo) { - q.updateStoreAdmissionStats(uint64(workCount), doneInfo, true) + q.updateStoreStatsAfterWorkDone(uint64(workCount), doneInfo, true) // Since we have no control over such work, we choose to count it as // regularWorkClass. _ = q.granters[admissionpb.RegularWorkClass].storeWriteDone(0, doneInfo) @@ -1744,11 +1958,11 @@ func (q *StoreWorkQueue) StatsToIgnore(ingestStats pebble.IngestOperationStats) q.mu.Unlock() } -func (q *StoreWorkQueue) updateStoreAdmissionStats( +func (q *StoreWorkQueue) updateStoreStatsAfterWorkDone( workCount uint64, doneInfo StoreWorkDoneInfo, bypassed bool, ) { q.mu.Lock() - q.mu.stats.admittedCount += workCount + q.mu.stats.workCount += workCount q.mu.stats.writeAccountedBytes += uint64(doneInfo.WriteBytes) q.mu.stats.ingestedAccountedBytes += uint64(doneInfo.IngestedBytes) if bypassed { @@ -1779,6 +1993,7 @@ func (q *StoreWorkQueue) close() { for i := range q.q { q.q[i].close() } + close(q.stopCh) } func (q *StoreWorkQueue) getStoreAdmissionStats() storeAdmissionStats { @@ -1795,16 +2010,32 @@ func (q *StoreWorkQueue) setStoreRequestEstimates(estimates storeRequestEstimate func makeStoreWorkQueue( ambientCtx log.AmbientContext, + storeID roachpb.StoreID, granters [admissionpb.NumWorkClasses]granterWithStoreWriteDone, settings *cluster.Settings, metrics *WorkQueueMetrics, opts workQueueOptions, + knobs *TestingKnobs, ) storeRequester { + if knobs == nil { + knobs = &TestingKnobs{} + } + if opts.timeSource == nil { + opts.timeSource = timeutil.DefaultTimeSource{} + } q := &StoreWorkQueue{ - granters: granters, + storeID: storeID, + granters: granters, + knobs: knobs, + stopCh: make(chan struct{}), + timeSource: opts.timeSource, + settings: settings, } + + opts.usesAsyncAdmit = true for i := range q.q { initWorkQueue(&q.q[i], ambientCtx, KVWork, granters[i], settings, metrics, opts) + q.q[i].onAdmittedReplicatedWork = q } // Arbitrary initial value. This will be replaced before any meaningful // token constraints are enforced. diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go index 0296c442a7ad..f4902930be32 100644 --- a/pkg/util/admission/work_queue_test.go +++ b/pkg/util/admission/work_queue_test.go @@ -57,9 +57,12 @@ func (b *builderWithMu) stringAndReset() string { } type testGranter struct { - name string - buf *builderWithMu - r requester + gk grantKind + name string + buf *builderWithMu + r requester + + // Configurable knobs for tests. returnValueFromTryGet bool additionalTokens int64 } @@ -67,21 +70,26 @@ type testGranter struct { var _ granterWithStoreWriteDone = &testGranter{} func (tg *testGranter) grantKind() grantKind { - return slot + return tg.gk } + func (tg *testGranter) tryGet(count int64) bool { tg.buf.printf("tryGet%s: returning %t", tg.name, tg.returnValueFromTryGet) return tg.returnValueFromTryGet } + func (tg *testGranter) returnGrant(count int64) { tg.buf.printf("returnGrant%s %d", tg.name, count) } + func (tg *testGranter) tookWithoutPermission(count int64) { tg.buf.printf("tookWithoutPermission%s %d", tg.name, count) } + func (tg *testGranter) continueGrantChain(grantChainID grantChainID) { tg.buf.printf("continueGrantChain%s %d", tg.name, grantChainID) } + func (tg *testGranter) grant(grantChainID grantChainID) { rv := tg.r.granted(grantChainID) if rv > 0 { @@ -93,6 +101,7 @@ func (tg *testGranter) grant(grantChainID grantChainID) { } tg.buf.printf("granted%s: returned %d", tg.name, rv) } + func (tg *testGranter) storeWriteDone( originalTokens int64, doneInfo StoreWorkDoneInfo, ) (additionalTokens int64) { @@ -186,7 +195,7 @@ func TestWorkQueueBasic(t *testing.T) { switch d.Cmd { case "init": closeFn() - tg = &testGranter{buf: &buf} + tg = &testGranter{gk: slot, buf: &buf} opts := makeWorkQueueOptions(KVWork) timeSource = timeutil.NewManualTime(initialTime) opts.timeSource = timeSource @@ -329,7 +338,7 @@ func TestWorkQueueTokenResetRace(t *testing.T) { defer log.Scope(t).Close(t) var buf builderWithMu - tg := &testGranter{buf: &buf} + tg := &testGranter{gk: slot, buf: &buf} st := cluster.MakeTestingClusterSettings() registry := metric.NewRegistry() metrics := makeWorkQueueMetrics("", registry) @@ -506,16 +515,16 @@ func TestStoreWorkQueueBasic(t *testing.T) { switch d.Cmd { case "init": closeFn() - tg[admissionpb.RegularWorkClass] = &testGranter{name: " regular", buf: &buf} - tg[admissionpb.ElasticWorkClass] = &testGranter{name: " elastic", buf: &buf} + tg[admissionpb.RegularWorkClass] = &testGranter{gk: token, name: " regular", buf: &buf} + tg[admissionpb.ElasticWorkClass] = &testGranter{gk: token, name: " elastic", buf: &buf} opts := makeWorkQueueOptions(KVWork) opts.usesTokens = true opts.timeSource = timeutil.NewManualTime(timeutil.FromUnixMicros(0)) opts.disableEpochClosingGoroutine = true st = cluster.MakeTestingClusterSettings() - q = makeStoreWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), + q = makeStoreWorkQueue(log.MakeTestingAmbientContext(tracing.NewTracer()), roachpb.StoreID(1), [admissionpb.NumWorkClasses]granterWithStoreWriteDone{tg[admissionpb.RegularWorkClass], tg[admissionpb.ElasticWorkClass]}, - st, metrics, opts).(*StoreWorkQueue) + st, metrics, opts, nil /* testing knobs */).(*StoreWorkQueue) tg[admissionpb.RegularWorkClass].r = q.getRequesters()[admissionpb.RegularWorkClass] tg[admissionpb.ElasticWorkClass].r = q.getRequesters()[admissionpb.ElasticWorkClass] wrkMap.resetMap() diff --git a/pkg/util/timeutil/time.go b/pkg/util/timeutil/time.go index 811f63b72bff..281d6b868a0d 100644 --- a/pkg/util/timeutil/time.go +++ b/pkg/util/timeutil/time.go @@ -96,6 +96,13 @@ func FromUnixMicros(us int64) time.Time { return time.Unix(us/1e6, (us%1e6)*1e3).UTC() } +// FromUnixNanos returns the UTC time.Time corresponding to the given Unix +// time, ns nanoseconds since UnixEpoch. In Go's current time.Time +// implementation, all possible values for ns can be represented as a time.Time. +func FromUnixNanos(ns int64) time.Time { + return time.Unix(ns/1e9, ns%1e9).UTC() +} + // ToUnixMicros returns t as the number of microseconds elapsed since UnixEpoch. // Fractional microseconds are rounded, half up, using time.Round. Similar to // time.Time.UnixNano, the result is undefined if the Unix time in microseconds