Skip to content

Commit

Permalink
Add signal metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
lixmal committed Jun 7, 2024
1 parent 10d8617 commit 32582f0
Show file tree
Hide file tree
Showing 12 changed files with 250 additions and 26 deletions.
7 changes: 6 additions & 1 deletion client/cmd/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/netbirdio/netbird/management/server/activity"

"github.com/netbirdio/netbird/util"
Expand Down Expand Up @@ -53,7 +55,10 @@ func startSignal(t *testing.T) (*grpc.Server, net.Listener) {
t.Fatal(err)
}
s := grpc.NewServer()
sigProto.RegisterSignalExchangeServer(s, sig.NewServer())
srv, err := sig.NewServer(nil)
require.NoError(t, err)

sigProto.RegisterSignalExchangeServer(s, srv)
go func() {
if err := s.Serve(lis); err != nil {
panic(err)
Expand Down
10 changes: 7 additions & 3 deletions client/internal/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ func TestEngine_MultiplePeers(t *testing.T) {
ctx, cancel := context.WithCancel(CtxInitState(context.Background()))
defer cancel()

sigServer, signalAddr, err := startSignal()
sigServer, signalAddr, err := startSignal(t)
if err != nil {
t.Fatal(err)
return
Expand Down Expand Up @@ -1013,15 +1013,19 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin
return e, err
}

func startSignal() (*grpc.Server, string, error) {
func startSignal(t *testing.T) (*grpc.Server, string, error) {
t.Helper()

s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

proto.RegisterSignalExchangeServer(s, signalServer.NewServer())
srv, err := signalServer.NewServer(nil)
require.NoError(t, err)
proto.RegisterSignalExchangeServer(s, srv)

go func() {
if err = s.Serve(lis); err != nil {
Expand Down
11 changes: 8 additions & 3 deletions client/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/netbirdio/management-integrations/integrations"
"github.com/stretchr/testify/require"

log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
Expand Down Expand Up @@ -39,7 +40,7 @@ var (
// we will use a management server started via to simulate the server and capture the number of retries
func TestConnectWithRetryRuns(t *testing.T) {
// start the signal server
_, signalAddr, err := startSignal()
_, signalAddr, err := startSignal(t)
if err != nil {
t.Fatalf("failed to start signal server: %v", err)
}
Expand Down Expand Up @@ -141,15 +142,19 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
return s, lis.Addr().String(), nil
}

func startSignal() (*grpc.Server, string, error) {
func startSignal(t *testing.T) (*grpc.Server, string, error) {
t.Helper()

s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

proto.RegisterSignalExchangeServer(s, signalServer.NewServer())
srv, err := signalServer.NewServer(nil)
require.NoError(t, err)
proto.RegisterSignalExchangeServer(s, srv)

go func() {
if err = s.Serve(lis); err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ require (
github.com/things-go/go-socks5 v0.0.4
github.com/yusufpapurcu/wmi v1.2.4
github.com/zcalusic/sysinfo v1.0.2
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0
go.opentelemetry.io/otel v1.26.0
go.opentelemetry.io/otel/exporters/prometheus v0.48.0
go.opentelemetry.io/otel/metric v1.26.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,8 @@ github.com/zcalusic/sysinfo v1.0.2 h1:nwTTo2a+WQ0NXwo0BGRojOJvJ/5XKvQih+2RrtWqfx
github.com/zcalusic/sysinfo v1.0.2/go.mod h1:kluzTYflRWo6/tXVMJPdEjShsbPpsFRyy+p1mBQPC30=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 h1:Xs2Ncz0gNihqu9iosIZ5SkBbWo5T8JhhLJFMQL1qmLI=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0/go.mod h1:vy+2G/6NvVMpwGX/NyLqcC41fxepnuKHk16E6IZUcJc=
go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs=
Expand Down
6 changes: 5 additions & 1 deletion signal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,11 @@ func startSignal() (*grpc.Server, net.Listener) {
panic(err)
}
s := grpc.NewServer()
sigProto.RegisterSignalExchangeServer(s, server.NewServer())
srv, err := server.NewServer(nil)
if err != nil {
panic(err)
}
sigProto.RegisterSignalExchangeServer(s, srv)
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
Expand Down
38 changes: 36 additions & 2 deletions signal/cmd/run.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"errors"
"flag"
"fmt"
Expand All @@ -13,8 +14,11 @@ import (
"strings"
"time"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/crypto/acme/autocert"

"github.com/netbirdio/netbird/signal/metrics"

"github.com/netbirdio/netbird/encryption"
"github.com/netbirdio/netbird/signal/proto"
"github.com/netbirdio/netbird/signal/server"
Expand All @@ -28,6 +32,10 @@ import (
"google.golang.org/grpc/keepalive"
)

const (
metricsPort = 9090
)

var (
signalPort int
signalLetsencryptDomain string
Expand Down Expand Up @@ -96,8 +104,26 @@ var (
}

opts = append(opts, signalKaep, signalKasp)

Check failure on line 106 in signal/cmd/run.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

ineffectual assignment to opts (ineffassign)

Check failure on line 106 in signal/cmd/run.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

ineffectual assignment to opts (ineffassign)
grpcServer := grpc.NewServer(opts...)
proto.RegisterSignalExchangeServer(grpcServer, server.NewServer())

metricsServer := metrics.NewServer(metricsPort, "")
if err != nil {
return fmt.Errorf("setup metrics: %v", err)
}

grpcServer := grpc.NewServer(grpc.StatsHandler(otelgrpc.NewServerHandler()))

go func() {
log.Infof("running metrics server: %s%s", metricsServer.Addr, metricsServer.Endpoint)
if err := metricsServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("Failed to start metrics server: %v", err)
}
}()

srv, err := server.NewServer(metricsServer.Meter)
if err != nil {
return fmt.Errorf("creating signal server: %v", err)
}
proto.RegisterSignalExchangeServer(grpcServer, srv)

var compatListener net.Listener
if signalPort != 10000 {
Expand Down Expand Up @@ -150,6 +176,14 @@ var (
_ = compatListener.Close()
log.Infof("stopped gRPC backward compatibility server")
}

ctx, cancel := context.WithTimeout(cmd.Context(), 5*time.Second)
defer cancel()
if err := metricsServer.Shutdown(ctx); err != nil {
log.Errorf("Failed to stop metrics server: %v", err)
}
log.Infof("stopped metrics server")

log.Infof("stopped Signal Service")

return nil
Expand Down
62 changes: 62 additions & 0 deletions signal/metrics/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package metrics

import (
"go.opentelemetry.io/otel/metric"
)

// AppMetrics holds all the application metrics
type AppMetrics struct {
metric.Meter

RegisteredPeers metric.Int64UpDownCounter
RegisterTimes metric.Float64Histogram
RegisterCalls metric.Int64Counter
DeregisterCalls metric.Int64Counter
}

func NewAppMetrics(meter metric.Meter) (*AppMetrics, error) {
registeredPeers, err := meter.Int64UpDownCounter("registered_peers_total")
if err != nil {
return nil, err
}

registerTimes, err := meter.Float64Histogram("register_times_milliseconds",
metric.WithExplicitBucketBoundaries(getStandardBucketBoundaries()...))
if err != nil {
return nil, err
}

registerCalls, err := meter.Int64Counter("register_calls_total")
if err != nil {
return nil, err
}

deregisterCalls, err := meter.Int64Counter("deregister_calls_total")
if err != nil {
return nil, err
}

return &AppMetrics{
Meter: meter,
RegisteredPeers: registeredPeers,
RegisterTimes: registerTimes,
RegisterCalls: registerCalls,
DeregisterCalls: deregisterCalls,
}, nil
}

func getStandardBucketBoundaries() []float64 {
return []float64{
0.1,
0.5,
1,
5,
10,
50,
100,
500,
1000,
5000,
10000,
}
}
74 changes: 74 additions & 0 deletions signal/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package metrics

import (
"context"
"fmt"
"net/http"
"reflect"

prometheus2 "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/prometheus"
api "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric"
)

const defaultEndpoint = "/metrics"

// Metrics holds the metrics information and exposes it
type Metrics struct {
Meter api.Meter
provider *metric.MeterProvider
Endpoint string

*http.Server
}

// NewServer initializes and returns a new Metrics instance
func NewServer(port int, endpoint string) *Metrics {
exporter, err := prometheus.New()
if err != nil {
return nil
}

provider := metric.NewMeterProvider(metric.WithReader(exporter))
otel.SetMeterProvider(provider)

pkg := reflect.TypeOf(defaultEndpoint).PkgPath()
meter := provider.Meter(pkg)

if endpoint == "" {
endpoint = defaultEndpoint
}

router := http.NewServeMux()
router.Handle(endpoint, promhttp.HandlerFor(
prometheus2.DefaultGatherer,
promhttp.HandlerOpts{EnableOpenMetrics: true}))

server := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: router,
}

return &Metrics{
Meter: meter,
provider: provider,
Endpoint: endpoint,
Server: server,
}
}

// Shutdown stops the metrics server
func (m *Metrics) Shutdown(ctx context.Context) error {
if err := m.Server.Shutdown(ctx); err != nil {
return fmt.Errorf("http server: %w", err)
}

if err := m.provider.Shutdown(ctx); err != nil {
return fmt.Errorf("meter provider: %w", err)
}

return nil
}
21 changes: 18 additions & 3 deletions signal/peer/peer.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package peer

import (
"github.com/netbirdio/netbird/signal/proto"
log "github.com/sirupsen/logrus"
"context"
"sync"
"time"

log "github.com/sirupsen/logrus"

"github.com/netbirdio/netbird/signal/metrics"
"github.com/netbirdio/netbird/signal/proto"
)

// Peer representation of a connected Peer
Expand Down Expand Up @@ -33,12 +37,14 @@ type Registry struct {
Peers sync.Map
// regMutex ensures that registration and de-registrations are safe
regMutex sync.Mutex
metrics *metrics.AppMetrics
}

// NewRegistry creates a new connected Peer registry
func NewRegistry() *Registry {
func NewRegistry(metrics *metrics.AppMetrics) *Registry {
return &Registry{
regMutex: sync.Mutex{},
metrics: metrics,
}
}

Expand All @@ -60,6 +66,8 @@ func (registry *Registry) IsPeerRegistered(peerId string) bool {

// Register registers peer in the registry
func (registry *Registry) Register(peer *Peer) {
start := time.Now()

registry.regMutex.Lock()
defer registry.regMutex.Unlock()

Expand All @@ -72,6 +80,11 @@ func (registry *Registry) Register(peer *Peer) {
registry.Peers.Store(peer.Id, peer)
}
log.Debugf("peer registered [%s]", peer.Id)

// record time as milliseconds
registry.metrics.RegisterTimes.Record(context.Background(), float64(time.Since(start).Nanoseconds())/1e6)

registry.metrics.RegisterCalls.Add(context.Background(), 1)
}

// Deregister Peer from the Registry (usually once it disconnects)
Expand All @@ -90,4 +103,6 @@ func (registry *Registry) Deregister(peer *Peer) {
}
}
log.Debugf("peer deregistered [%s]", peer.Id)

registry.metrics.DeregisterCalls.Add(context.Background(), 1)
}
Loading

0 comments on commit 32582f0

Please sign in to comment.