storage: use RWMutex in NodeLiveness
When looking at mutex profiles for heavily loaded TPC-C clusters we noticed that
a lot of time was being spent blocked on an RWMutex held by Replica.leaseGoodToGo,
which underneath reads NodeLiveness state in a read-only way. This PR changes the
NodeLiveness mutex to an RWMutex so that those read-only accesses no longer
contend with one another. Prior to this change we observed nearly 60% of lock
contention on leaseGoodToGo; after it, we observe closer to 20%.

Release note: None
ajwerner committed Apr 2, 2019
1 parent a17d619 commit a134caa
Showing 1 changed file with 15 additions and 15 deletions.
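The mutex profiles referred to in the commit message are Go's built-in lock
contention profiles. As a rough, self-contained sketch of how such a profile can
be collected (this is not how CockroachDB wires up its own profiling; the port
and sampling rate below are arbitrary), one can enable mutex profiling and expose
the standard pprof endpoints:

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* handlers, including /debug/pprof/mutex
	"runtime"
)

func main() {
	// Record roughly one out of every five mutex contention events in the
	// mutex profile; a rate of 0 disables collection entirely.
	runtime.SetMutexProfileFraction(5)

	// While the server is running, the profile can be fetched with, e.g.:
	//   go tool pprof http://localhost:6060/debug/pprof/mutex
	log.Fatal(http.ListenAndServe("localhost:6060", nil))
}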
pkg/storage/node_liveness.go: 30 changes (15 additions & 15 deletions)
@@ -145,7 +145,7 @@ type NodeLiveness struct {
 	metrics LivenessMetrics
 
 	mu struct {
-		syncutil.Mutex
+		syncutil.RWMutex
 		callbacks         []IsLiveCallback
 		nodes             map[roachpb.NodeID]storagepb.Liveness
 		heartbeatCallback HeartbeatCallback
@@ -412,9 +412,9 @@ func (nl *NodeLiveness) StartHeartbeat(
 	retryOpts := base.DefaultRetryOptions()
 	retryOpts.Closer = stopper.ShouldQuiesce()
 
-	nl.mu.Lock()
+	nl.mu.RLock()
 	nl.mu.heartbeatCallback = alive
-	nl.mu.Unlock()
+	nl.mu.RUnlock()
 
 	stopper.RunWorker(ctx, func(context.Context) {
 		ambient := nl.ambientCtx
@@ -595,8 +595,8 @@ func (nl *NodeLiveness) heartbeatInternal(
 // liveness record successfully, nor received a gossip message containing
 // a former liveness update on restart.
 func (nl *NodeLiveness) Self() (*storagepb.Liveness, error) {
-	nl.mu.Lock()
-	defer nl.mu.Unlock()
+	nl.mu.RLock()
+	defer nl.mu.RUnlock()
 	return nl.getLivenessLocked(nl.gossip.NodeID.Get())
 }
 
@@ -614,9 +614,9 @@ type IsLiveMap map[roachpb.NodeID]IsLiveMapEntry
 // each node. This excludes nodes that were removed completely (dead +
 // decommissioning).
 func (nl *NodeLiveness) GetIsLiveMap() IsLiveMap {
-	nl.mu.Lock()
-	defer nl.mu.Unlock()
 	lMap := IsLiveMap{}
+	nl.mu.RLock()
+	defer nl.mu.RUnlock()
 	now := nl.clock.Now()
 	maxOffset := nl.clock.MaxOffset()
 	for nID, l := range nl.mu.nodes {
@@ -637,8 +637,8 @@ func (nl *NodeLiveness) GetIsLiveMap() IsLiveMap {
 // every node on the cluster known to gossip. Callers should consider
 // calling (statusServer).NodesWithLiveness() instead where possible.
 func (nl *NodeLiveness) GetLivenesses() []storagepb.Liveness {
-	nl.mu.Lock()
-	defer nl.mu.Unlock()
+	nl.mu.RLock()
+	defer nl.mu.RUnlock()
 	livenesses := make([]storagepb.Liveness, 0, len(nl.mu.nodes))
 	for _, l := range nl.mu.nodes {
 		livenesses = append(livenesses, l)
@@ -650,8 +650,8 @@ func (nl *NodeLiveness) GetLivenesses() []storagepb.Liveness {
 // ErrNoLivenessRecord is returned in the event that nothing is yet
 // known about nodeID via liveness gossip.
 func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (*storagepb.Liveness, error) {
-	nl.mu.Lock()
-	defer nl.mu.Unlock()
+	nl.mu.RLock()
+	defer nl.mu.RUnlock()
 	return nl.getLivenessLocked(nodeID)
 }
 
@@ -932,8 +932,8 @@ func (nl *NodeLiveness) numLiveNodes() int64 {
 	now := nl.clock.Now()
 	maxOffset := nl.clock.MaxOffset()
 
-	nl.mu.Lock()
-	defer nl.mu.Unlock()
+	nl.mu.RLock()
+	defer nl.mu.RUnlock()
 
 	self, err := nl.getLivenessLocked(selfID)
 	if err == ErrNoLivenessRecord {
@@ -977,8 +977,8 @@ func (nl *NodeLiveness) AsLiveClock() closedts.LiveClockFn {
 // GetNodeCount returns a count of the number of nodes in the cluster,
 // including dead nodes, but excluding decommissioning or decommissioned nodes.
 func (nl *NodeLiveness) GetNodeCount() int {
-	nl.mu.Lock()
-	defer nl.mu.Unlock()
+	nl.mu.RLock()
+	defer nl.mu.RUnlock()
 	var count int
 	for _, l := range nl.mu.nodes {
 		if !l.Decommissioning {
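Every hunk above applies the same transformation: accessors that only read
nl.mu state take the shared read lock, while code that mutates that state keeps
the exclusive lock. A minimal standalone sketch of the pattern (with hypothetical
names; this is not the NodeLiveness code itself):

package main

import (
	"fmt"
	"sync"
)

// livenessCache is a stand-in for read-mostly state guarded by an RWMutex.
type livenessCache struct {
	mu    sync.RWMutex
	nodes map[int]bool // node ID -> is live
}

// get is a read-only accessor: any number of goroutines may hold the read
// lock at once, so readers do not serialize behind one another.
func (c *livenessCache) get(id int) (live, ok bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	live, ok = c.nodes[id]
	return live, ok
}

// set mutates shared state and therefore still takes the exclusive lock.
func (c *livenessCache) set(id int, live bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nodes[id] = live
}

func main() {
	c := &livenessCache{nodes: map[int]bool{1: true}}
	c.set(2, false)

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			live, ok := c.get(1) // readers only block writers, not each other
			fmt.Println(live, ok)
		}()
	}
	wg.Wait()
}

Because sync.RWMutex admits any number of concurrent readers, hot read-only
paths such as the one under leaseGoodToGo stop queuing behind each other; only
writers exclude all other lock holders.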
