From 28cb2756ca680281d265f75eb3869e2dfaaff71c Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 2 Jan 2023 15:35:24 +1300 Subject: [PATCH] swarm: introduce a MetricsTracer interface --- config/config.go | 11 ++++--- p2p/net/swarm/swarm.go | 10 ++++++- p2p/net/swarm/swarm_conn.go | 5 ++-- p2p/net/swarm/swarm_dial.go | 11 +++++-- p2p/net/swarm/swarm_listen.go | 4 ++- p2p/net/swarm/swarm_metrics.go | 46 ++++++++++++++++------------- p2p/net/swarm/swarm_metrics_test.go | 5 ++-- 7 files changed, 58 insertions(+), 34 deletions(-) diff --git a/config/config.go b/config/config.go index 94e18be27f..874bcb5a5a 100644 --- a/config/config.go +++ b/config/config.go @@ -119,7 +119,7 @@ type Config struct { HolePunchingOptions []holepunch.Option } -func (cfg *Config) makeSwarm() (*swarm.Swarm, error) { +func (cfg *Config) makeSwarm(enableMetrics bool) (*swarm.Swarm, error) { if cfg.Peerstore == nil { return nil, fmt.Errorf("no peerstore specified") } @@ -151,7 +151,7 @@ func (cfg *Config) makeSwarm() (*swarm.Swarm, error) { return nil, err } - opts := make([]swarm.Option, 0, 3) + opts := make([]swarm.Option, 0, 6) if cfg.Reporter != nil { opts = append(opts, swarm.WithMetrics(cfg.Reporter)) } @@ -167,6 +167,9 @@ func (cfg *Config) makeSwarm() (*swarm.Swarm, error) { if cfg.MultiaddrResolver != nil { opts = append(opts, swarm.WithMultiaddrResolver(cfg.MultiaddrResolver)) } + if enableMetrics { + opts = append(opts, swarm.WithMetricsTracer(swarm.NewMetricsTracer())) + } // TODO: Make the swarm implementation configurable. return swarm.NewSwarm(pid, cfg.Peerstore, opts...) } @@ -276,7 +279,7 @@ func (cfg *Config) addTransports(h host.Host) error { // // This function consumes the config. Do not reuse it (really!). func (cfg *Config) NewNode() (host.Host, error) { - swrm, err := cfg.makeSwarm() + swrm, err := cfg.makeSwarm(true) if err != nil { return nil, err } @@ -382,7 +385,7 @@ func (cfg *Config) NewNode() (host.Host, error) { Peerstore: ps, } - dialer, err := autoNatCfg.makeSwarm() + dialer, err := autoNatCfg.makeSwarm(false) if err != nil { h.Close() return nil, err diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 7606b80c82..18d5183f52 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -71,6 +71,13 @@ func WithMetrics(reporter metrics.Reporter) Option { } } +func WithMetricsTracer(t MetricsTracer) Option { + return func(s *Swarm) error { + s.metricsTracer = t + return nil + } +} + func WithDialTimeout(t time.Duration) Option { return func(s *Swarm) error { s.dialTimeout = t @@ -151,7 +158,8 @@ type Swarm struct { ctx context.Context // is canceled when Close is called ctxCancel context.CancelFunc - bwc metrics.Reporter + bwc metrics.Reporter + metricsTracer MetricsTracer } // NewSwarm constructs a Swarm. diff --git a/p2p/net/swarm/swarm_conn.go b/p2p/net/swarm/swarm_conn.go index f85cd907cb..c24ddee310 100644 --- a/p2p/net/swarm/swarm_conn.go +++ b/p2p/net/swarm/swarm_conn.go @@ -60,8 +60,9 @@ func (c *Conn) Close() error { } func (c *Conn) doClose() { - recordConnectionClosed(c.stat.Direction, c.ConnState()) - recordConnectionDuration(c.stat.Direction, time.Since(c.stat.Stats.Opened), c.ConnState()) + if c.swarm.metricsTracer != nil { + c.swarm.metricsTracer.ClosedConnection(c.stat.Direction, time.Since(c.stat.Stats.Opened), c.ConnState()) + } c.swarm.removeConn(c) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 09e296ae4b..01b955e865 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -493,12 +493,17 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra start := time.Now() connC, err := tpt.Dial(ctx, addr, p) if err != nil { - recordDialFailed(addr, err) + if s.metricsTracer != nil { + s.metricsTracer.FailedDialing(addr, err) + } return nil, err } canonicallog.LogPeerStatus(100, connC.RemotePeer(), connC.RemoteMultiaddr(), "connection_status", "established", "dir", "outbound") - recordConnectionOpened(network.DirOutbound, connC.RemotePublicKey(), connC.ConnState()) - recordHandshakeLatency(time.Since(start), connC.ConnState()) + if s.metricsTracer != nil { + connState := connC.ConnState() + s.metricsTracer.OpenedConnection(network.DirOutbound, connC.RemotePublicKey(), connState) + s.metricsTracer.CompletedHandshake(time.Since(start), connState) + } // Trust the transport? Yeah... right. if connC.RemotePeer() != p { diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index 0c905075ee..334abb4ea3 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -130,7 +130,9 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error { return } canonicallog.LogPeerStatus(100, c.RemotePeer(), c.RemoteMultiaddr(), "connection_status", "established", "dir", "inbound") - recordConnectionOpened(network.DirInbound, c.RemotePublicKey(), c.ConnState()) + if s.metricsTracer != nil { + s.metricsTracer.OpenedConnection(network.DirInbound, c.RemotePublicKey(), c.ConnState()) + } log.Debugf("swarm listener accepted connection: %s <-> %s", c.LocalMultiaddr(), c.RemoteMultiaddr()) s.refs.Add(1) diff --git a/p2p/net/swarm/swarm_metrics.go b/p2p/net/swarm/swarm_metrics.go index c65622f815..292ad589e3 100644 --- a/p2p/net/swarm/swarm_metrics.go +++ b/p2p/net/swarm/swarm_metrics.go @@ -3,7 +3,6 @@ package swarm import ( "context" "errors" - "fmt" "net" "strings" "sync" @@ -66,10 +65,28 @@ var ( ) ) -func init() { +var initMetricsOnce sync.Once + +func initMetrics() { prometheus.MustRegister(connsOpened, keyTypes, connsClosed, dialError, connDuration, connHandshakeLatency) } +type MetricsTracer interface { + OpenedConnection(network.Direction, crypto.PubKey, network.ConnectionState) + ClosedConnection(network.Direction, time.Duration, network.ConnectionState) + CompletedHandshake(time.Duration, network.ConnectionState) + FailedDialing(ma.Multiaddr, error) +} + +type metricsTracer struct{} + +var _ MetricsTracer = &metricsTracer{} + +func NewMetricsTracer() *metricsTracer { + initMetricsOnce.Do(initMetrics) + return &metricsTracer{} +} + var stringPool = sync.Pool{New: func() any { s := make([]string, 0, 8) return &s @@ -110,7 +127,7 @@ func appendConnectionState(tags []string, cs network.ConnectionState) []string { return tags } -func recordConnectionOpened(dir network.Direction, p crypto.PubKey, cs network.ConnectionState) { +func (m *metricsTracer) OpenedConnection(dir network.Direction, p crypto.PubKey, cs network.ConnectionState) { tags := getStringSlice() defer putStringSlice(tags) @@ -124,37 +141,27 @@ func recordConnectionOpened(dir network.Direction, p crypto.PubKey, cs network.C keyTypes.WithLabelValues(*tags...).Inc() } -func recordConnectionClosed(dir network.Direction, cs network.ConnectionState) { +func (m *metricsTracer) ClosedConnection(dir network.Direction, duration time.Duration, cs network.ConnectionState) { tags := getStringSlice() defer putStringSlice(tags) *tags = append(*tags, getDirection(dir)) *tags = appendConnectionState(*tags, cs) connsClosed.WithLabelValues(*tags...).Inc() -} -func recordConnectionDuration(dir network.Direction, t time.Duration, cs network.ConnectionState) { - tags := getStringSlice() - defer putStringSlice(tags) + *tags = (*tags)[:0] *tags = append(*tags, getDirection(dir)) *tags = appendConnectionState(*tags, cs) - connDuration.WithLabelValues(*tags...).Observe(t.Seconds()) + connDuration.WithLabelValues(*tags...).Observe(duration.Seconds()) } -func recordHandshakeLatency(t time.Duration, cs network.ConnectionState) { +func (m *metricsTracer) CompletedHandshake(t time.Duration, cs network.ConnectionState) { tags := getStringSlice() defer putStringSlice(tags) *tags = appendConnectionState(*tags, cs) connHandshakeLatency.WithLabelValues(*tags...).Observe(t.Seconds()) } -func recordDialFailed(addr ma.Multiaddr, err error) { - var transport string - for _, p := range transports { - if _, err := addr.ValueForProtocol(p); err == nil { - transport = ma.ProtocolWithCode(p).Name - break - } - } +func (m *metricsTracer) FailedDialing(_ ma.Multiaddr, err error) { e := "other" if errors.Is(err, context.Canceled) { e = "canceled" @@ -168,8 +175,5 @@ func recordDialFailed(addr ma.Multiaddr, err error) { e = "connection refused" } } - if e == "other" { - fmt.Printf("transport: %s, category: %s (orig: %s)\n", transport, e, err) - } dialError.WithLabelValues(e).Inc() } diff --git a/p2p/net/swarm/swarm_metrics_test.go b/p2p/net/swarm/swarm_metrics_test.go index df1c0504f0..9ccc9d10d2 100644 --- a/p2p/net/swarm/swarm_metrics_test.go +++ b/p2p/net/swarm/swarm_metrics_test.go @@ -20,12 +20,13 @@ func BenchmarkMetricsConnOpen(b *testing.B) { } _, pub, err := crypto.GenerateEd25519Key(rand.Reader) require.NoError(b, err) + tr := NewMetricsTracer() for i := 0; i < b.N; i++ { switch i % 2 { case 0: - recordConnectionOpened(network.DirInbound, pub, quicConnState) + tr.OpenedConnection(network.DirInbound, pub, quicConnState) case 1: - recordConnectionOpened(network.DirInbound, pub, tcpConnState) + tr.OpenedConnection(network.DirInbound, pub, tcpConnState) } } }