From e2c6903cc0a027a129674afbf819ef294feceb07 Mon Sep 17 00:00:00 2001 From: Andrew Mains Date: Mon, 26 Apr 2021 10:31:25 -0600 Subject: [PATCH] Add missing RUnlock's to m3aggregator locking code (#3446) These were missed in https://github.com/m3db/m3/pull/3315/files, and were causing hangs for us in prod. Suspected sequence leading to hangs: - RLock() - Missed unlock on shard not owned `if int(shardID) >= len(agg.shards) return errShardNotOwned` - Lock() from placement change (will never succeed) - RLock() -- hangs all subsequent times due to pending Lock() Once threads start hitting the final RLock, the aggregator locks up and stops processing metrics. RWMutex behavior is consistent with the docs: ``` // If a goroutine holds a RWMutex for reading and another goroutine might // call Lock, no goroutine should expect to be able to acquire a read lock // until the initial read lock is released. ``` https://gist.github.com/andrewmains12/fabc257f4552f8e778828a23346d71c3 has a rough reproduction of the above locking sequence. --- src/aggregator/aggregator/aggregator.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go index 1f14115fdf..4faf304255 100644 --- a/src/aggregator/aggregator/aggregator.go +++ b/src/aggregator/aggregator/aggregator.go @@ -198,13 +198,13 @@ func (agg *aggregator) placementTick() { return } - agg.RLock() placement, err := agg.placementManager.Placement() if err != nil { m.updateFailures.Inc(1) continue } + agg.RLock() if !agg.shouldProcessPlacementWithLock(placement) { agg.RUnlock() continue @@ -394,8 +394,10 @@ func (agg *aggregator) shardFor(id id.RawID) (*aggregatorShard, error) { shardID = agg.shardFn(id, uint32(numShards)) } + // Maintain the rlock as long as we're accessing agg.shards (since it can be mutated otherwise). agg.RLock() if int(shardID) >= len(agg.shards) { + agg.RUnlock() return nil, errShardNotOwned } shard := agg.shards[shardID]