diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go
index 6b476cc73acb..1af808641b92 100644
--- a/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go
+++ b/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch/kvflowdispatch.go
@@ -12,6 +12,7 @@ package kvflowdispatch
 
 import (
 	"context"
+	"sync"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
@@ -32,13 +33,9 @@ const AdmittedRaftLogEntriesBytes = 50
 // entries to specific nodes, and (ii) to read pending dispatches.
 type Dispatch struct {
 	mu struct {
-		// TODO(irfansharif,aaditya): On kv0/enc=false/nodes=3/cpu=96 this mutex
-		// is responsible for ~3.7% of the mutex contention. Look to address it
-		// as part of #104154. Perhaps shard this mutex by node ID? Or use a
-		// sync.Map instead?
 		syncutil.Mutex
 		// outbox maintains pending dispatches on a per-node basis.
-		outbox map[roachpb.NodeID]dispatches
+		outbox sync.Map // roachpb.NodeID -> *dispatches
 	}
 	metrics *metrics
 	// handles is used to dispatch tokens locally. Remote token dispatches are
@@ -57,7 +54,12 @@ type dispatchKey struct {
 	admissionpb.WorkPriority
 }
 
-type dispatches map[dispatchKey]kvflowcontrolpb.RaftLogPosition
+type dispatches struct {
+	mu struct {
+		syncutil.Mutex
+		items map[dispatchKey]kvflowcontrolpb.RaftLogPosition
+	}
+}
 
 var _ kvflowcontrol.Dispatch = &Dispatch{}
 
@@ -69,7 +71,7 @@ func New(
 		handles: handles,
 		nodeID:  nodeID,
 	}
-	d.mu.outbox = make(map[roachpb.NodeID]dispatches)
+	d.mu.outbox = sync.Map{}
 	d.metrics = newMetrics()
 	registry.AddMetricStruct(d.metrics)
 	return d
@@ -101,50 +103,65 @@ func (d *Dispatch) Dispatch(
 		d.metrics.LocalDispatch[wc].Inc(1)
 		return
 	}
-	d.metrics.RemoteDispatch[wc].Inc(1)
 
-	d.mu.Lock()
-	defer d.mu.Unlock()
+	d.metrics.RemoteDispatch[wc].Inc(1)
+	dispatchMap := d.getDispatchMap(nodeID)
+	var existing kvflowcontrolpb.RaftLogPosition
+	var found bool
+	func() {
+		dispatchMap.mu.Lock()
+		defer dispatchMap.mu.Unlock()
 
-	if _, ok := d.mu.outbox[nodeID]; !ok {
-		d.mu.outbox[nodeID] = dispatches{}
-		d.metrics.PendingNodes.Inc(1)
-	}
+		if len(dispatchMap.mu.items) == 0 {
+			d.metrics.PendingNodes.Inc(1)
+		}
 
-	dk := dispatchKey{
-		entries.RangeID,
-		entries.StoreID,
-		pri,
-	}
+		dk := dispatchKey{
+			entries.RangeID,
+			entries.StoreID,
+			pri,
+		}
 
-	existing, found := d.mu.outbox[nodeID][dk]
-	if !found || existing.Less(entries.UpToRaftLogPosition) {
-		d.mu.outbox[nodeID][dk] = entries.UpToRaftLogPosition
+		existing, found = dispatchMap.mu.items[dk]
 
-		if !found {
-			d.metrics.PendingDispatches[wc].Inc(1)
-		} else {
+		if !found || existing.Less(entries.UpToRaftLogPosition) {
+			dispatchMap.mu.items[dk] = entries.UpToRaftLogPosition
+		}
+	}()
+
+	if found {
+		if existing.Less(entries.UpToRaftLogPosition) {
 			// We're replacing an existing dispatch with one with a higher log
 			// position. Increment the coalesced metric.
 			d.metrics.CoalescedDispatches[wc].Inc(1)
-		}
-	}
-	if found && !existing.Less(entries.UpToRaftLogPosition) {
-		// We're dropping a dispatch given we already have a pending one with a
-		// higher log position. Increment the coalesced metric.
-		d.metrics.CoalescedDispatches[wc].Inc(1)
+		} else {
+			// We're dropping a dispatch given we already have a pending one with a
+			// higher log position. Increment the coalesced metric.
+			d.metrics.CoalescedDispatches[wc].Inc(1)
+		}
+	} else {
+		d.metrics.PendingDispatches[wc].Inc(1)
 	}
 }
 
 // PendingDispatch is part of the kvflowcontrol.Dispatch interface.
 func (d *Dispatch) PendingDispatch() []roachpb.NodeID {
+	// NB: We hold the Dispatch mutex here to guard against new buckets being
+	// added while we iterate; sync.Map.Range() alone doesn't guarantee that.
 	d.mu.Lock()
 	defer d.mu.Unlock()
 
-	nodes := make([]roachpb.NodeID, 0, len(d.mu.outbox))
-	for node := range d.mu.outbox {
-		nodes = append(nodes, node)
-	}
+	var nodes []roachpb.NodeID
+	d.mu.outbox.Range(func(key, value any) bool {
+		dispatchMap := value.(*dispatches)
+		node := key.(roachpb.NodeID)
+		dispatchMap.mu.Lock()
+		defer dispatchMap.mu.Unlock()
+		if len(dispatchMap.mu.items) > 0 {
+			nodes = append(nodes, node)
+		}
+		return true
+	})
 	return nodes
 }
 
@@ -152,16 +169,18 @@ func (d *Dispatch) PendingDispatchFor(
 	nodeID roachpb.NodeID, maxBytes int64,
 ) ([]kvflowcontrolpb.AdmittedRaftLogEntries, int) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
+	dispatchMap := d.getDispatchMap(nodeID)
 
-	if _, ok := d.mu.outbox[nodeID]; !ok {
+	dispatchMap.mu.Lock()
+	defer dispatchMap.mu.Unlock()
+
+	if len(dispatchMap.mu.items) == 0 {
 		return nil, 0
 	}
 
 	var entries []kvflowcontrolpb.AdmittedRaftLogEntries
 	maxEntries := maxBytes / AdmittedRaftLogEntriesBytes
-	for key, dispatch := range d.mu.outbox[nodeID] {
+	for key, dispatch := range dispatchMap.mu.items {
 		if maxEntries == 0 {
 			break
 		}
@@ -178,18 +197,33 @@ func (d *Dispatch) PendingDispatchFor(
 		wc := admissionpb.WorkClassFromPri(key.WorkPriority)
 		d.metrics.PendingDispatches[wc].Dec(1)
 		maxEntries -= 1
-		delete(d.mu.outbox[nodeID], key)
+		delete(dispatchMap.mu.items, key)
 	}
 
-	remainingDispatches := len(d.mu.outbox[nodeID])
+	remainingDispatches := len(dispatchMap.mu.items)
 	if remainingDispatches == 0 {
-		delete(d.mu.outbox, nodeID)
 		d.metrics.PendingNodes.Dec(1)
 	}
-
 	return entries, remainingDispatches
 }
 
+func (d *Dispatch) getDispatchMap(nodeID roachpb.NodeID) *dispatches {
+	// We hold d.mu whenever we add to d.mu.outbox, to avoid a race between
+	// initializing dispatchMap.mu.items and a caller reading from it after
+	// this function returns.
+	dispatchMap, ok := d.mu.outbox.Load(nodeID)
+	if !ok {
+		d.mu.Lock()
+		defer d.mu.Unlock()
+		var loaded bool
+		dispatchMap, loaded = d.mu.outbox.LoadOrStore(nodeID, &dispatches{})
+		if !loaded {
+			dispatchMap.(*dispatches).mu.items = make(map[dispatchKey]kvflowcontrolpb.RaftLogPosition)
+		}
+	}
+	return dispatchMap.(*dispatches)
+}
+
 // testingMetrics returns the underlying metrics struct for testing purposes.
 func (d *Dispatch) testingMetrics() *metrics {
 	return d.metrics
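
For reference, a minimal, self-contained sketch of the lazily initialized per-node bucket pattern that getDispatchMap adopts above: a sync.Map keyed by node, each value carrying its own mutex-protected map. This sketch is illustrative only and not part of the patch; the names (bucket, outbox, getBucket) are made up, and unlike the patch it initializes the inner map in the literal handed to LoadOrStore rather than after the store.

```go
package main

import (
	"fmt"
	"sync"
)

// bucket is a per-node shard: its own mutex guards its own small map, so
// writers targeting different nodes never contend on a single global lock.
type bucket struct {
	mu    sync.Mutex
	items map[string]int
}

type outbox struct {
	mu      sync.Mutex // serializes creation of new buckets
	buckets sync.Map   // node ID (int) -> *bucket
}

// getBucket returns the bucket for id, creating it on first use. The common
// case is a plain sync.Map load; creation takes the outer mutex, mirroring
// how the patch serializes bucket creation under d.mu.
func (o *outbox) getBucket(id int) *bucket {
	if b, ok := o.buckets.Load(id); ok {
		return b.(*bucket)
	}
	o.mu.Lock()
	defer o.mu.Unlock()
	b, _ := o.buckets.LoadOrStore(id, &bucket{items: make(map[string]int)})
	return b.(*bucket)
}

func main() {
	var o outbox
	b := o.getBucket(1)
	b.mu.Lock()
	b.items["r7/s3"] = 42 // e.g. a range/store key mapped to a log position
	b.mu.Unlock()
	fmt.Println(o.getBucket(1).items["r7/s3"]) // prints 42
}
```

The effect, as in the patch, is that per-node operations (Dispatch, PendingDispatchFor) contend only on their own bucket's mutex, while the previously global mutex is reduced to bucket creation and the PendingDispatch iteration.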