Skip to content

Commit

Permalink
kvflowdispatch: move down dispatch mutex to reduce contention
Browse files Browse the repository at this point in the history
In `kv0/enc=false/nodes=3/cpu=96`, we noticed mutex contention around
the `outbox` map. This patch tries to alleviate that by moving the mutex
down into each individual dispatch map (sharding by NodeID).

Informs: #104154.

Release note: None
  • Loading branch information
aadityasondhi committed Oct 3, 2023
1 parent 119fb63 commit a30f9eb
Showing 1 changed file with 76 additions and 42 deletions.
118 changes: 76 additions & 42 deletions pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvflowdispatch

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
Expand All @@ -32,13 +33,9 @@ const AdmittedRaftLogEntriesBytes = 50
// entries to specific nodes, and (ii) to read pending dispatches.
type Dispatch struct {
mu struct {
// TODO(irfansharif,aaditya): On kv0/enc=false/nodes=3/cpu=96 this mutex
// is responsible for ~3.7% of the mutex contention. Look to address it
// as part of #104154. Perhaps shard this mutex by node ID? Or use a
// sync.Map instead?
syncutil.Mutex
// outbox maintains pending dispatches on a per-node basis.
outbox map[roachpb.NodeID]dispatches
outbox sync.Map // roachpb.NodeID -> *dispatches
}
metrics *metrics
// handles is used to dispatch tokens locally. Remote token dispatches are
Expand All @@ -57,7 +54,12 @@ type dispatchKey struct {
admissionpb.WorkPriority
}

type dispatches map[dispatchKey]kvflowcontrolpb.RaftLogPosition
type dispatches struct {
mu struct {
syncutil.Mutex
items map[dispatchKey]kvflowcontrolpb.RaftLogPosition
}
}

var _ kvflowcontrol.Dispatch = &Dispatch{}

Expand All @@ -69,7 +71,7 @@ func New(
handles: handles,
nodeID: nodeID,
}
d.mu.outbox = make(map[roachpb.NodeID]dispatches)
d.mu.outbox = sync.Map{}
d.metrics = newMetrics()
registry.AddMetricStruct(d.metrics)
return d
Expand Down Expand Up @@ -101,67 +103,84 @@ func (d *Dispatch) Dispatch(
d.metrics.LocalDispatch[wc].Inc(1)
return
}
d.metrics.RemoteDispatch[wc].Inc(1)

d.mu.Lock()
defer d.mu.Unlock()
d.metrics.RemoteDispatch[wc].Inc(1)
dispatchMap := d.getDispatchMap(nodeID)
var existing kvflowcontrolpb.RaftLogPosition
var found bool
func() {
dispatchMap.mu.Lock()
defer dispatchMap.mu.Unlock()

if len(dispatchMap.mu.items) == 0 {
d.metrics.PendingNodes.Inc(1)
}

if _, ok := d.mu.outbox[nodeID]; !ok {
d.mu.outbox[nodeID] = dispatches{}
d.metrics.PendingNodes.Inc(1)
}
dk := dispatchKey{
entries.RangeID,
entries.StoreID,
pri,
}

dk := dispatchKey{
entries.RangeID,
entries.StoreID,
pri,
}
existing, found = dispatchMap.mu.items[dk]

existing, found := d.mu.outbox[nodeID][dk]
if !found || existing.Less(entries.UpToRaftLogPosition) {
d.mu.outbox[nodeID][dk] = entries.UpToRaftLogPosition
if !found || existing.Less(entries.UpToRaftLogPosition) {
dispatchMap.mu.items[dk] = entries.UpToRaftLogPosition
}
}()

if !found {
d.metrics.PendingDispatches[wc].Inc(1)
} else {
if found {
if existing.Less(entries.UpToRaftLogPosition) {
// We're replacing an existing dispatch with one with a higher log
// position. Increment the coalesced metric.
d.metrics.CoalescedDispatches[wc].Inc(1)
} else {
// We're dropping a dispatch given we already have a pending one with a
// higher log position. Increment the coalesced metric.
d.metrics.CoalescedDispatches[wc].Inc(1)
}
}
if found && !existing.Less(entries.UpToRaftLogPosition) {
// We're dropping a dispatch given we already have a pending one with a
// higher log position. Increment the coalesced metric.
d.metrics.CoalescedDispatches[wc].Inc(1)
} else {
d.metrics.PendingDispatches[wc].Inc(1)
}
}

// PendingDispatch is part of the kvflowcontrol.Dispatch interface.
func (d *Dispatch) PendingDispatch() []roachpb.NodeID {
// NB: We're holding the Dispatch mutex here, which guards against new buckets
// being added, synchronization we don't get out of sync.Map.Range() directly.
d.mu.Lock()
defer d.mu.Unlock()

nodes := make([]roachpb.NodeID, 0, len(d.mu.outbox))
for node := range d.mu.outbox {
nodes = append(nodes, node)
}
var nodes []roachpb.NodeID
d.mu.outbox.Range(func(key, value any) bool {
dispatchMap := value.(*dispatches)
node := key.(roachpb.NodeID)
dispatchMap.mu.Lock()
defer dispatchMap.mu.Unlock()
if len(dispatchMap.mu.items) > 0 {
nodes = append(nodes, node)
}
return true
})
return nodes
}

// PendingDispatchFor is part of the kvflowcontrol.Dispatch interface.
func (d *Dispatch) PendingDispatchFor(
nodeID roachpb.NodeID, maxBytes int64,
) ([]kvflowcontrolpb.AdmittedRaftLogEntries, int) {
d.mu.Lock()
defer d.mu.Unlock()
dispatchMap := d.getDispatchMap(nodeID)

if _, ok := d.mu.outbox[nodeID]; !ok {
dispatchMap.mu.Lock()
defer dispatchMap.mu.Unlock()

if len(dispatchMap.mu.items) == 0 {
return nil, 0
}

var entries []kvflowcontrolpb.AdmittedRaftLogEntries
maxEntries := maxBytes / AdmittedRaftLogEntriesBytes
for key, dispatch := range d.mu.outbox[nodeID] {
for key, dispatch := range dispatchMap.mu.items {
if maxEntries == 0 {
break
}
Expand All @@ -178,18 +197,33 @@ func (d *Dispatch) PendingDispatchFor(
wc := admissionpb.WorkClassFromPri(key.WorkPriority)
d.metrics.PendingDispatches[wc].Dec(1)
maxEntries -= 1
delete(d.mu.outbox[nodeID], key)
delete(dispatchMap.mu.items, key)
}

remainingDispatches := len(d.mu.outbox[nodeID])
remainingDispatches := len(dispatchMap.mu.items)
if remainingDispatches == 0 {
delete(d.mu.outbox, nodeID)
d.metrics.PendingNodes.Dec(1)
}

return entries, remainingDispatches
}

func (d *Dispatch) getDispatchMap(nodeID roachpb.NodeID) *dispatches {
// We hold the d.mu.Lock every time we add to d.mu.outbox to avoid a race
// condition in initializing dispatchMap.mu.items and reading from it after
// returning from this function.
dispatchMap, ok := d.mu.outbox.Load(nodeID)
if !ok {
d.mu.Lock()
defer d.mu.Unlock()
var loaded bool
dispatchMap, loaded = d.mu.outbox.LoadOrStore(nodeID, &dispatches{})
if !loaded {
dispatchMap.(*dispatches).mu.items = make(map[dispatchKey]kvflowcontrolpb.RaftLogPosition)
}
}
return dispatchMap.(*dispatches)
}

// testingMetrics returns the underlying metrics struct for testing purposes.
func (d *Dispatch) testingMetrics() *metrics {
return d.metrics
Expand Down

0 comments on commit a30f9eb

Please sign in to comment.