Skip to content

Commit

Permalink
circuitv2: relaysvc metrics track client disconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Mar 10, 2023
1 parent bfc6fc3 commit 9e66541
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 47 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/multiformats/go-varint v0.0.7
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/quic-go/quic-go v0.33.0
github.com/quic-go/webtransport-go v0.5.2
github.com/raulk/go-watchdog v1.3.0
Expand Down Expand Up @@ -95,7 +96,6 @@ require (
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
Expand Down
77 changes: 65 additions & 12 deletions p2p/protocol/circuitv2/relay/metrics.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package relay

import (
"sync"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/metricshelper"
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -113,20 +116,28 @@ type MetricsTracer interface {
// ConnectionClosed tracks metrics on closing a relay connection
ConnectionClosed(d time.Duration)
// ConnectionRequestHandled tracks metrics on handling a relay connection request
ConnectionRequestHandled(status pbv2.Status)
ConnectionRequestHandled(dstPeer *pbv2.Peer, status pbv2.Status)

// ReservationAllowed tracks metrics on opening or renewing a relay reservation
ReservationAllowed(isRenewal bool)
// ReservationRequestClosed tracks metrics on closing a relay reservation
ReservationClosed(cnt int)
ReservationAllowed(p peer.ID, isRenewal bool)
// PeerDisconnected tracks metrics on peer disconnection
PeerDisconnected(p peer.ID, expiry time.Time)
// ReservationExpired tracks metrics on reservation expiry
ReservationExpired(cnt int)
// ReservationRequestHandled tracks metrics on handling a relay reservation request
ReservationRequestHandled(status pbv2.Status)

// BytesTransferred tracks the total bytes transferred by the relay service
BytesTransferred(cnt int)

// GC performs cleanup of the tracers resources
GC()
}

type metricsTracer struct{}
type metricsTracer struct {
mu sync.Mutex
disconnectedPeers map[peer.ID]time.Time
}

var _ MetricsTracer = &metricsTracer{}

Expand All @@ -150,7 +161,7 @@ func NewMetricsTracer(opts ...MetricsTracerOption) MetricsTracer {
opt(setting)
}
metricshelper.RegisterCollectors(setting.reg, collectors...)
return &metricsTracer{}
return &metricsTracer{disconnectedPeers: make(map[peer.ID]time.Time)}
}

func (mt *metricsTracer) RelayStatus(enabled bool) {
Expand Down Expand Up @@ -178,7 +189,7 @@ func (mt *metricsTracer) ConnectionClosed(d time.Duration) {
connectionDurationSeconds.Observe(d.Seconds())
}

func (mt *metricsTracer) ConnectionRequestHandled(status pbv2.Status) {
func (mt *metricsTracer) ConnectionRequestHandled(dstId *pbv2.Peer, status pbv2.Status) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

Expand All @@ -188,12 +199,12 @@ func (mt *metricsTracer) ConnectionRequestHandled(status pbv2.Status) {
connectionRequestResponseStatusTotal.WithLabelValues(*tags...).Add(1)
if respStatus == requestStatusRejected {
*tags = (*tags)[:0]
*tags = append(*tags, getRejectionReason(status))
*tags = append(*tags, mt.getConnectionRejectionReason(dstId, status))
connectionRejectionsTotal.WithLabelValues(*tags...).Add(1)
}
}

func (mt *metricsTracer) ReservationAllowed(isRenewal bool) {
func (mt *metricsTracer) ReservationAllowed(p peer.ID, isRenewal bool) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
if isRenewal {
Expand All @@ -203,9 +214,24 @@ func (mt *metricsTracer) ReservationAllowed(isRenewal bool) {
}

reservationsTotal.WithLabelValues(*tags...).Add(1)

mt.mu.Lock()
delete(mt.disconnectedPeers, p)
mt.mu.Unlock()
}

func (mt *metricsTracer) PeerDisconnected(p peer.ID, expiry time.Time) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
*tags = append(*tags, "closed")

reservationsTotal.WithLabelValues(*tags...).Add(1)
mt.mu.Lock()
defer mt.mu.Unlock()
mt.disconnectedPeers[p] = expiry
}

func (mt *metricsTracer) ReservationClosed(cnt int) {
func (mt *metricsTracer) ReservationExpired(cnt int) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
*tags = append(*tags, "closed")
Expand All @@ -232,6 +258,35 @@ func (mt *metricsTracer) BytesTransferred(cnt int) {
dataTransferredBytesTotal.Add(float64(cnt))
}

func (mt *metricsTracer) GC() {
mt.mu.Lock()
defer mt.mu.Unlock()
now := time.Now()
for p, expiry := range mt.disconnectedPeers {
if expiry.Before(now) {
delete(mt.disconnectedPeers, p)
}
}
}

func (mt *metricsTracer) getConnectionRejectionReason(dstPeer *pbv2.Peer, status pbv2.Status) string {
if status == pbv2.Status_NO_RESERVATION {
dstPeerInfo, err := util.PeerToPeerInfoV2(dstPeer)
if err != nil {
return "malformed message"
}

mt.mu.Lock()
defer mt.mu.Unlock()
if _, ok := mt.disconnectedPeers[dstPeerInfo.ID]; ok {
return "client disconnected"
}

return "no reservation"
}
return getRejectionReason(status)
}

func getResponseStatus(status pbv2.Status) string {
responseStatus := "unknown"
switch status {
Expand Down Expand Up @@ -259,8 +314,6 @@ func getRejectionReason(status pbv2.Status) string {
reason = "resource limit exceeded"
case pbv2.Status_PERMISSION_DENIED:
reason = "permission denied"
case pbv2.Status_NO_RESERVATION:
reason = "no reservation"
case pbv2.Status_MALFORMED_MESSAGE:
reason = "malformed message"
}
Expand Down
48 changes: 48 additions & 0 deletions p2p/protocol/circuitv2/relay/metrics_alloc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//go:build nocover

package relay

import (
"math/rand"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/test"
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util"
)

func TestNoCoverNoAlloc(t *testing.T) {
statuses := []pbv2.Status{
pbv2.Status_OK,
pbv2.Status_NO_RESERVATION,
pbv2.Status_RESOURCE_LIMIT_EXCEEDED,
pbv2.Status_PERMISSION_DENIED,
}
peerIds := []peer.ID{peer.ID("p1"), peer.ID("p2"), peer.ID("p3")}
peers := []*pbv2.Peer{
util.PeerInfoToPeerV2(peer.AddrInfo{ID: test.RandPeerIDFatal(t)}),
util.PeerInfoToPeerV2(peer.AddrInfo{ID: test.RandPeerIDFatal(t)}),
util.PeerInfoToPeerV2(peer.AddrInfo{ID: test.RandPeerIDFatal(t)}),
}
mt := NewMetricsTracer()
tests := map[string]func(){
"RelayStatus": func() { mt.RelayStatus(rand.Intn(2) == 1) },
"ConnectionOpened": func() { mt.ConnectionOpened() },
"ConnectionClosed": func() { mt.ConnectionClosed(time.Duration(rand.Intn(10)) * time.Second) },
"ConnectionRequestHandled": func() { mt.ConnectionRequestHandled(peers[rand.Intn(len(peers))], statuses[rand.Intn(len(statuses))]) },
"ReservationAllowed": func() { mt.ReservationAllowed(peerIds[rand.Intn(len(peerIds))], rand.Intn(2) == 1) },
"ReservationExpired": func() { mt.ReservationExpired(rand.Intn(10)) },
"ReservationRequestHandled": func() { mt.ReservationRequestHandled(statuses[rand.Intn(len(statuses))]) },
"BytesTransferred": func() { mt.BytesTransferred(rand.Intn(1000)) },
"PeerDisconnected": func() { mt.PeerDisconnected(peerIds[rand.Intn(len(peerIds))], time.Time{}) },
"GC": func() { mt.GC() },
}
for method, f := range tests {
allocs := testing.AllocsPerRun(1000, f)
if allocs > 0 {
t.Fatalf("Alloc Test: %s, got: %0.2f, expected: 0 allocs", method, allocs)
}
}
}
84 changes: 59 additions & 25 deletions p2p/protocol/circuitv2/relay/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,71 @@
//go:build nocover

package relay

import (
"math/rand"
"testing"
"time"

pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/test"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util"
ma "github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

func TestNoCoverNoAlloc(t *testing.T) {
statuses := []pbv2.Status{
pbv2.Status_OK,
pbv2.Status_NO_RESERVATION,
pbv2.Status_RESOURCE_LIMIT_EXCEEDED,
pbv2.Status_PERMISSION_DENIED,
func getCounterValue(counter prometheus.Counter) (int, error) {
m := &dto.Metric{}
if err := counter.Write(m); err != nil {
return 0, err
}
return int(*m.Counter.Value), nil
}

func getRandAddrInfo(t *testing.T) peer.AddrInfo {
t.Helper()
pid1 := test.RandPeerIDFatal(t)
addrs1 := []ma.Multiaddr{ma.StringCast("/ip4/127.0.0.1/udp/1/quic")}
peer1 := peer.AddrInfo{ID: pid1, Addrs: addrs1}
return peer1
}

func TestClientDisconnectionReason(t *testing.T) {
reg := prometheus.NewRegistry()

peer1 := getRandAddrInfo(t)

mt := NewMetricsTracer(WithRegisterer(reg))

// Trigger Disconnected Event
expiry := time.Now().Add(10 * time.Second)
mt.PeerDisconnected(peer1.ID, expiry)

// This should increment "client disconnected"
mt.ConnectionRequestHandled(util.PeerInfoToPeerV2(peer1), pb.Status_NO_RESERVATION)
clientDisconnect, err := getCounterValue(connectionRejectionsTotal.WithLabelValues("client disconnected"))
if err != nil {
t.Errorf("Unexpected error %s", err)
}
noReservation, err := getCounterValue(connectionRejectionsTotal.WithLabelValues("no reservation"))
if err != nil {
t.Errorf("Unexpected err %s", err)
}
if clientDisconnect != 1 || noReservation != 0 {
t.Errorf("Invalid count values: expected clientDisconnect: 1, noReservation: 0, got: clientDisconnect: %d, noReservation: %d ", clientDisconnect, noReservation)
}

peer2 := getRandAddrInfo(t)
// This should increment "no reservation"
mt.ConnectionRequestHandled(util.PeerInfoToPeerV2(peer2), pb.Status_NO_RESERVATION)
clientDisconnect, err = getCounterValue(connectionRejectionsTotal.WithLabelValues("client disconnected"))
if err != nil {
t.Errorf("Unexpected error %s", err)
}
mt := NewMetricsTracer()
tests := map[string]func(){
"RelayStatus": func() { mt.RelayStatus(rand.Intn(2) == 1) },
"ConnectionOpened": func() { mt.ConnectionOpened() },
"ConnectionClosed": func() { mt.ConnectionClosed(time.Duration(rand.Intn(10)) * time.Second) },
"ConnectionRequestHandled": func() { mt.ConnectionRequestHandled(statuses[rand.Intn(len(statuses))]) },
"ReservationAllowed": func() { mt.ReservationAllowed(rand.Intn(2) == 1) },
"ReservationClosed": func() { mt.ReservationClosed(rand.Intn(10)) },
"ReservationRequestHandled": func() { mt.ReservationRequestHandled(statuses[rand.Intn(len(statuses))]) },
"BytesTransferred": func() { mt.BytesTransferred(rand.Intn(1000)) },
noReservation, err = getCounterValue(connectionRejectionsTotal.WithLabelValues("no reservation"))
if err != nil {
t.Errorf("Unexpected err %s", err)
}
for method, f := range tests {
allocs := testing.AllocsPerRun(1000, f)
if allocs > 0 {
t.Fatalf("Alloc Test: %s, got: %0.2f, expected: 0 allocs", method, allocs)
}
if clientDisconnect != 1 || noReservation != 1 {
t.Errorf("Invalid count values: expected clientDisconnect: 1, noReservation: 0, got: clientDisconnect: %d, noReservation: %d ", clientDisconnect, noReservation)
}
}
18 changes: 9 additions & 9 deletions p2p/protocol/circuitv2/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (r *Relay) handleStream(s network.Stream) {
case pbv2.HopMessage_CONNECT:
status := r.handleConnect(s, &msg)
if r.metricsTracer != nil {
r.metricsTracer.ConnectionRequestHandled(status)
r.metricsTracer.ConnectionRequestHandled(msg.GetPeer(), status)
}
default:
r.handleError(s, pbv2.Status_MALFORMED_MESSAGE)
Expand Down Expand Up @@ -221,7 +221,7 @@ func (r *Relay) handleReserve(s network.Stream) pbv2.Status {
r.host.ConnManager().TagPeer(p, "relay-reservation", ReservationTagWeight)
r.mx.Unlock()
if r.metricsTracer != nil {
r.metricsTracer.ReservationAllowed(exists)
r.metricsTracer.ReservationAllowed(p, exists)
}

log.Debugf("reserving relay slot for %s", p)
Expand Down Expand Up @@ -645,7 +645,6 @@ func (r *Relay) background() {

func (r *Relay) gc() {
r.mx.Lock()
defer r.mx.Unlock()

now := time.Now()
cnt := 0
Expand All @@ -656,15 +655,16 @@ func (r *Relay) gc() {
cnt++
}
}
if r.metricsTracer != nil {
r.metricsTracer.ReservationClosed(cnt)
}

for p, count := range r.conns {
if count == 0 {
delete(r.conns, p)
}
}
r.mx.Unlock()
if r.metricsTracer != nil {
r.metricsTracer.ReservationExpired(cnt)
r.metricsTracer.GC()
}
}

func (r *Relay) disconnected(n network.Network, c network.Conn) {
Expand All @@ -674,14 +674,14 @@ func (r *Relay) disconnected(n network.Network, c network.Conn) {
}

r.mx.Lock()
_, ok := r.rsvp[p]
expiry, ok := r.rsvp[p]
if ok {
delete(r.rsvp, p)
}
r.mx.Unlock()

if ok && r.metricsTracer != nil {
r.metricsTracer.ReservationClosed(1)
r.metricsTracer.PeerDisconnected(p, expiry)
}
}

Expand Down

0 comments on commit 9e66541

Please sign in to comment.