From 95bab49ecad188fa84e16a4cf4d3ec7127d392b5 Mon Sep 17 00:00:00 2001
From: Matteo Merli
Date: Mon, 12 Jun 2023 08:39:19 -0700
Subject: [PATCH] Fixed metrics gauge callback deadlock (#350)

When a leader election and a Prometheus metrics collection happen at the same time, there is a potential deadlock between these two goroutines:

#### Shard director gauge callback

This goroutine holds the metrics registry mutex and then acquires the read lock on the shards director:

```
1 @ 0x4481f6 0x4594fe 0x4594d5 0x475965 0x1a67dfb 0x1a67dd7 0xa56d5e 0xa35aa6 0xa38667 0xa3198e 0xa51a82 0x8a3253 0x479a01
# labels: {"oxia":"metrics"}
#	0x475964	sync.runtime_SemacquireMutex+0x24	/usr/local/go/src/runtime/sema.go:77
#	0x1a67dfa	sync.(*RWMutex).RLock+0x5a	/usr/local/go/src/sync/rwmutex.go:71
#	0x1a67dd6	oxia/server.NewShardsDirector.func1+0x36	/src/oxia/server/shards_director.go:71
#	0xa56d5d	oxia/common/metrics.NewGauge.func1+0x5d	/src/oxia/common/metrics/gauge.go:58
#	0xa35aa5	go.opentelemetry.io/otel/sdk/metric.(*meter).RegisterCallback.func1+0x85	/go/pkg/mod/go.opentelemetry.io/otel/sdk/metric@v0.38.0-rc.2.0.20230420231439-002444a2e743/meter.go:263
#	0xa38666	go.opentelemetry.io/otel/sdk/metric.(*pipeline).produce+0x326	/go/pkg/mod/go.opentelemetry.io/otel/sdk/metric@v0.38.0-rc.2.0.20230420231439-002444a2e743/pipeline.go:144
#	0xa3198d	go.opentelemetry.io/otel/sdk/metric.(*manualReader).Collect+0xed	/go/pkg/mod/go.opentelemetry.io/otel/sdk/metric@v0.38.0-rc.2.0.20230420231439-002444a2e743/manual_reader.go:139
#	0xa51a81	go.opentelemetry.io/otel/exporters/prometheus.(*collector).Collect+0x81	/go/pkg/mod/go.opentelemetry.io/otel/exporters/prometheus@v0.38.0-rc.2.0.20230420231439-002444a2e743/exporter.go:119
#	0x8a3252	github.com/prometheus/client_golang/prometheus.(*Registry).Gather.func1+0xf2	/go/pkg/mod/github.com/prometheus/client_golang@v1.15.0/prometheus/registry.go:457
```

#### WAL closing

This goroutine holds the shards director mutex and tries to acquire the metrics registry mutex in order to unregister a gauge callback:
```
1 @ 0x4481f6 0x4594fe 0x4594d5 0x475965 0x4906a5 0xa3820d 0xa381e1 0xa4ef0f 0xa568c2 0x1846545 0x184642b 0x1a513d9 0x1a512d9 0x1a68602 0x1a58ece 0x99d498 0x98bc27 0x99d358 0x974dd0 0x979e8f 0x9726b8 0x479a01
# labels: {"bind":"[::]:6649", "oxia":"internal"}
#	0x475964	sync.runtime_SemacquireMutex+0x24	/usr/local/go/src/runtime/sema.go:77
#	0x4906a4	sync.(*Mutex).lockSlow+0x164	/usr/local/go/src/sync/mutex.go:171
#	0xa3820c	sync.(*Mutex).Lock+0x4c	/usr/local/go/src/sync/mutex.go:90
#	0xa381e0	go.opentelemetry.io/otel/sdk/metric.(*pipeline).addMultiCallback.func1+0x20	/go/pkg/mod/go.opentelemetry.io/otel/sdk/metric@v0.38.0-rc.2.0.20230420231439-002444a2e743/pipeline.go:116
#	0xa4ef0e	go.opentelemetry.io/otel/sdk/metric.unregisterFuncs.Unregister+0x6e	/go/pkg/mod/go.opentelemetry.io/otel/sdk/metric@v0.38.0-rc.2.0.20230420231439-002444a2e743/pipeline.go:521
#	0xa568c1	oxia/common/metrics.(*gauge).Unregister+0x21	/src/oxia/common/metrics/gauge.go:36
#	0x1846544	oxia/server/wal.(*wal).close+0x44	/src/oxia/server/wal/wal_impl.go:215
#	0x184642a	oxia/server/wal.(*wal).Close+0x6a	/src/oxia/server/wal/wal_impl.go:210
#	0x1a513d8	oxia/server.(*followerController).close+0x38	/src/oxia/server/follower_controller.go:193
#	0x1a512d8	oxia/server.(*followerController).Close+0xd8	/src/oxia/server/follower_controller.go:186
#	0x1a68601	oxia/server.(*shardsDirector).GetOrCreateLeader+0x121	/src/oxia/server/shards_director.go:137
#	0x1a58ecd	oxia/server.(*internalRpcServer).BecomeLeader+0x34d	/src/oxia/server/internal_rpc_server.go:140
#	0x99d497	oxia/proto._OxiaCoordination_BecomeLeader_Handler.func1+0x77	/src/oxia/proto/replication_grpc.pb.go:225
#	0x98bc26	github.com/grpc-ecosystem/go-grpc-prometheus.(*ServerMetrics).UnaryServerInterceptor.func1+0x86	/go/pkg/mod/github.com/grpc-ecosystem/go-grpc-prometheus@v1.2.0/server_metrics.go:107
#	0x99d357	oxia/proto._OxiaCoordination_BecomeLeader_Handler+0x137	/src/oxia/proto/replication_grpc.pb.go:227
#	0x974dcf	google.golang.org/grpc.(*Server).processUnaryRPC+0xdef	/go/pkg/mod/google.golang.org/grpc@v1.54.0/server.go:1345
#	0x979e8e	google.golang.org/grpc.(*Server).handleStream+0xa2e	/go/pkg/mod/google.golang.org/grpc@v1.54.0/server.go:1722
#	0x9726b7	google.golang.org/grpc.(*Server).serveStreams.func1.2+0x97	/go/pkg/mod/google.golang.org/grpc@v1.54.0/server.go:966
```

### Modifications

Converted the shards director gauges into up/down counters, so that we don't have to acquire any mutex at metrics collection time.
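To make the lock-order inversion concrete, here is a minimal sketch of the two lock orders. The `registry` and `director` types are illustrative stand-ins, not the actual Oxia or OTel types: in the real code the "registry" mutex lives inside the OTel SDK pipeline and the director is `shardsDirector`.

```go
package main

import "sync"

// registry stands in for the OTel pipeline: collection holds its mutex
// while invoking every registered gauge callback.
type registry struct {
	mu        sync.Mutex
	callbacks map[int]func() int64
}

// director stands in for shardsDirector, guarded by an RWMutex.
type director struct {
	mu      sync.RWMutex
	leaders map[int64]struct{}
}

// Goroutine 1 (Prometheus scrape): registry.mu -> director.mu.RLock().
func (r *registry) collect() {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, cb := range r.callbacks {
		cb() // the gauge callback takes d.mu.RLock() while r.mu is held
	}
}

// Goroutine 2 (BecomeLeader): director.mu.Lock() -> registry.mu.
// Closing the previous follower's WAL unregisters its gauge, which
// needs registry.mu -- the opposite order, hence the deadlock.
func (d *director) becomeLeader(r *registry, gaugeID int) {
	d.mu.Lock()
	defer d.mu.Unlock()

	r.mu.Lock() // blocks forever if collect() holds r.mu and waits on d.mu
	delete(r.callbacks, gaugeID)
	r.mu.Unlock()
}

func main() {
	d := &director{leaders: map[int64]struct{}{}}
	r := &registry{callbacks: map[int]func() int64{}}
	r.callbacks[1] = func() int64 {
		d.mu.RLock()
		defer d.mu.RUnlock()
		return int64(len(d.leaders))
	}
	go r.collect()
	d.becomeLeader(r, 1) // may deadlock against the concurrent collect()
}
```

The up/down counter sidesteps this because `Inc()`/`Dec()` run on the mutation path, where the director lock is already held, and the scrape path only reads the pre-aggregated sum, so the two mutexes are never taken in opposite orders.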
---
 common/metrics/counter.go |  2 +-
 server/shards_director.go | 33 ++++++++++++++++++++-------------
 2 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/common/metrics/counter.go b/common/metrics/counter.go
index 06aa0c9a..ec545428 100644
--- a/common/metrics/counter.go
+++ b/common/metrics/counter.go
@@ -78,7 +78,7 @@ func (c *upDownCounter) Sub(diff int) {
 	c.Add(-diff)
 }
 
-func NewUpDownCounter(name string, description string, unit Unit, labels map[string]any) Counter {
+func NewUpDownCounter(name string, description string, unit Unit, labels map[string]any) UpDownCounter {
 	sc, err := meter.Int64UpDownCounter(name,
 		instrument.WithUnit(string(unit)),
 		instrument.WithDescription(description))
diff --git a/server/shards_director.go b/server/shards_director.go
index cab611e6..02ad2075 100644
--- a/server/shards_director.go
+++ b/server/shards_director.go
@@ -52,6 +52,9 @@ type shardsDirector struct {
 	replicationRpcProvider ReplicationRpcProvider
 	closed                 bool
 	log                    zerolog.Logger
+
+	leadersCounter   metrics.UpDownCounter
+	followersCounter metrics.UpDownCounter
 }
 
 func NewShardsDirector(config Config, walFactory wal.WalFactory, kvFactory kv.KVFactory, provider ReplicationRpcProvider) ShardsDirector {
@@ -65,18 +68,12 @@ func NewShardsDirector(config Config, walFactory wal.WalFactory, kvFactory kv.KV
 		log: log.With().
 			Str("component", "shards-director").
 			Logger(),
-	}
-	metrics.NewGauge("oxia_server_leaders_count", "The number of leader controllers in a server", "count", nil, func() int64 {
-		sd.RLock()
-		defer sd.RUnlock()
-		return int64(len(sd.leaders))
-	})
-	metrics.NewGauge("oxia_server_followers_count", "The number of followers controllers in a server", "count", nil, func() int64 {
-		sd.RLock()
-		defer sd.RUnlock()
-		return int64(len(sd.followers))
-	})
+		leadersCounter: metrics.NewUpDownCounter("oxia_server_leaders_count",
+			"The number of leader controllers in a server", "count", map[string]any{}),
+		followersCounter: metrics.NewUpDownCounter("oxia_server_followers_count",
+			"The number of follower controllers in a server", "count", map[string]any{}),
+	}
 
 	return sd
 }
@@ -138,7 +135,10 @@ func (s *shardsDirector) GetOrCreateLeader(namespace string, shardId int64) (Lea
 			return nil, err
 		}
 
-		delete(s.followers, shardId)
+		if _, ok := s.followers[shardId]; ok {
+			s.followersCounter.Dec()
+			delete(s.followers, shardId)
+		}
 	}
 
 	// Create new leader controller
@@ -146,6 +146,7 @@ func (s *shardsDirector) GetOrCreateLeader(namespace string, shardId int64) (Lea
 		return nil, err
 	} else {
 		s.leaders[shardId] = lc
+		s.leadersCounter.Inc()
 		return lc, nil
 	}
 }
@@ -169,7 +170,10 @@ func (s *shardsDirector) GetOrCreateFollower(namespace string, shardId int64) (F
 			return nil, err
 		}
 
-		delete(s.leaders, shardId)
+		if _, ok := s.leaders[shardId]; ok {
+			s.leadersCounter.Dec()
+			delete(s.leaders, shardId)
+		}
 	}
 
 	// Create new follower controller
@@ -177,6 +181,7 @@ func (s *shardsDirector) GetOrCreateFollower(namespace string, shardId int64) (F
 		return nil, err
 	} else {
 		s.followers[shardId] = fc
+		s.followersCounter.Inc()
 		return fc, nil
 	}
 }
@@ -192,6 +197,7 @@ func (s *shardsDirector) DeleteShard(req *proto.DeleteShardRequest) (*proto.Dele
 		}
 
 		delete(s.leaders, req.ShardId)
+		s.leadersCounter.Dec()
 		return resp, nil
 	}
 
@@ -202,6 +208,7 @@ func (s *shardsDirector) DeleteShard(req *proto.DeleteShardRequest) (*proto.Dele
 		}
 
 		delete(s.followers, req.ShardId)
+		s.followersCounter.Dec()
 		return resp, nil
 	} else if fc, err := NewFollowerController(s.config, req.Namespace, req.ShardId, s.walFactory, s.kvFactory); err != nil {
 		return nil, err