Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: use RWMutex in NodeLiveness #36316

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions pkg/storage/node_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type NodeLiveness struct {
metrics LivenessMetrics

mu struct {
syncutil.Mutex
syncutil.RWMutex
callbacks []IsLiveCallback
nodes map[roachpb.NodeID]storagepb.Liveness
heartbeatCallback HeartbeatCallback
Expand Down Expand Up @@ -412,9 +412,9 @@ func (nl *NodeLiveness) StartHeartbeat(
retryOpts := base.DefaultRetryOptions()
retryOpts.Closer = stopper.ShouldQuiesce()

nl.mu.Lock()
nl.mu.RLock()
nl.mu.heartbeatCallback = alive
nl.mu.Unlock()
nl.mu.RUnlock()

stopper.RunWorker(ctx, func(context.Context) {
ambient := nl.ambientCtx
Expand Down Expand Up @@ -595,8 +595,8 @@ func (nl *NodeLiveness) heartbeatInternal(
// liveness record successfully, nor received a gossip message containing
// a former liveness update on restart.
func (nl *NodeLiveness) Self() (*storagepb.Liveness, error) {
nl.mu.Lock()
defer nl.mu.Unlock()
nl.mu.RLock()
defer nl.mu.RUnlock()
return nl.getLivenessLocked(nl.gossip.NodeID.Get())
}

Expand All @@ -614,9 +614,9 @@ type IsLiveMap map[roachpb.NodeID]IsLiveMapEntry
// each node. This excludes nodes that were removed completely (dead +
// decommissioning).
func (nl *NodeLiveness) GetIsLiveMap() IsLiveMap {
nl.mu.Lock()
defer nl.mu.Unlock()
lMap := IsLiveMap{}
nl.mu.RLock()
defer nl.mu.RUnlock()
now := nl.clock.Now()
maxOffset := nl.clock.MaxOffset()
for nID, l := range nl.mu.nodes {
Expand All @@ -637,8 +637,8 @@ func (nl *NodeLiveness) GetIsLiveMap() IsLiveMap {
// every node on the cluster known to gossip. Callers should consider
// calling (statusServer).NodesWithLiveness() instead where possible.
func (nl *NodeLiveness) GetLivenesses() []storagepb.Liveness {
nl.mu.Lock()
defer nl.mu.Unlock()
nl.mu.RLock()
defer nl.mu.RUnlock()
livenesses := make([]storagepb.Liveness, 0, len(nl.mu.nodes))
for _, l := range nl.mu.nodes {
livenesses = append(livenesses, l)
Expand All @@ -650,8 +650,8 @@ func (nl *NodeLiveness) GetLivenesses() []storagepb.Liveness {
// ErrNoLivenessRecord is returned in the event that nothing is yet
// known about nodeID via liveness gossip.
func (nl *NodeLiveness) GetLiveness(nodeID roachpb.NodeID) (*storagepb.Liveness, error) {
nl.mu.Lock()
defer nl.mu.Unlock()
nl.mu.RLock()
defer nl.mu.RUnlock()
return nl.getLivenessLocked(nodeID)
}

Expand Down Expand Up @@ -932,8 +932,8 @@ func (nl *NodeLiveness) numLiveNodes() int64 {
now := nl.clock.Now()
maxOffset := nl.clock.MaxOffset()

nl.mu.Lock()
defer nl.mu.Unlock()
nl.mu.RLock()
defer nl.mu.RUnlock()

self, err := nl.getLivenessLocked(selfID)
if err == ErrNoLivenessRecord {
Expand Down Expand Up @@ -977,8 +977,8 @@ func (nl *NodeLiveness) AsLiveClock() closedts.LiveClockFn {
// GetNodeCount returns a count of the number of nodes in the cluster,
// including dead nodes, but excluding decommissioning or decommissioned nodes.
func (nl *NodeLiveness) GetNodeCount() int {
nl.mu.Lock()
defer nl.mu.Unlock()
nl.mu.RLock()
defer nl.mu.RUnlock()
var count int
for _, l := range nl.mu.nodes {
if !l.Decommissioning {
Expand Down