Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

circuitv2: relaysvc metrics track client disconnects #2180

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/multiformats/go-varint v0.0.7
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/quic-go/quic-go v0.33.0
github.com/quic-go/webtransport-go v0.5.2
github.com/raulk/go-watchdog v1.3.0
Expand Down Expand Up @@ -95,7 +96,6 @@ require (
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
Expand Down
77 changes: 65 additions & 12 deletions p2p/protocol/circuitv2/relay/metrics.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package relay

import (
"sync"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/metricshelper"
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -113,20 +116,28 @@ type MetricsTracer interface {
// ConnectionClosed tracks metrics on closing a relay connection
ConnectionClosed(d time.Duration)
// ConnectionRequestHandled tracks metrics on handling a relay connection request
ConnectionRequestHandled(status pbv2.Status)
ConnectionRequestHandled(dstPeer *pbv2.Peer, status pbv2.Status)

// ReservationAllowed tracks metrics on opening or renewing a relay reservation
ReservationAllowed(isRenewal bool)
// ReservationRequestClosed tracks metrics on closing a relay reservation
ReservationClosed(cnt int)
ReservationAllowed(p peer.ID, isRenewal bool)
// PeerDisconnected tracks metrics on peer disconnection
PeerDisconnected(p peer.ID, expiry time.Time)
// ReservationExpired tracks metrics on reservation expiry
ReservationExpired(cnt int)
// ReservationRequestHandled tracks metrics on handling a relay reservation request
ReservationRequestHandled(status pbv2.Status)

// BytesTransferred tracks the total bytes transferred by the relay service
BytesTransferred(cnt int)

// GC performs cleanup of the tracers resources
GC()
}

type metricsTracer struct{}
type metricsTracer struct {
mu sync.Mutex
disconnectedPeers map[peer.ID]time.Time
}

var _ MetricsTracer = &metricsTracer{}

Expand All @@ -150,7 +161,7 @@ func NewMetricsTracer(opts ...MetricsTracerOption) MetricsTracer {
opt(setting)
}
metricshelper.RegisterCollectors(setting.reg, collectors...)
return &metricsTracer{}
return &metricsTracer{disconnectedPeers: make(map[peer.ID]time.Time)}
}

func (mt *metricsTracer) RelayStatus(enabled bool) {
Expand Down Expand Up @@ -178,7 +189,7 @@ func (mt *metricsTracer) ConnectionClosed(d time.Duration) {
connectionDurationSeconds.Observe(d.Seconds())
}

func (mt *metricsTracer) ConnectionRequestHandled(status pbv2.Status) {
func (mt *metricsTracer) ConnectionRequestHandled(dstId *pbv2.Peer, status pbv2.Status) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

Expand All @@ -188,12 +199,12 @@ func (mt *metricsTracer) ConnectionRequestHandled(status pbv2.Status) {
connectionRequestResponseStatusTotal.WithLabelValues(*tags...).Add(1)
if respStatus == requestStatusRejected {
*tags = (*tags)[:0]
*tags = append(*tags, getRejectionReason(status))
*tags = append(*tags, mt.getConnectionRejectionReason(dstId, status))
connectionRejectionsTotal.WithLabelValues(*tags...).Add(1)
}
}

func (mt *metricsTracer) ReservationAllowed(isRenewal bool) {
func (mt *metricsTracer) ReservationAllowed(p peer.ID, isRenewal bool) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
if isRenewal {
Expand All @@ -203,9 +214,24 @@ func (mt *metricsTracer) ReservationAllowed(isRenewal bool) {
}

reservationsTotal.WithLabelValues(*tags...).Add(1)

mt.mu.Lock()
delete(mt.disconnectedPeers, p)
mt.mu.Unlock()
}

func (mt *metricsTracer) PeerDisconnected(p peer.ID, expiry time.Time) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
*tags = append(*tags, "closed")

reservationsTotal.WithLabelValues(*tags...).Add(1)
mt.mu.Lock()
defer mt.mu.Unlock()
mt.disconnectedPeers[p] = expiry
}

func (mt *metricsTracer) ReservationClosed(cnt int) {
func (mt *metricsTracer) ReservationExpired(cnt int) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
*tags = append(*tags, "closed")
Expand All @@ -232,6 +258,35 @@ func (mt *metricsTracer) BytesTransferred(cnt int) {
dataTransferredBytesTotal.Add(float64(cnt))
}

func (mt *metricsTracer) GC() {
mt.mu.Lock()
defer mt.mu.Unlock()
now := time.Now()
for p, expiry := range mt.disconnectedPeers {
if expiry.Before(now) {
delete(mt.disconnectedPeers, p)
}
}
}

func (mt *metricsTracer) getConnectionRejectionReason(dstPeer *pbv2.Peer, status pbv2.Status) string {
if status == pbv2.Status_NO_RESERVATION {
dstPeerInfo, err := util.PeerToPeerInfoV2(dstPeer)
if err != nil {
return "malformed message"
}

mt.mu.Lock()
defer mt.mu.Unlock()
if _, ok := mt.disconnectedPeers[dstPeerInfo.ID]; ok {
return "client disconnected"
}

return "no reservation"
}
return getRejectionReason(status)
}

func getResponseStatus(status pbv2.Status) string {
responseStatus := "unknown"
switch status {
Expand Down Expand Up @@ -259,8 +314,6 @@ func getRejectionReason(status pbv2.Status) string {
reason = "resource limit exceeded"
case pbv2.Status_PERMISSION_DENIED:
reason = "permission denied"
case pbv2.Status_NO_RESERVATION:
reason = "no reservation"
case pbv2.Status_MALFORMED_MESSAGE:
reason = "malformed message"
}
Expand Down
48 changes: 48 additions & 0 deletions p2p/protocol/circuitv2/relay/metrics_alloc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//go:build nocover

package relay

import (
"math/rand"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/test"
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util"
)

func TestNoCoverNoAlloc(t *testing.T) {
statuses := []pbv2.Status{
pbv2.Status_OK,
pbv2.Status_NO_RESERVATION,
pbv2.Status_RESOURCE_LIMIT_EXCEEDED,
pbv2.Status_PERMISSION_DENIED,
}
peerIds := []peer.ID{peer.ID("p1"), peer.ID("p2"), peer.ID("p3")}
peers := []*pbv2.Peer{
util.PeerInfoToPeerV2(peer.AddrInfo{ID: test.RandPeerIDFatal(t)}),
util.PeerInfoToPeerV2(peer.AddrInfo{ID: test.RandPeerIDFatal(t)}),
util.PeerInfoToPeerV2(peer.AddrInfo{ID: test.RandPeerIDFatal(t)}),
}
mt := NewMetricsTracer()
tests := map[string]func(){
"RelayStatus": func() { mt.RelayStatus(rand.Intn(2) == 1) },
"ConnectionOpened": func() { mt.ConnectionOpened() },
"ConnectionClosed": func() { mt.ConnectionClosed(time.Duration(rand.Intn(10)) * time.Second) },
"ConnectionRequestHandled": func() { mt.ConnectionRequestHandled(peers[rand.Intn(len(peers))], statuses[rand.Intn(len(statuses))]) },
"ReservationAllowed": func() { mt.ReservationAllowed(peerIds[rand.Intn(len(peerIds))], rand.Intn(2) == 1) },
"ReservationExpired": func() { mt.ReservationExpired(rand.Intn(10)) },
"ReservationRequestHandled": func() { mt.ReservationRequestHandled(statuses[rand.Intn(len(statuses))]) },
"BytesTransferred": func() { mt.BytesTransferred(rand.Intn(1000)) },
"PeerDisconnected": func() { mt.PeerDisconnected(peerIds[rand.Intn(len(peerIds))], time.Time{}) },
"GC": func() { mt.GC() },
}
for method, f := range tests {
allocs := testing.AllocsPerRun(1000, f)
if allocs > 0 {
t.Fatalf("Alloc Test: %s, got: %0.2f, expected: 0 allocs", method, allocs)
}
}
}
84 changes: 59 additions & 25 deletions p2p/protocol/circuitv2/relay/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,71 @@
//go:build nocover

package relay

import (
"math/rand"
"testing"
"time"

pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/test"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/util"
ma "github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

func TestNoCoverNoAlloc(t *testing.T) {
statuses := []pbv2.Status{
pbv2.Status_OK,
pbv2.Status_NO_RESERVATION,
pbv2.Status_RESOURCE_LIMIT_EXCEEDED,
pbv2.Status_PERMISSION_DENIED,
func getCounterValue(counter prometheus.Counter) (int, error) {
m := &dto.Metric{}
if err := counter.Write(m); err != nil {
return 0, err
}
return int(*m.Counter.Value), nil
}

func getRandAddrInfo(t *testing.T) peer.AddrInfo {
t.Helper()
pid1 := test.RandPeerIDFatal(t)
addrs1 := []ma.Multiaddr{ma.StringCast("/ip4/127.0.0.1/udp/1/quic")}
peer1 := peer.AddrInfo{ID: pid1, Addrs: addrs1}
return peer1
}

func TestClientDisconnectionReason(t *testing.T) {
reg := prometheus.NewRegistry()

peer1 := getRandAddrInfo(t)

mt := NewMetricsTracer(WithRegisterer(reg))

// Trigger Disconnected Event
expiry := time.Now().Add(10 * time.Second)
mt.PeerDisconnected(peer1.ID, expiry)

// This should increment "client disconnected"
mt.ConnectionRequestHandled(util.PeerInfoToPeerV2(peer1), pb.Status_NO_RESERVATION)
clientDisconnect, err := getCounterValue(connectionRejectionsTotal.WithLabelValues("client disconnected"))
if err != nil {
t.Errorf("Unexpected error %s", err)
}
noReservation, err := getCounterValue(connectionRejectionsTotal.WithLabelValues("no reservation"))
if err != nil {
t.Errorf("Unexpected err %s", err)
}
if clientDisconnect != 1 || noReservation != 0 {
t.Errorf("Invalid count values: expected clientDisconnect: 1, noReservation: 0, got: clientDisconnect: %d, noReservation: %d ", clientDisconnect, noReservation)
}

peer2 := getRandAddrInfo(t)
// This should increment "no reservation"
mt.ConnectionRequestHandled(util.PeerInfoToPeerV2(peer2), pb.Status_NO_RESERVATION)
clientDisconnect, err = getCounterValue(connectionRejectionsTotal.WithLabelValues("client disconnected"))
if err != nil {
t.Errorf("Unexpected error %s", err)
}
mt := NewMetricsTracer()
tests := map[string]func(){
"RelayStatus": func() { mt.RelayStatus(rand.Intn(2) == 1) },
"ConnectionOpened": func() { mt.ConnectionOpened() },
"ConnectionClosed": func() { mt.ConnectionClosed(time.Duration(rand.Intn(10)) * time.Second) },
"ConnectionRequestHandled": func() { mt.ConnectionRequestHandled(statuses[rand.Intn(len(statuses))]) },
"ReservationAllowed": func() { mt.ReservationAllowed(rand.Intn(2) == 1) },
"ReservationClosed": func() { mt.ReservationClosed(rand.Intn(10)) },
"ReservationRequestHandled": func() { mt.ReservationRequestHandled(statuses[rand.Intn(len(statuses))]) },
"BytesTransferred": func() { mt.BytesTransferred(rand.Intn(1000)) },
noReservation, err = getCounterValue(connectionRejectionsTotal.WithLabelValues("no reservation"))
if err != nil {
t.Errorf("Unexpected err %s", err)
}
for method, f := range tests {
allocs := testing.AllocsPerRun(1000, f)
if allocs > 0 {
t.Fatalf("Alloc Test: %s, got: %0.2f, expected: 0 allocs", method, allocs)
}
if clientDisconnect != 1 || noReservation != 1 {
t.Errorf("Invalid count values: expected clientDisconnect: 1, noReservation: 0, got: clientDisconnect: %d, noReservation: %d ", clientDisconnect, noReservation)
}
}
18 changes: 9 additions & 9 deletions p2p/protocol/circuitv2/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (r *Relay) handleStream(s network.Stream) {
case pbv2.HopMessage_CONNECT:
status := r.handleConnect(s, &msg)
if r.metricsTracer != nil {
r.metricsTracer.ConnectionRequestHandled(status)
r.metricsTracer.ConnectionRequestHandled(msg.GetPeer(), status)
}
default:
r.handleError(s, pbv2.Status_MALFORMED_MESSAGE)
Expand Down Expand Up @@ -221,7 +221,7 @@ func (r *Relay) handleReserve(s network.Stream) pbv2.Status {
r.host.ConnManager().TagPeer(p, "relay-reservation", ReservationTagWeight)
r.mx.Unlock()
if r.metricsTracer != nil {
r.metricsTracer.ReservationAllowed(exists)
r.metricsTracer.ReservationAllowed(p, exists)
}

log.Debugf("reserving relay slot for %s", p)
Expand Down Expand Up @@ -645,7 +645,6 @@ func (r *Relay) background() {

func (r *Relay) gc() {
r.mx.Lock()
defer r.mx.Unlock()

now := time.Now()
cnt := 0
Expand All @@ -656,15 +655,16 @@ func (r *Relay) gc() {
cnt++
}
}
if r.metricsTracer != nil {
r.metricsTracer.ReservationClosed(cnt)
}

for p, count := range r.conns {
if count == 0 {
delete(r.conns, p)
}
}
r.mx.Unlock()
if r.metricsTracer != nil {
r.metricsTracer.ReservationExpired(cnt)
r.metricsTracer.GC()
}
}

func (r *Relay) disconnected(n network.Network, c network.Conn) {
Expand All @@ -674,14 +674,14 @@ func (r *Relay) disconnected(n network.Network, c network.Conn) {
}

r.mx.Lock()
_, ok := r.rsvp[p]
expiry, ok := r.rsvp[p]
if ok {
delete(r.rsvp, p)
}
r.mx.Unlock()

if ok && r.metricsTracer != nil {
r.metricsTracer.ReservationClosed(1)
r.metricsTracer.PeerDisconnected(p, expiry)
}
}

Expand Down