Skip to content

Commit

Permalink
Add missing RUnlock's to m3aggregator locking code (#3446)
Browse files Browse the repository at this point in the history
These were missed in https://github.com/m3db/m3/pull/3315/files, and were causing hangs for us in prod.

Suspected sequence leading to hangs:

- RLock()
- Missed unlock on shard not owned `if int(shardID) >= len(agg.shards) return errShardNotOwned`
- Lock() from placement change (will never succeed)
- RLock() -- hangs all subsequent times due to pending Lock()

Once threads start hitting the final RLock, the aggregator locks up and stops processing metrics.

RWMutex behavior is consistent with the docs:

```
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released.
```

https://gist.github.com/andrewmains12/fabc257f4552f8e778828a23346d71c3 has a rough reproduction of the above locking sequence.
  • Loading branch information
andrewmains12 authored Apr 26, 2021
1 parent 70aed2f commit e2c6903
Showing 1 changed file with 3 additions and 1 deletion.
4 changes: 3 additions & 1 deletion src/aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,13 @@ func (agg *aggregator) placementTick() {
return
}

agg.RLock()
placement, err := agg.placementManager.Placement()
if err != nil {
m.updateFailures.Inc(1)
continue
}

agg.RLock()
if !agg.shouldProcessPlacementWithLock(placement) {
agg.RUnlock()
continue
Expand Down Expand Up @@ -394,8 +394,10 @@ func (agg *aggregator) shardFor(id id.RawID) (*aggregatorShard, error) {
shardID = agg.shardFn(id, uint32(numShards))
}

// Maintain the rlock as long as we're accessing agg.shards (since it can be mutated otherwise).
agg.RLock()
if int(shardID) >= len(agg.shards) {
agg.RUnlock()
return nil, errShardNotOwned
}
shard := agg.shards[shardID]
Expand Down

0 comments on commit e2c6903

Please sign in to comment.