From c04bbd92cd9200ff3fb7f7907b9dc3f55e209ec9 Mon Sep 17 00:00:00 2001 From: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com> Date: Thu, 24 Aug 2023 13:29:57 -0400 Subject: [PATCH] kvadmission: fix handling of non-elastic work with flow control Previously, when we were skipping flow control for regular work, we were still encoding raft messages with AC encoding. This would enqueue essentially no-op AC work items below raft, increasing the possibility of OOM. This change skips encoding Raft messages with AC encoding for cases where we skip flow control. Additionally, we still subject these work items to above Raft AC if it is enabled. informs #104154 Release note: None --- .../flow_control_replica_integration_test.go | 4 +-- pkg/kv/kvserver/kvadmission/kvadmission.go | 27 +++++++++++++------ .../kvserver/kvflowcontrol/kvflowcontrol.go | 25 +++++++---------- .../kvflowcontroller/kvflowcontroller.go | 20 +++++++------- .../kvflowhandle/kvflowhandle.go | 17 +++++++----- .../kvflowcontrol/kvflowhandle/noop.go | 6 +++-- pkg/kv/kvserver/replica_proposal_buf_test.go | 4 +-- 7 files changed, 58 insertions(+), 45 deletions(-) diff --git a/pkg/kv/kvserver/flow_control_replica_integration_test.go b/pkg/kv/kvserver/flow_control_replica_integration_test.go index ad2bca7a3491..1d2029cb70ad 100644 --- a/pkg/kv/kvserver/flow_control_replica_integration_test.go +++ b/pkg/kv/kvserver/flow_control_replica_integration_test.go @@ -413,9 +413,9 @@ func newMockFlowHandle( func (m *mockFlowHandle) Admit( ctx context.Context, pri admissionpb.WorkPriority, ct time.Time, -) error { +) (bool, error) { m.t.Fatal("unimplemented") - return nil + return false, nil } func (m *mockFlowHandle) DeductTokensFor( diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index 8ad8dbbafcdc..5c554fcec721 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -308,14 +308,18 @@ func (n *controllerImpl) AdmitKVWork( // to continue even when throttling since there are often significant // number of tokens available. if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() { - if !bypassAdmission && + var bypassFlowcontrol bool + attemptFlowControl := !bypassAdmission && kvflowcontrol.Enabled.Get(&n.settings.SV) && - n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings) { + n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings) + if attemptFlowControl { kvflowHandle, found := n.kvflowHandles.Lookup(ba.RangeID) if !found { return Handle{}, nil } - if err := kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime)); err != nil { + var err error + bypassFlowcontrol, err = kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime)) + if err != nil { return Handle{}, err } // NB: It's possible for us to be waiting for available flow tokens @@ -323,12 +327,19 @@ func (n *controllerImpl) AdmitKVWork( // deduct tokens from, if the range experiences a split between now // and the point of deduction. That's ok, there's no strong // synchronization needed between these two points. - ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{ - AdmissionPriority: int32(admissionInfo.Priority), - AdmissionCreateTime: admissionInfo.CreateTime, - AdmissionOriginNode: n.nodeID.Get(), + if !bypassFlowcontrol { + ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{ + AdmissionPriority: int32(admissionInfo.Priority), + AdmissionCreateTime: admissionInfo.CreateTime, + AdmissionOriginNode: n.nodeID.Get(), + } } - } else { + } + // NB: When flow control is enabled for elastic work, and we encounter + // non-elastic (i.e. "regular work), we skip waiting for tokens in + // kvflowcontrol.Controller.Admit(). In this case, we want to make sure that + // this work still subject to AC. + if !attemptFlowControl || bypassFlowcontrol { storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID)) if storeAdmissionQ != nil { storeWorkHandle, err := storeAdmissionQ.Admit( diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index 7e8c5d9fdb32..ebb97d2b4179 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -60,12 +60,6 @@ const ( // virtually enqueued in below-raft admission queues and dequeued in // priority order, but only empty elastic flow token buckets above-raft will // block further elastic traffic from being admitted. - // - // TODO(irfansharif): We're potentially risking OOMs doing all this tracking - // for regular work, without coalescing state. With a bit of plumbing, for - // requests that bypass flow control we could fallback to using the non-AC - // raft encodings and avoid the potential OOMs. Address this as part of - // #95563. ApplyToElastic ModeT = iota // ApplyToAll uses flow control for both elastic and regular traffic, // i.e. all work will wait for flow tokens to be available. @@ -117,11 +111,11 @@ type Tokens int64 // Controller provides flow control for replication traffic in KV, held at the // node-level. type Controller interface { - // Admit seeks admission to replicate data, regardless of size, for work - // with the given priority, create-time, and over the given stream. This - // blocks until there are flow tokens available or the stream disconnects, - // subject to context cancellation. - Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) error + // Admit seeks admission to replicate data, regardless of size, for work with + // the given priority, create-time, and over the given stream. This blocks + // until there are flow tokens available or the stream disconnects, subject to + // context cancellation. It returns true if the request bypassed flow control. + Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) (bool, error) // DeductTokens deducts (without blocking) flow tokens for replicating work // with given priority over the given stream. Requests are expected to // have been Admit()-ed first. @@ -158,10 +152,11 @@ type Controller interface { // given priority, takes log position into account -- see // kvflowcontrolpb.AdmittedRaftLogEntries for more details). type Handle interface { - // Admit seeks admission to replicate data, regardless of size, for work - // with the given priority and create-time. This blocks until there are - // flow tokens available for all connected streams. - Admit(context.Context, admissionpb.WorkPriority, time.Time) error + // Admit seeks admission to replicate data, regardless of size, for work with + // the given priority and create-time. This blocks until there are flow tokens + // available for all connected streams. This returns true if the request + // bypassed flow control. + Admit(context.Context, admissionpb.WorkPriority, time.Time) (bool, error) // DeductTokensFor deducts (without blocking) flow tokens for replicating // work with given priority along connected streams. The deduction is // tracked with respect to the specific raft log position it's expecting it diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go index b263edff077b..98d23133c1d4 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go @@ -136,7 +136,8 @@ func (c *Controller) Admit( pri admissionpb.WorkPriority, _ time.Time, connection kvflowcontrol.ConnectedStream, -) error { +) (bool, error) { + var bypassTokens bool class := admissionpb.WorkClassFromPri(pri) c.metrics.onWaiting(class) @@ -148,12 +149,11 @@ func (c *Controller) Admit( c.mu.Unlock() tokens := b.tokens(class) - if tokens > 0 || - // In addition to letting requests through when there are tokens - // being available, we'll also let them through if we're not - // applying flow control to their specific work class. - c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass { - + // In addition to letting requests through when there are tokens + // being available, we'll also let them through if we're not + // applying flow control to their specific work class. + bypassTokens = c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass + if tokens > 0 || bypassTokens { if log.ExpensiveLogEnabled(ctx, 2) { log.Infof(ctx, "admitted request (pri=%s stream=%s tokens=%s wait-duration=%s mode=%s)", pri, connection.Stream(), tokens, c.clock.PhysicalTime().Sub(tstart), c.mode()) @@ -179,7 +179,7 @@ func (c *Controller) Admit( b.signal() // signal a waiter, if any c.metrics.onAdmitted(class, c.clock.PhysicalTime().Sub(tstart)) - return nil + return bypassTokens, nil } if !logged && log.ExpensiveLogEnabled(ctx, 2) { @@ -192,12 +192,12 @@ func (c *Controller) Admit( case <-b.wait(): // wait for a signal case <-connection.Disconnected(): c.metrics.onBypassed(class, c.clock.PhysicalTime().Sub(tstart)) - return nil + return bypassTokens, nil case <-ctx.Done(): if ctx.Err() != nil { c.metrics.onErrored(class, c.clock.PhysicalTime().Sub(tstart)) } - return ctx.Err() + return bypassTokens, ctx.Err() } } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go index 21da940bad8b..68579ac7cbd9 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go @@ -79,7 +79,10 @@ func New( var _ kvflowcontrol.Handle = &Handle{} // Admit is part of the kvflowcontrol.Handle interface. -func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct time.Time) error { +func (h *Handle) Admit( + ctx context.Context, pri admissionpb.WorkPriority, ct time.Time, +) (bool, error) { + var bypassTokens bool if h == nil { // TODO(irfansharif): This can happen if we're proposing immediately on // a newly split off RHS that doesn't know it's a leader yet (so we @@ -92,14 +95,14 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim // As for cluster settings that disable flow control entirely or only // for regular traffic, that can be dealt with at the caller by not // calling .Admit() and ensuring we use the right raft entry encodings. - return nil + return bypassTokens, nil } h.mu.Lock() if h.mu.closed { h.mu.Unlock() log.Errorf(ctx, "operating on a closed handle") - return nil + return bypassTokens, nil } // NB: We're using a copy-on-write scheme elsewhere to maintain this slice @@ -116,14 +119,16 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim tstart := h.clock.PhysicalTime() for _, c := range connections { - if err := h.controller.Admit(ctx, pri, ct, c); err != nil { + var err error + bypassTokens, err = h.controller.Admit(ctx, pri, ct, c) + if err != nil { h.metrics.onErrored(class, h.clock.PhysicalTime().Sub(tstart)) - return err + return bypassTokens, err } } h.metrics.onAdmitted(class, h.clock.PhysicalTime().Sub(tstart)) - return nil + return bypassTokens, nil } // DeductTokensFor is part of the kvflowcontrol.Handle interface. diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go index 69878f5adb15..dca22334c424 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go @@ -26,8 +26,10 @@ type Noop struct{} var _ kvflowcontrol.Handle = Noop{} // Admit is part of the kvflowcontrol.Handle interface. -func (n Noop) Admit(ctx context.Context, priority admissionpb.WorkPriority, time time.Time) error { - return nil +func (n Noop) Admit( + ctx context.Context, priority admissionpb.WorkPriority, time time.Time, +) (bool, error) { + return false, nil } // DeductTokensFor is part of the kvflowcontrol.Handle interface. diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 9b3503d17d7d..2f58569658b1 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -1166,8 +1166,8 @@ var _ kvflowcontrol.Handle = &testFlowTokenHandle{} func (t *testFlowTokenHandle) Admit( ctx context.Context, priority admissionpb.WorkPriority, t2 time.Time, -) error { - return nil +) (bool, error) { + return false, nil } func (t *testFlowTokenHandle) DeductTokensFor(