Skip to content

Commit

Permalink
kvadmission: fix handling of non-elastic work with flow control
Browse files Browse the repository at this point in the history
Previously, when we were skipping flow control for regular work, we were
still encoding raft messages with AC encoding. This would enqueue
essentially no-op AC work items below raft, increasing the possibility
of OOM.

This change skips encoding Raft messages with AC encoding for cases
where we skip flow control. Additionally, we still subject these work
items to above Raft AC if it is enabled.

informs cockroachdb#104154

Release note: None
  • Loading branch information
aadityasondhi committed Aug 24, 2023
1 parent 1b933fa commit c04bbd9
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 45 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/flow_control_replica_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,9 @@ func newMockFlowHandle(

func (m *mockFlowHandle) Admit(
ctx context.Context, pri admissionpb.WorkPriority, ct time.Time,
) error {
) (bool, error) {
m.t.Fatal("unimplemented")
return nil
return false, nil
}

func (m *mockFlowHandle) DeductTokensFor(
Expand Down
27 changes: 19 additions & 8 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,27 +308,38 @@ func (n *controllerImpl) AdmitKVWork(
// to continue even when throttling since there are often significant
// number of tokens available.
if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() {
if !bypassAdmission &&
var bypassFlowcontrol bool
attemptFlowControl := !bypassAdmission &&
kvflowcontrol.Enabled.Get(&n.settings.SV) &&
n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings) {
n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings)
if attemptFlowControl {
kvflowHandle, found := n.kvflowHandles.Lookup(ba.RangeID)
if !found {
return Handle{}, nil
}
if err := kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime)); err != nil {
var err error
bypassFlowcontrol, err = kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime))
if err != nil {
return Handle{}, err
}
// NB: It's possible for us to be waiting for available flow tokens
// for a different set of streams that the ones we'll eventually
// deduct tokens from, if the range experiences a split between now
// and the point of deduction. That's ok, there's no strong
// synchronization needed between these two points.
ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{
AdmissionPriority: int32(admissionInfo.Priority),
AdmissionCreateTime: admissionInfo.CreateTime,
AdmissionOriginNode: n.nodeID.Get(),
if !bypassFlowcontrol {
ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{
AdmissionPriority: int32(admissionInfo.Priority),
AdmissionCreateTime: admissionInfo.CreateTime,
AdmissionOriginNode: n.nodeID.Get(),
}
}
} else {
}
// NB: When flow control is enabled for elastic work, and we encounter
// non-elastic (i.e. "regular work), we skip waiting for tokens in
// kvflowcontrol.Controller.Admit(). In this case, we want to make sure that
// this work still subject to AC.
if !attemptFlowControl || bypassFlowcontrol {
storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID))
if storeAdmissionQ != nil {
storeWorkHandle, err := storeAdmissionQ.Admit(
Expand Down
25 changes: 10 additions & 15 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ const (
// virtually enqueued in below-raft admission queues and dequeued in
// priority order, but only empty elastic flow token buckets above-raft will
// block further elastic traffic from being admitted.
//
// TODO(irfansharif): We're potentially risking OOMs doing all this tracking
// for regular work, without coalescing state. With a bit of plumbing, for
// requests that bypass flow control we could fallback to using the non-AC
// raft encodings and avoid the potential OOMs. Address this as part of
// #95563.
ApplyToElastic ModeT = iota
// ApplyToAll uses flow control for both elastic and regular traffic,
// i.e. all work will wait for flow tokens to be available.
Expand Down Expand Up @@ -117,11 +111,11 @@ type Tokens int64
// Controller provides flow control for replication traffic in KV, held at the
// node-level.
type Controller interface {
// Admit seeks admission to replicate data, regardless of size, for work
// with the given priority, create-time, and over the given stream. This
// blocks until there are flow tokens available or the stream disconnects,
// subject to context cancellation.
Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) error
// Admit seeks admission to replicate data, regardless of size, for work with
// the given priority, create-time, and over the given stream. This blocks
// until there are flow tokens available or the stream disconnects, subject to
// context cancellation. It returns true if the request bypassed flow control.
Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) (bool, error)
// DeductTokens deducts (without blocking) flow tokens for replicating work
// with given priority over the given stream. Requests are expected to
// have been Admit()-ed first.
Expand Down Expand Up @@ -158,10 +152,11 @@ type Controller interface {
// given priority, takes log position into account -- see
// kvflowcontrolpb.AdmittedRaftLogEntries for more details).
type Handle interface {
// Admit seeks admission to replicate data, regardless of size, for work
// with the given priority and create-time. This blocks until there are
// flow tokens available for all connected streams.
Admit(context.Context, admissionpb.WorkPriority, time.Time) error
// Admit seeks admission to replicate data, regardless of size, for work with
// the given priority and create-time. This blocks until there are flow tokens
// available for all connected streams. This returns true if the request
// bypassed flow control.
Admit(context.Context, admissionpb.WorkPriority, time.Time) (bool, error)
// DeductTokensFor deducts (without blocking) flow tokens for replicating
// work with given priority along connected streams. The deduction is
// tracked with respect to the specific raft log position it's expecting it
Expand Down
20 changes: 10 additions & 10 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func (c *Controller) Admit(
pri admissionpb.WorkPriority,
_ time.Time,
connection kvflowcontrol.ConnectedStream,
) error {
) (bool, error) {
var bypassTokens bool
class := admissionpb.WorkClassFromPri(pri)
c.metrics.onWaiting(class)

Expand All @@ -148,12 +149,11 @@ func (c *Controller) Admit(
c.mu.Unlock()

tokens := b.tokens(class)
if tokens > 0 ||
// In addition to letting requests through when there are tokens
// being available, we'll also let them through if we're not
// applying flow control to their specific work class.
c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass {

// In addition to letting requests through when there are tokens
// being available, we'll also let them through if we're not
// applying flow control to their specific work class.
bypassTokens = c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass
if tokens > 0 || bypassTokens {
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "admitted request (pri=%s stream=%s tokens=%s wait-duration=%s mode=%s)",
pri, connection.Stream(), tokens, c.clock.PhysicalTime().Sub(tstart), c.mode())
Expand All @@ -179,7 +179,7 @@ func (c *Controller) Admit(

b.signal() // signal a waiter, if any
c.metrics.onAdmitted(class, c.clock.PhysicalTime().Sub(tstart))
return nil
return bypassTokens, nil
}

if !logged && log.ExpensiveLogEnabled(ctx, 2) {
Expand All @@ -192,12 +192,12 @@ func (c *Controller) Admit(
case <-b.wait(): // wait for a signal
case <-connection.Disconnected():
c.metrics.onBypassed(class, c.clock.PhysicalTime().Sub(tstart))
return nil
return bypassTokens, nil
case <-ctx.Done():
if ctx.Err() != nil {
c.metrics.onErrored(class, c.clock.PhysicalTime().Sub(tstart))
}
return ctx.Err()
return bypassTokens, ctx.Err()
}
}

Expand Down
17 changes: 11 additions & 6 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ func New(
var _ kvflowcontrol.Handle = &Handle{}

// Admit is part of the kvflowcontrol.Handle interface.
func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct time.Time) error {
func (h *Handle) Admit(
ctx context.Context, pri admissionpb.WorkPriority, ct time.Time,
) (bool, error) {
var bypassTokens bool
if h == nil {
// TODO(irfansharif): This can happen if we're proposing immediately on
// a newly split off RHS that doesn't know it's a leader yet (so we
Expand All @@ -92,14 +95,14 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim
// As for cluster settings that disable flow control entirely or only
// for regular traffic, that can be dealt with at the caller by not
// calling .Admit() and ensuring we use the right raft entry encodings.
return nil
return bypassTokens, nil
}

h.mu.Lock()
if h.mu.closed {
h.mu.Unlock()
log.Errorf(ctx, "operating on a closed handle")
return nil
return bypassTokens, nil
}

// NB: We're using a copy-on-write scheme elsewhere to maintain this slice
Expand All @@ -116,14 +119,16 @@ func (h *Handle) Admit(ctx context.Context, pri admissionpb.WorkPriority, ct tim
tstart := h.clock.PhysicalTime()

for _, c := range connections {
if err := h.controller.Admit(ctx, pri, ct, c); err != nil {
var err error
bypassTokens, err = h.controller.Admit(ctx, pri, ct, c)
if err != nil {
h.metrics.onErrored(class, h.clock.PhysicalTime().Sub(tstart))
return err
return bypassTokens, err
}
}

h.metrics.onAdmitted(class, h.clock.PhysicalTime().Sub(tstart))
return nil
return bypassTokens, nil
}

// DeductTokensFor is part of the kvflowcontrol.Handle interface.
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ type Noop struct{}
var _ kvflowcontrol.Handle = Noop{}

// Admit is part of the kvflowcontrol.Handle interface.
func (n Noop) Admit(ctx context.Context, priority admissionpb.WorkPriority, time time.Time) error {
return nil
func (n Noop) Admit(
ctx context.Context, priority admissionpb.WorkPriority, time time.Time,
) (bool, error) {
return false, nil
}

// DeductTokensFor is part of the kvflowcontrol.Handle interface.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,8 +1166,8 @@ var _ kvflowcontrol.Handle = &testFlowTokenHandle{}

func (t *testFlowTokenHandle) Admit(
ctx context.Context, priority admissionpb.WorkPriority, t2 time.Time,
) error {
return nil
) (bool, error) {
return false, nil
}

func (t *testFlowTokenHandle) DeductTokensFor(
Expand Down

0 comments on commit c04bbd9

Please sign in to comment.