storage: use RWMutex in NodeLiveness
When looking at mutex profiles for heavily loaded TPC-C clusters, we noticed that a lot
of time was being spent blocked on an RWMutex held by Replica.leaseGoodToGo, which
underneath was reading NodeLiveness state in a read-only way. This PR switches
NodeLiveness to an RWMutex to eliminate that contention. Prior to this change we
observed nearly 60% of lock contention on leaseGoodToGo; after it, we observe
closer to 20%.

Release note: None
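
For readers skimming the diff below: the change is the standard sync.RWMutex pattern, where read-only accessors take the shared read lock (RLock) so they no longer serialize behind one another, while paths that mutate state keep the exclusive lock. What follows is a minimal, self-contained sketch of that pattern, not CockroachDB code; liveness, registry, get, and set are simplified stand-ins for the real NodeLiveness types and methods.

package main

import (
    "fmt"
    "sync"
)

// liveness is a simplified stand-in for a per-node liveness record.
type liveness struct {
    epoch int64
    live  bool
}

// registry mirrors the shape of NodeLiveness.mu in the diff: an RWMutex
// guarding a map of per-node records.
type registry struct {
    mu    sync.RWMutex
    nodes map[int]liveness
}

// get is a read-only accessor: RLock allows many concurrent readers.
func (r *registry) get(id int) (liveness, bool) {
    r.mu.RLock()
    defer r.mu.RUnlock()
    l, ok := r.nodes[id]
    return l, ok
}

// set mutates shared state and therefore takes the exclusive write lock.
func (r *registry) set(id int, l liveness) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.nodes[id] = l
}

func main() {
    r := &registry{nodes: map[int]liveness{}}
    r.set(1, liveness{epoch: 1, live: true})
    l, ok := r.get(1)
    fmt.Println(l, ok)
}

An RWMutex is generally a bit more expensive than a plain Mutex on the write path, but when a hot structure is read far more often than it is written, as the mutex profiles here suggest, letting readers proceed in parallel is the better trade.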
ajwerner committed Apr 2, 2019
1 parent a17d619 commit d4e9c9c
Showing 1 changed file with 30 additions and 21 deletions.
51 changes: 30 additions & 21 deletions pkg/storage/node_liveness.go
@@ -145,7 +145,7 @@ type NodeLiveness struct {
metrics LivenessMetrics

mu struct {
- syncutil.Mutex
+ syncutil.RWMutex
callbacks []IsLiveCallback
nodes map[roachpb.NodeID]storagepb.Liveness
heartbeatCallback HeartbeatCallback
@@ -412,9 +412,9 @@ func (nl *NodeLiveness) StartHeartbeat(
retryOpts := base.DefaultRetryOptions()
retryOpts.Closer = stopper.ShouldQuiesce()

- nl.mu.Lock()
+ nl.mu.RLock()
nl.mu.heartbeatCallback = alive
- nl.mu.Unlock()
+ nl.mu.RUnlock()

stopper.RunWorker(ctx, func(context.Context) {
ambient := nl.ambientCtx
@@ -595,8 +595,8 @@ func (nl *NodeLiveness) heartbeatInternal(
// liveness record successfully, nor received a gossip message containing
// a former liveness update on restart.
func (nl *NodeLiveness) Self() (*storagepb.Liveness, error) {
- nl.mu.Lock()
- defer nl.mu.Unlock()
+ nl.mu.RLock()
+ defer nl.mu.RUnlock()
return nl.getLivenessLocked(nl.gossip.NodeID.Get())
}

@@ -614,9 +614,9 @@ type IsLiveMap map[roachpb.NodeID]IsLiveMapEntry
// each node. This excludes nodes that were removed completely (dead +
// decommissioning).
func (nl *NodeLiveness) GetIsLiveMap() IsLiveMap {
- nl.mu.Lock()
- defer nl.mu.Unlock()
lMap := IsLiveMap{}
+ nl.mu.RLock()
+ defer nl.mu.RUnlock()
now := nl.clock.Now()
maxOffset := nl.clock.MaxOffset()
for nID, l := range nl.mu.nodes {
@@ -637,8 +637,8 @@ func (nl *NodeLiveness) GetIsLiveMap() IsLiveMap {
// every node on the cluster known to gossip. Callers should consider
// calling (statusServer).NodesWithLiveness() instead where possible.
func (nl *NodeLiveness) GetLivenesses() []storagepb.Liveness {
- nl.mu.Lock()
- defer nl.mu.Unlock()
+ nl.mu.RLock()
+ defer nl.mu.RUnlock()
livenesses := make([]storagepb.Liveness, 0, len(nl.mu.nodes))
for _, l := range nl.mu.nodes {
livenesses = append(livenesses, l)
@@ -650,8 +650,8 @@ func (nl *NodeLiveness) GetLivenesses() []storagepb.Liveness {
// ErrNoLivenessRecord is returned in the event that nothing is yet
// known about nodeID via liveness gossip.
func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (*storagepb.Liveness, error) {
- nl.mu.Lock()
- defer nl.mu.Unlock()
+ nl.mu.RLock()
+ defer nl.mu.RUnlock()
return nl.getLivenessLocked(nodeID)
}

@@ -854,21 +854,30 @@ func (nl *NodeLiveness) updateLivenessAttempt(
// maybeUpdate replaces the liveness (if it appears newer) and invokes the
// registered callbacks if the node became live in the process.
func (nl *NodeLiveness) maybeUpdate(new storagepb.Liveness) {
- nl.mu.Lock()
- // Note that this works fine even if `old` is empty.
+ // Optimistically check if liveness needs an update.
+ nl.mu.RLock()
+ // Read the number of callbacks to enable optimistically allocating the
+ // callbacks slice outside of the mutex.
+ numCallbacks := len(nl.mu.callbacks)
old := nl.mu.nodes[new.NodeID]
+ nl.mu.RUnlock()
+ // Note that this works fine even if `old` is empty.
should := shouldReplaceLiveness(old, new)
- var callbacks []IsLiveCallback
- if should {
+ if !should {
+ return
+ }
+ // Allocate the callbacks slice outside of the mutex.
+ callbacks := make([]IsLiveCallback, 0, numCallbacks)
+ nl.mu.Lock()
+ old = nl.mu.nodes[new.NodeID]
+ if should = shouldReplaceLiveness(old, new); should {
nl.mu.nodes[new.NodeID] = new
callbacks = append(callbacks, nl.mu.callbacks...)
}
nl.mu.Unlock()

if !should {
return
}

now, offset := nl.clock.Now(), nl.clock.MaxOffset()
if !old.IsLive(now, offset) && new.IsLive(now, offset) {
for _, fn := range callbacks {
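
The maybeUpdate hunk above is the one change that goes beyond a mechanical Lock-to-RLock swap: it evaluates shouldReplaceLiveness under the read lock, returns early in the common no-op case, and only then acquires the write lock and re-checks, because the map may have changed between releasing the read lock and acquiring the write lock. Below is a generic sketch of that check-then-recheck pattern under assumed names; store, record, newer, and maybeUpdate here are hypothetical stand-ins, not the actual NodeLiveness types.

package main

import "sync"

// record is a hypothetical stand-in for the liveness value being replaced.
type record struct{ seq int }

// newer reports whether candidate should replace current; it plays the role
// of shouldReplaceLiveness in the diff.
func newer(current, candidate record) bool { return candidate.seq > current.seq }

type store struct {
    mu   sync.RWMutex
    vals map[int]record
}

// maybeUpdate applies the optimistic pattern from the hunk: a cheap check
// under the read lock, an early return when no update is needed, and a
// re-check under the write lock because the state may have changed while
// no lock was held.
func (s *store) maybeUpdate(id int, candidate record) bool {
    s.mu.RLock()
    current := s.vals[id]
    s.mu.RUnlock()
    if !newer(current, candidate) {
        return false // common case: nothing to do, write lock never taken
    }

    s.mu.Lock()
    defer s.mu.Unlock()
    current = s.vals[id] // re-read: another goroutine may have won the race
    if !newer(current, candidate) {
        return false
    }
    s.vals[id] = candidate
    return true
}

func main() {
    s := &store{vals: map[int]record{}}
    _ = s.maybeUpdate(1, record{seq: 1})
}

The second check under the exclusive lock is what keeps the fast path safe: the snapshot taken under the read lock can be stale by the time the write lock is held, so the update decision has to be repeated there.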
@@ -932,8 +941,8 @@ func (nl *NodeLiveness) numLiveNodes() int64 {
now := nl.clock.Now()
maxOffset := nl.clock.MaxOffset()

- nl.mu.Lock()
- defer nl.mu.Unlock()
+ nl.mu.RLock()
+ defer nl.mu.RUnlock()

self, err := nl.getLivenessLocked(selfID)
if err == ErrNoLivenessRecord {
@@ -977,8 +986,8 @@ func (nl *NodeLiveness) AsLiveClock() closedts.LiveClockFn {
// GetNodeCount returns a count of the number of nodes in the cluster,
// including dead nodes, but excluding decommissioning or decommissioned nodes.
func (nl *NodeLiveness) GetNodeCount() int {
- nl.mu.Lock()
- defer nl.mu.Unlock()
+ nl.mu.RLock()
+ defer nl.mu.RUnlock()
var count int
for _, l := range nl.mu.nodes {
if !l.Decommissioning {
