Skip to content

Commit

Permalink
gossip metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-kim committed Sep 11, 2023
1 parent 95b5453 commit 26032cf
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 21 deletions.
42 changes: 36 additions & 6 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"google.golang.org/protobuf/proto"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/wrappers"
)

// GossipableAny exists to help create non-nil pointers to a concrete Gossipable
Expand All @@ -34,20 +36,42 @@ func NewGossiper[T any, U GossipableAny[T]](
log logging.Logger,
set Set[U],
client *p2p.Client,
) *Gossiper[T, U] {
return &Gossiper[T, U]{
metrics prometheus.Registerer,
namespace string,
) (*Gossiper[T, U], error) {
g := &Gossiper[T, U]{
config: config,
log: log,
set: set,
client: client,
receivedN: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "gossip_received_n",
Help: "amount of gossip received (n)",
}),
receivedBytes: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "gossip_received_bytes",
Help: "amount of gossip received (bytes)",
}),
}

errs := wrappers.Errs{}
errs.Add(
metrics.Register(g.receivedN),
metrics.Register(g.receivedBytes),
)

return g, errs.Err
}

type Gossiper[T any, U GossipableAny[T]] struct {
config Config
log logging.Logger
set Set[U]
client *p2p.Client
config Config
log logging.Logger
set Set[U]
client *p2p.Client
receivedN prometheus.Counter
receivedBytes prometheus.Counter
}

func (g *Gossiper[_, _]) Gossip(ctx context.Context) {
Expand Down Expand Up @@ -112,7 +136,10 @@ func (g *Gossiper[T, U]) handleResponse(
return
}

receivedBytes := 0
for _, bytes := range response.Gossip {
receivedBytes += len(bytes)

gossipable := U(new(T))
if err := gossipable.Unmarshal(bytes); err != nil {
g.log.Debug(
Expand All @@ -139,4 +166,7 @@ func (g *Gossiper[T, U]) handleResponse(
continue
}
}

g.receivedN.Add(float64(len(response.Gossip)))
g.receivedBytes.Add(float64(receivedBytes))
}
22 changes: 18 additions & 4 deletions network/p2p/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"go.uber.org/mock/gomock"
Expand All @@ -20,9 +21,20 @@ import (
"github.com/ava-labs/avalanchego/utils/set"
)

func TestGossiperShutdown(_ *testing.T) {
func TestGossiperShutdown(t *testing.T) {
require := require.New(t)

config := Config{Frequency: time.Second}
gossiper := NewGossiper[testTx](config, logging.NoLog{}, nil, nil)
metrics := prometheus.NewRegistry()
gossiper, err := NewGossiper[testTx](
config,
logging.NoLog{},
nil,
nil,
metrics,
"",
)
require.NoError(err)
ctx, cancel := context.WithCancel(context.Background())

wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -107,7 +119,8 @@ func TestGossiperGossip(t *testing.T) {
peers := &p2p.Peers{}
require.NoError(peers.Connected(context.Background(), ids.EmptyNodeID, nil))

handler := NewHandler[*testTx](responseSet, tt.maxResponseSize)
handler, err := NewHandler[*testTx](responseSet, tt.maxResponseSize, prometheus.NewRegistry(), "")
require.NoError(err)
_, err = responseRouter.RegisterAppProtocol(0x0, handler, peers)
require.NoError(err)

Expand Down Expand Up @@ -146,7 +159,8 @@ func TestGossiperGossip(t *testing.T) {
Frequency: 500 * time.Millisecond,
PollSize: 1,
}
gossiper := NewGossiper[testTx, *testTx](config, logging.NoLog{}, requestSet, requestClient)
gossiper, err := NewGossiper[testTx, *testTx](config, logging.NoLog{}, requestSet, requestClient, prometheus.NewRegistry(), "")
require.NoError(err)
received := set.Set[*testTx]{}
requestSet.onAdd = func(tx *testTx) {
received.Add(tx)
Expand Down
55 changes: 44 additions & 11 deletions network/p2p/gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"time"

bloomfilter "github.com/holiman/bloomfilter/v2"
"github.com/prometheus/client_golang/prometheus"

"google.golang.org/protobuf/proto"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/utils/wrappers"
)

var (
Expand All @@ -23,18 +25,45 @@ var (
ErrInvalidID = errors.New("invalid id")
)

func NewHandler[T Gossipable](set Set[T], maxResponseSize int) *Handler[T] {
return &Handler[T]{
Handler: p2p.NoOpHandler{},
set: set,
maxResponseSize: maxResponseSize,
func NewHandler[T Gossipable](
set Set[T],
targetResponseSize int,
metrics prometheus.Registerer,
namespace string,
) (*Handler[T], error) {
h := &Handler[T]{
Handler: p2p.NoOpHandler{},
set: set,
targetResponseSize: targetResponseSize,
sentN: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "gossip_sent_n",
Help: "amount of gossip sent (n)",
}),
sentBytes: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "gossip_sent_bytes",
Help: "amount of gossip sent (bytes)",
}),
}

errs := wrappers.Errs{}
errs.Add(
metrics.Register(h.sentN),
metrics.Register(h.sentBytes),
)

return h, errs.Err
}

type Handler[T Gossipable] struct {
p2p.Handler
set Set[T]
maxResponseSize int
set Set[T]
targetResponseSize int

// metrics
sentN prometheus.Counter
sentBytes prometheus.Counter
}

func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, error) {
Expand Down Expand Up @@ -70,14 +99,15 @@ func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, req
return false
}

// check that this doesn't exceed our maximum configured response size
// check that this doesn't exceed our maximum configured target response
// size
gossipBytes = append(gossipBytes, bytes)

responseSize += len(bytes)
if responseSize > h.maxResponseSize {
if responseSize > h.targetResponseSize {

Check failure on line 107 in network/p2p/gossip/handler.go

View workflow job for this annotation

GitHub Actions / Static analysis

S1008: should use 'return responseSize <= h.targetResponseSize' instead of 'if responseSize > h.targetResponseSize { return false }; return true' (gosimple)
return false
}

gossipBytes = append(gossipBytes, bytes)

return true
})

Expand All @@ -89,5 +119,8 @@ func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, req
Gossip: gossipBytes,
}

h.sentN.Add(float64(len(response.Gossip)))
h.sentBytes.Add(float64(responseSize))

return proto.Marshal(response)
}

0 comments on commit 26032cf

Please sign in to comment.