From 117d66c308fc8d3fd9abf9d4c08442dda60822c7 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Sun, 11 Jun 2023 14:09:05 -0700 Subject: [PATCH] Fixed metrics gauge callback deadlock --- common/metrics/counter.go | 2 +- server/shards_director.go | 33 ++++++++++++++++++++------------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/common/metrics/counter.go b/common/metrics/counter.go index 06aa0c9a..ec545428 100644 --- a/common/metrics/counter.go +++ b/common/metrics/counter.go @@ -78,7 +78,7 @@ func (c *upDownCounter) Sub(diff int) { c.Add(-diff) } -func NewUpDownCounter(name string, description string, unit Unit, labels map[string]any) Counter { +func NewUpDownCounter(name string, description string, unit Unit, labels map[string]any) UpDownCounter { sc, err := meter.Int64UpDownCounter(name, instrument.WithUnit(string(unit)), instrument.WithDescription(description)) diff --git a/server/shards_director.go b/server/shards_director.go index cab611e6..02ad2075 100644 --- a/server/shards_director.go +++ b/server/shards_director.go @@ -52,6 +52,9 @@ type shardsDirector struct { replicationRpcProvider ReplicationRpcProvider closed bool log zerolog.Logger + + leadersCounter metrics.UpDownCounter + followersCounter metrics.UpDownCounter } func NewShardsDirector(config Config, walFactory wal.WalFactory, kvFactory kv.KVFactory, provider ReplicationRpcProvider) ShardsDirector { @@ -65,18 +68,12 @@ func NewShardsDirector(config Config, walFactory wal.WalFactory, kvFactory kv.KV log: log.With(). Str("component", "shards-director"). Logger(), - } - metrics.NewGauge("oxia_server_leaders_count", "The number of leader controllers in a server", "count", nil, func() int64 { - sd.RLock() - defer sd.RUnlock() - return int64(len(sd.leaders)) - }) - metrics.NewGauge("oxia_server_followers_count", "The number of followers controllers in a server", "count", nil, func() int64 { - sd.RLock() - defer sd.RUnlock() - return int64(len(sd.followers)) - }) + leadersCounter: metrics.NewUpDownCounter("oxia_server_leaders_count", + "The number of leader controllers in a server", "count", map[string]any{}), + followersCounter: metrics.NewUpDownCounter("oxia_server_followers_count", + "The number of follower controllers in a server", "count", map[string]any{}), + } return sd } @@ -138,7 +135,10 @@ func (s *shardsDirector) GetOrCreateLeader(namespace string, shardId int64) (Lea return nil, err } - delete(s.followers, shardId) + if _, ok := s.followers[shardId]; ok { + s.followersCounter.Dec() + delete(s.followers, shardId) + } } // Create new leader controller @@ -146,6 +146,7 @@ func (s *shardsDirector) GetOrCreateLeader(namespace string, shardId int64) (Lea return nil, err } else { s.leaders[shardId] = lc + s.leadersCounter.Inc() return lc, nil } } @@ -169,7 +170,10 @@ func (s *shardsDirector) GetOrCreateFollower(namespace string, shardId int64) (F return nil, err } - delete(s.leaders, shardId) + if _, ok := s.leaders[shardId]; ok { + s.leadersCounter.Dec() + delete(s.leaders, shardId) + } } // Create new follower controller @@ -177,6 +181,7 @@ func (s *shardsDirector) GetOrCreateFollower(namespace string, shardId int64) (F return nil, err } else { s.followers[shardId] = fc + s.followersCounter.Inc() return fc, nil } } @@ -192,6 +197,7 @@ func (s *shardsDirector) DeleteShard(req *proto.DeleteShardRequest) (*proto.Dele } delete(s.leaders, req.ShardId) + s.leadersCounter.Dec() return resp, nil } @@ -202,6 +208,7 @@ func (s *shardsDirector) DeleteShard(req *proto.DeleteShardRequest) (*proto.Dele } delete(s.followers, req.ShardId) + s.followersCounter.Dec() return resp, nil } else if fc, err := NewFollowerController(s.config, req.Namespace, req.ShardId, s.walFactory, s.kvFactory); err != nil { return nil, err