diff --git a/pkg/kv/kvserver/flow_control_replica_integration_test.go b/pkg/kv/kvserver/flow_control_replica_integration_test.go index ad2bca7a3491..1d2029cb70ad 100644 --- a/pkg/kv/kvserver/flow_control_replica_integration_test.go +++ b/pkg/kv/kvserver/flow_control_replica_integration_test.go @@ -413,9 +413,9 @@ func newMockFlowHandle( func (m *mockFlowHandle) Admit( ctx context.Context, pri admissionpb.WorkPriority, ct time.Time, -) error { +) (bool, error) { m.t.Fatal("unimplemented") - return nil + return false, nil } func (m *mockFlowHandle) DeductTokensFor( diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index 8ad8dbbafcdc..5c554fcec721 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -308,14 +308,18 @@ func (n *controllerImpl) AdmitKVWork( // to continue even when throttling since there are often significant // number of tokens available. if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() { - if !bypassAdmission && + var bypassFlowcontrol bool + attemptFlowControl := !bypassAdmission && kvflowcontrol.Enabled.Get(&n.settings.SV) && - n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings) { + n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings) + if attemptFlowControl { kvflowHandle, found := n.kvflowHandles.Lookup(ba.RangeID) if !found { return Handle{}, nil } - if err := kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime)); err != nil { + var err error + bypassFlowcontrol, err = kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime)) + if err != nil { return Handle{}, err } // NB: It's possible for us to be waiting for available flow tokens @@ -323,12 +327,19 @@ func (n *controllerImpl) AdmitKVWork( // deduct tokens from, if the range experiences a split between now // and the point of deduction. That's ok, there's no strong // synchronization needed between these two points. - ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{ - AdmissionPriority: int32(admissionInfo.Priority), - AdmissionCreateTime: admissionInfo.CreateTime, - AdmissionOriginNode: n.nodeID.Get(), + if !bypassFlowcontrol { + ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{ + AdmissionPriority: int32(admissionInfo.Priority), + AdmissionCreateTime: admissionInfo.CreateTime, + AdmissionOriginNode: n.nodeID.Get(), + } } - } else { + } + // NB: When flow control is enabled for elastic work, and we encounter + // non-elastic (i.e. "regular work), we skip waiting for tokens in + // kvflowcontrol.Controller.Admit(). In this case, we want to make sure that + // this work still subject to AC. + if !attemptFlowControl || bypassFlowcontrol { storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID)) if storeAdmissionQ != nil { storeWorkHandle, err := storeAdmissionQ.Admit( diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index 7e8c5d9fdb32..ebb97d2b4179 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -60,12 +60,6 @@ const ( // virtually enqueued in below-raft admission queues and dequeued in // priority order, but only empty elastic flow token buckets above-raft will // block further elastic traffic from being admitted. - // - // TODO(irfansharif): We're potentially risking OOMs doing all this tracking - // for regular work, without coalescing state. With a bit of plumbing, for - // requests that bypass flow control we could fallback to using the non-AC - // raft encodings and avoid the potential OOMs. Address this as part of - // #95563. ApplyToElastic ModeT = iota // ApplyToAll uses flow control for both elastic and regular traffic, // i.e. all work will wait for flow tokens to be available. @@ -117,11 +111,11 @@ type Tokens int64 // Controller provides flow control for replication traffic in KV, held at the // node-level. type Controller interface { - // Admit seeks admission to replicate data, regardless of size, for work - // with the given priority, create-time, and over the given stream. This - // blocks until there are flow tokens available or the stream disconnects, - // subject to context cancellation. - Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) error + // Admit seeks admission to replicate data, regardless of size, for work with + // the given priority, create-time, and over the given stream. This blocks + // until there are flow tokens available or the stream disconnects, subject to + // context cancellation. It returns true if the request bypassed flow control. + Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) (bool, error) // DeductTokens deducts (without blocking) flow tokens for replicating work // with given priority over the given stream. Requests are expected to // have been Admit()-ed first. @@ -158,10 +152,11 @@ type Controller interface { // given priority, takes log position into account -- see // kvflowcontrolpb.AdmittedRaftLogEntries for more details). type Handle interface { - // Admit seeks admission to replicate data, regardless of size, for work - // with the given priority and create-time. This blocks until there are - // flow tokens available for all connected streams. - Admit(context.Context, admissionpb.WorkPriority, time.Time) error + // Admit seeks admission to replicate data, regardless of size, for work with + // the given priority and create-time. This blocks until there are flow tokens + // available for all connected streams. This returns true if the request + // bypassed flow control. + Admit(context.Context, admissionpb.WorkPriority, time.Time) (bool, error) // DeductTokensFor deducts (without blocking) flow tokens for replicating // work with given priority along connected streams. The deduction is // tracked with respect to the specific raft log position it's expecting it diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go index b263edff077b..98d23133c1d4 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go @@ -136,7 +136,8 @@ func (c *Controller) Admit( pri admissionpb.WorkPriority, _ time.Time, connection kvflowcontrol.ConnectedStream, -) error { +) (bool, error) { + var bypassTokens bool class := admissionpb.WorkClassFromPri(pri) c.metrics.onWaiting(class) @@ -148,12 +149,11 @@ func (c *Controller) Admit( c.mu.Unlock() tokens := b.tokens(class) - if tokens > 0 || - // In addition to letting requests through when there are tokens - // being available, we'll also let them through if we're not - // applying flow control to their specific work class. - c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass { - + // In addition to letting requests through when there are tokens + // being available, we'll also let them through if we're not + // applying flow control to their specific work class. + bypassTokens = c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass + if tokens > 0 || bypassTokens { if log.ExpensiveLogEnabled(ctx, 2) { log.Infof(ctx, "admitted request (pri=%s stream=%s tokens=%s wait-duration=%s mode=%s)", pri, connection.Stream(), tokens, c.clock.PhysicalTime().Sub(tstart), c.mode()) @@ -179,7 +179,7 @@ func (c *Controller) Admit( b.signal() // signal a waiter, if any c.metrics.onAdmitted(class, c.clock.PhysicalTime().Sub(tstart)) - return nil + return bypassTokens, nil } if !logged && log.ExpensiveLogEnabled(ctx, 2) { @@ -192,12 +192,12 @@ func (c *Controller) Admit( case <-b.wait(): // wait for a signal case <-connection.Disconnected(): c.metrics.onBypassed(class, c.clock.PhysicalTime().Sub(tstart)) - return nil + return bypassTokens, nil case <-ctx.Done(): if ctx.Err() != nil { c.metrics.onErrored(class, c.clock.PhysicalTime().Sub(tstart)) } - return ctx.Err() + return bypassTokens, ctx.Err() } } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go index 21da940bad8b..68579ac7cbd9 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go @@ -79,7 +79,10 @@ func New( var _ kvflowcontrol.Handle = &Handle{} // Admit is part of the kvflowcontrol.Handle interface. -func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct time.Time) error { +func (h *Handle) Admit( + ctx context.Context, pri admissionpb.WorkPriority, ct time.Time, +) (bool, error) { + var bypassTokens bool if h == nil { // TODO(irfansharif): This can happen if we're proposing immediately on // a newly split off RHS that doesn't know it's a leader yet (so we @@ -92,14 +95,14 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim // As for cluster settings that disable flow control entirely or only // for regular traffic, that can be dealt with at the caller by not // calling .Admit() and ensuring we use the right raft entry encodings. - return nil + return bypassTokens, nil } h.mu.Lock() if h.mu.closed { h.mu.Unlock() log.Errorf(ctx, "operating on a closed handle") - return nil + return bypassTokens, nil } // NB: We're using a copy-on-write scheme elsewhere to maintain this slice @@ -116,14 +119,16 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim tstart := h.clock.PhysicalTime() for _, c := range connections { - if err := h.controller.Admit(ctx, pri, ct, c); err != nil { + var err error + bypassTokens, err = h.controller.Admit(ctx, pri, ct, c) + if err != nil { h.metrics.onErrored(class, h.clock.PhysicalTime().Sub(tstart)) - return err + return bypassTokens, err } } h.metrics.onAdmitted(class, h.clock.PhysicalTime().Sub(tstart)) - return nil + return bypassTokens, nil } // DeductTokensFor is part of the kvflowcontrol.Handle interface. diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go index 69878f5adb15..dca22334c424 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go @@ -26,8 +26,10 @@ type Noop struct{} var _ kvflowcontrol.Handle = Noop{} // Admit is part of the kvflowcontrol.Handle interface. -func (n Noop) Admit(ctx context.Context, priority admissionpb.WorkPriority, time time.Time) error { - return nil +func (n Noop) Admit( + ctx context.Context, priority admissionpb.WorkPriority, time time.Time, +) (bool, error) { + return false, nil } // DeductTokensFor is part of the kvflowcontrol.Handle interface. diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 9b3503d17d7d..2f58569658b1 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -1166,8 +1166,8 @@ var _ kvflowcontrol.Handle = &testFlowTokenHandle{} func (t *testFlowTokenHandle) Admit( ctx context.Context, priority admissionpb.WorkPriority, t2 time.Time, -) error { - return nil +) (bool, error) { + return false, nil } func (t *testFlowTokenHandle) DeductTokensFor(