# Fixed metrics gauge callback deadlock (#350)
When a leader election coincides with a Prometheus metrics collection, there is
potential for a deadlock between these two goroutines:


#### Shard director gauge callback

This goroutine holds the metrics registry mutex while trying to acquire the
read lock on the shards director:

```
1 @ 0x4481f6 0x4594fe 0x4594d5 0x475965 0x1a67dfb 0x1a67dd7 0xa56d5e 0xa35aa6 0xa38667 0xa3198e 0xa51a82 0x8a3253 0x479a01
# labels: {"oxia":"metrics"}
#	0x475964	sync.runtime_SemacquireMutex+0x24						/usr/local/go/src/runtime/sema.go:77
#	0x1a67dfa	sync.(*RWMutex).RLock+0x5a							/usr/local/go/src/sync/rwmutex.go:71
#	0x1a67dd6	oxia/server.NewShardsDirector.func1+0x36					/src/oxia/server/shards_director.go:71
#	0xa56d5d	oxia/common/metrics.NewGauge.func1+0x5d						/src/oxia/common/metrics/gauge.go:58
#	0xa35aa5	go.opentelemetry.io/otel/sdk/metric.(*meter).RegisterCallback.func1+0x85	/go/pkg/mod/go.opentelemetry.io/otel/sdk/[email protected]/meter.go:263
#	0xa38666	go.opentelemetry.io/otel/sdk/metric.(*pipeline).produce+0x326			/go/pkg/mod/go.opentelemetry.io/otel/sdk/[email protected]/pipeline.go:144
#	0xa3198d	go.opentelemetry.io/otel/sdk/metric.(*manualReader).Collect+0xed		/go/pkg/mod/go.opentelemetry.io/otel/sdk/[email protected]/manual_reader.go:139
#	0xa51a81	go.opentelemetry.io/otel/exporters/prometheus.(*collector).Collect+0x81		/go/pkg/mod/go.opentelemetry.io/otel/exporters/[email protected]/exporter.go:119
#	0x8a3252	github.com/prometheus/client_golang/prometheus.(*Registry).Gather.func1+0xf2	/go/pkg/mod/github.com/prometheus/[email protected]/prometheus/registry.go:457
```


#### WAL closing

This goroutine holds the shards director mutex while trying to acquire the
metrics registry mutex, in order to unregister a gauge callback:

```
1 @ 0x4481f6 0x4594fe 0x4594d5 0x475965 0x4906a5 0xa3820d 0xa381e1 0xa4ef0f 0xa568c2 0x1846545 0x184642b 0x1a513d9 0x1a512d9 0x1a68602 0x1a58ece 0x99d498 0x98bc27 0x99d358 0x974dd0 0x979e8f 0x9726b8 0x479a01
# labels: {"bind":"[::]:6649", "oxia":"internal"}
#	0x475964	sync.runtime_SemacquireMutex+0x24								/usr/local/go/src/runtime/sema.go:77
#	0x4906a4	sync.(*Mutex).lockSlow+0x164									/usr/local/go/src/sync/mutex.go:171
#	0xa3820c	sync.(*Mutex).Lock+0x4c										/usr/local/go/src/sync/mutex.go:90
#	0xa381e0	go.opentelemetry.io/otel/sdk/metric.(*pipeline).addMultiCallback.func1+0x20			/go/pkg/mod/go.opentelemetry.io/otel/sdk/[email protected]/pipeline.go:116
#	0xa4ef0e	go.opentelemetry.io/otel/sdk/metric.unregisterFuncs.Unregister+0x6e				/go/pkg/mod/go.opentelemetry.io/otel/sdk/[email protected]/pipeline.go:521
#	0xa568c1	oxia/common/metrics.(*gauge).Unregister+0x21							/src/oxia/common/metrics/gauge.go:36
#	0x1846544	oxia/server/wal.(*wal).close+0x44								/src/oxia/server/wal/wal_impl.go:215
#	0x184642a	oxia/server/wal.(*wal).Close+0x6a								/src/oxia/server/wal/wal_impl.go:210
#	0x1a513d8	oxia/server.(*followerController).close+0x38							/src/oxia/server/follower_controller.go:193
#	0x1a512d8	oxia/server.(*followerController).Close+0xd8							/src/oxia/server/follower_controller.go:186
#	0x1a68601	oxia/server.(*shardsDirector).GetOrCreateLeader+0x121						/src/oxia/server/shards_director.go:137
#	0x1a58ecd	oxia/server.(*internalRpcServer).BecomeLeader+0x34d						/src/oxia/server/internal_rpc_server.go:140
#	0x99d497	oxia/proto._OxiaCoordination_BecomeLeader_Handler.func1+0x77					/src/oxia/proto/replication_grpc.pb.go:225
#	0x98bc26	github.com/grpc-ecosystem/go-grpc-prometheus.(*ServerMetrics).UnaryServerInterceptor.func1+0x86	/go/pkg/mod/github.com/grpc-ecosystem/[email protected]/server_metrics.go:107
#	0x99d357	oxia/proto._OxiaCoordination_BecomeLeader_Handler+0x137						/src/oxia/proto/replication_grpc.pb.go:227
#	0x974dcf	google.golang.org/grpc.(*Server).processUnaryRPC+0xdef						/go/pkg/mod/google.golang.org/[email protected]/server.go:1345
#	0x979e8e	google.golang.org/grpc.(*Server).handleStream+0xa2e						/go/pkg/mod/google.golang.org/[email protected]/server.go:1722
#	0x9726b7	google.golang.org/grpc.(*Server).serveStreams.func1.2+0x97					/go/pkg/mod/google.golang.org/[email protected]/server.go:966
```
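
Taken together, the two traces show a classic lock-ordering inversion: each goroutine holds the lock the other one needs next. Below is a minimal, self-contained sketch of the same pattern, using hypothetical lock and function names rather than the actual Oxia code:

```go
package main

import "sync"

// Hypothetical stand-ins for the two locks involved.
var (
	registryMu sync.Mutex   // plays the role of the metrics registry / SDK pipeline lock
	directorMu sync.RWMutex // plays the role of the shards director lock
)

// Path 1, like the gauge callback: registry lock first, then director lock.
func collect() {
	registryMu.Lock()
	defer registryMu.Unlock()

	directorMu.RLock() // blocks while becomeLeader holds the write lock
	defer directorMu.RUnlock()
	// ... observe len(leaders) / len(followers) ...
}

// Path 2, like BecomeLeader closing a follower's WAL: director lock first,
// then registry lock (to unregister the gauge callback).
func becomeLeader() {
	directorMu.Lock()
	defer directorMu.Unlock()

	registryMu.Lock() // blocks while collect holds registryMu -> deadlock
	defer registryMu.Unlock()
	// ... unregister the gauge callback ...
}

func main() {
	go collect()
	becomeLeader() // with unlucky timing, both goroutines block forever
}
```

Once each goroutine has acquired its first lock, neither can make progress, which is exactly the state captured in the goroutine dumps above.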

### Modifications

Converted the shards director gauges into up/down counters, so that no mutex
has to be acquired at metrics collection time.
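
The difference between the two instrument kinds is where the work happens: an observable gauge is read via a callback that the SDK invokes while holding its own lock at collection time, whereas a synchronous up/down counter is updated inline while the application already holds whatever lock it needs, so collection never calls back into application code. A rough sketch of the two styles, written against the current go.opentelemetry.io/otel metric API for illustration (the commit itself targets the older instrument-package API visible in the diff below):

```go
package main

import (
	"context"
	"sync"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

var (
	mu      sync.RWMutex
	leaders = map[int64]struct{}{}
)

func main() {
	ctx := context.Background()
	meter := otel.Meter("example")

	// Before: an observable gauge. The callback runs under the SDK's own
	// lock at collection time and must take mu to read the map safely.
	gauge, _ := meter.Int64ObservableGauge("oxia_server_leaders_count")
	reg, _ := meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
		mu.RLock()
		defer mu.RUnlock()
		o.ObserveInt64(gauge, int64(len(leaders)))
		return nil
	}, gauge)
	defer reg.Unregister() // Unregister also takes the SDK lock: the ordering hazard

	// After: a synchronous up/down counter, updated in the same critical
	// section that mutates the map. Nothing runs at collection time.
	ctr, _ := meter.Int64UpDownCounter("oxia_server_leaders_count")
	mu.Lock()
	leaders[1] = struct{}{}
	ctr.Add(ctx, 1)
	mu.Unlock()
}
```
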
merlimat authored Jun 12, 2023 (1 parent 4205465, commit 95bab49)

Showing 2 changed files with 21 additions and 14 deletions.
#### common/metrics/counter.go (1 addition, 1 deletion)

```diff
@@ -78,7 +78,7 @@ func (c *upDownCounter) Sub(diff int) {
 	c.Add(-diff)
 }
 
-func NewUpDownCounter(name string, description string, unit Unit, labels map[string]any) Counter {
+func NewUpDownCounter(name string, description string, unit Unit, labels map[string]any) UpDownCounter {
 	sc, err := meter.Int64UpDownCounter(name,
 		instrument.WithUnit(string(unit)),
 		instrument.WithDescription(description))
```
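
This widened return type is what allows the shards director to hold the instruments as `metrics.UpDownCounter` fields and call `Inc()` and `Dec()` on them, as the next file shows.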
#### server/shards_director.go (20 additions, 13 deletions)

```diff
@@ -52,6 +52,9 @@ type shardsDirector struct {
 	replicationRpcProvider ReplicationRpcProvider
 	closed                 bool
 	log                    zerolog.Logger
+
+	leadersCounter   metrics.UpDownCounter
+	followersCounter metrics.UpDownCounter
 }
@@ -65,18 +68,12 @@ func NewShardsDirector(config Config, walFactory wal.WalFactory, kvFactory kv.KV
 		log: log.With().
 			Str("component", "shards-director").
 			Logger(),
-	}
 
-	metrics.NewGauge("oxia_server_leaders_count", "The number of leader controllers in a server", "count", nil, func() int64 {
-		sd.RLock()
-		defer sd.RUnlock()
-		return int64(len(sd.leaders))
-	})
-	metrics.NewGauge("oxia_server_followers_count", "The number of followers controllers in a server", "count", nil, func() int64 {
-		sd.RLock()
-		defer sd.RUnlock()
-		return int64(len(sd.followers))
-	})
+		leadersCounter: metrics.NewUpDownCounter("oxia_server_leaders_count",
+			"The number of leader controllers in a server", "count", map[string]any{}),
+		followersCounter: metrics.NewUpDownCounter("oxia_server_followers_count",
+			"The number of follower controllers in a server", "count", map[string]any{}),
+	}
 
 	return sd
 }
@@ -138,14 +135,18 @@ func (s *shardsDirector) GetOrCreateLeader(namespace string, shardId int64) (Lea
 			return nil, err
 		}
 
-		delete(s.followers, shardId)
+		if _, ok := s.followers[shardId]; ok {
+			s.followersCounter.Dec()
+			delete(s.followers, shardId)
+		}
 	}
 
 	// Create new leader controller
 	if lc, err := NewLeaderController(s.config, namespace, shardId, s.replicationRpcProvider, s.walFactory, s.kvFactory); err != nil {
 		return nil, err
 	} else {
 		s.leaders[shardId] = lc
+		s.leadersCounter.Inc()
 		return lc, nil
 	}
 }
@@ -169,14 +170,18 @@ func (s *shardsDirector) GetOrCreateFollower(namespace string, shardId int64) (F
 			return nil, err
 		}
 
-		delete(s.leaders, shardId)
+		if _, ok := s.leaders[shardId]; ok {
+			s.leadersCounter.Dec()
+			delete(s.leaders, shardId)
+		}
 	}
 
 	// Create new follower controller
 	if fc, err := NewFollowerController(s.config, namespace, shardId, s.walFactory, s.kvFactory); err != nil {
 		return nil, err
 	} else {
 		s.followers[shardId] = fc
+		s.followersCounter.Inc()
 		return fc, nil
 	}
 }
@@ -192,6 +197,7 @@ func (s *shardsDirector) DeleteShard(req *proto.DeleteShardRequest) (*proto.Dele
 		}
 
 		delete(s.leaders, req.ShardId)
+		s.leadersCounter.Dec()
 		return resp, nil
 	}
@@ -202,6 +208,7 @@ func (s *shardsDirector) DeleteShard(req *proto.DeleteShardRequest) (*proto.Dele
 		}
 
 		delete(s.followers, req.ShardId)
+		s.followersCounter.Dec()
 		return resp, nil
 	} else if fc, err := NewFollowerController(s.config, req.Namespace, req.ShardId, s.walFactory, s.kvFactory); err != nil {
 		return nil, err
```
