Skip to content

Commit

Permalink
Register metrics in server start and move sender metrics to sender file
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Mar 19, 2019
1 parent 84d7a7f commit 840006a
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 60 deletions.
37 changes: 33 additions & 4 deletions gossip/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,24 @@ import (
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/sign"
"github.com/prometheus/client_golang/prometheus"
)

var (
// SENDER

QedSenderInstancesCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "qed_sender_instances_count",
Help: "Number of sender agents running",
},
)
QedSenderBatchesSentTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "qed_sender_batches_sent_total",
Help: "Number of batches sent by the sender.",
},
)
)

type Sender struct {
Expand Down Expand Up @@ -56,7 +74,7 @@ func DefaultConfig() *Config {
}

func NewSender(a *gossip.Agent, c *Config, s sign.Signer) *Sender {
metrics.QedSenderInstancesCount.Inc()
QedSenderInstancesCount.Inc()
return &Sender{
agent: a,
config: c,
Expand All @@ -76,6 +94,17 @@ func (s Sender) Start(ch chan *protocol.Snapshot) {
<-s.quit
}

func (s Sender) RegisterMetrics(srv *metrics.Server) {
metrics := []prometheus.Collector{
QedSenderInstancesCount,
QedSenderBatchesSentTotal,
}

for _, m := range metrics {
srv.Register(m)
}
}

func (s Sender) newBatch() *protocol.BatchSnapshots {
return &protocol.BatchSnapshots{
TTL: s.config.TTL,
Expand Down Expand Up @@ -103,7 +132,7 @@ func (s Sender) batcherSender(id int, ch chan *protocol.Snapshot, quit chan bool
log.Errorf("Failed signing message: %v", err)
}
batch.Snapshots = append(batch.Snapshots, ss)
case b := <- s.out:
case b := <-s.out:
go s.sender(b)
case <-time.After(s.config.SendTimer):
// send whatever we have on each tick, do not wait
Expand All @@ -126,7 +155,7 @@ func (s Sender) sender(batch *protocol.BatchSnapshots) {
msg, _ := batch.Encode()
peers := s.agent.Topology.Each(s.config.EachN, nil)
for _, peer := range peers.L {
metrics.QedSenderBatchesSentTotal.Inc()
QedSenderBatchesSentTotal.Inc()
dst := peer.Node()

log.Debugf("Sending batch %+v to node %+v\n", batch, dst.Name)
Expand All @@ -140,7 +169,7 @@ func (s Sender) sender(batch *protocol.BatchSnapshots) {
}

func (s Sender) Stop() {
metrics.QedSenderInstancesCount.Dec()
QedSenderInstancesCount.Dec()
close(s.quit)
}

Expand Down
56 changes: 1 addition & 55 deletions metrics/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,10 @@
package metrics

import (
"expvar"
"fmt"
"sync"

"github.com/prometheus/client_golang/prometheus"
)

var (

// Balloon has a Map of all the stats relative to Balloon
Balloon *expvar.Map

// Prometheus

// SERVER

QedInstancesCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "qed_instances_count",
Expand Down Expand Up @@ -136,24 +124,9 @@ var (
},
)

// SENDER

QedSenderInstancesCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "qed_sender_instances_count",
Help: "Number of sender agents running",
},
)
QedSenderBatchesSentTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "qed_sender_batches_sent_total",
Help: "Number of batches sent by the sender.",
},
)

// PROMETHEUS

metricsList = []prometheus.Collector{
DefaultMetrics = []prometheus.Collector{
QedInstancesCount,
QedAPIHealthcheckRequestsTotal,

Expand All @@ -166,33 +139,6 @@ var (
QedBalloonMembershipTotal,
QedBalloonDigestMembershipTotal,
QedBalloonIncrementalTotal,

QedSenderInstancesCount,
QedSenderBatchesSentTotal,
}

registerMetrics sync.Once
)

// Register all metrics.
func Register(r *prometheus.Registry) {
// Register the metrics.
registerMetrics.Do(
func() {
for _, metric := range metricsList {
r.MustRegister(metric)
}
},
)
}

// Implement expVar.Var interface
type Uint64ToVar uint64

func (v Uint64ToVar) String() string {
return fmt.Sprintf("%d", v)
}

func init() {
Balloon = expvar.NewMap("Qed_balloon_stats")
}
25 changes: 25 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package metrics

import (
"context"
"expvar"
"fmt"
"net/http"
"time"

Expand All @@ -26,11 +28,29 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

// Balloon has a Map of all the stats relative to Balloon
var Balloon *expvar.Map

// Implement expVar.Var interface
type Uint64ToVar uint64

func (v Uint64ToVar) String() string {
return fmt.Sprintf("%d", v)
}

func init() {
Balloon = expvar.NewMap("Qed_balloon_stats")
}

// A metrics server holds the http API and the prometheus registry
// which provides access to the registered metrics.
type Server struct {
server *http.Server
registry *prometheus.Registry
}

// Create new metrics server. Do not listen to the given address until
// the server is started.
func NewServer(addr string) *Server {
r := prometheus.NewRegistry()
return &Server{
Expand All @@ -42,17 +62,22 @@ func NewServer(addr string) *Server {
}
}

// Listens on the configured address and blocks until shutdown is called.
func (m Server) Start() {
if err := m.server.ListenAndServe(); err != http.ErrServerClosed {
log.Errorf("Can't start metrics HTTP server: %s", err)
}
}

// Gracefully shitdown metrics http server waiting 5 seconds for
// connections to be closed.
func (m Server) Shutdown() {
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
m.server.Shutdown(ctx)
}

// Register a prometheus collector in the prometheus registry used
// by the metrics server.
func (m Server) Register(metric prometheus.Collector) {
if err := m.registry.Register(metric); err != nil {
log.Infof("metric not registered:", err)
Expand Down
6 changes: 5 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,11 @@ func NewServer(conf *Config) (*Server, error) {
return nil, err
}

// metrics server
// create metrics server and register default qed metrics
server.metricsServer = metrics.NewServer(conf.MetricsAddr)
for _, m := range metrics.DefaultMetrics {
server.metricsServer.Register(m)
}

// Create gossip agent
config := gossip.DefaultConfig()
Expand All @@ -148,6 +151,7 @@ func NewServer(conf *Config) (*Server, error) {

// Create sender
server.sender = sender.NewSender(server.agent, sender.DefaultConfig(), server.signer)
server.sender.RegisterMetrics(server.metricsServer)

// Create RaftBalloon
server.raftBalloon, err = raftwal.NewRaftBalloon(conf.RaftPath, conf.RaftAddr, conf.NodeID, store, server.agentsQueue)
Expand Down

0 comments on commit 840006a

Please sign in to comment.