Skip to content

Commit

Permalink
Refactor: generalize metrics, integrating server and agents metrics A…
Browse files Browse the repository at this point in the history
…PI. Sender and qed server share the same metrics server.
  • Loading branch information
gdiazlo committed Mar 19, 2019
1 parent 984e60d commit 84d7a7f
Show file tree
Hide file tree
Showing 14 changed files with 279 additions and 275 deletions.
5 changes: 3 additions & 2 deletions cmd/agent_auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/bbva/qed/gossip/auditor"
"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/util"
)

Expand Down Expand Up @@ -57,8 +58,8 @@ func newAgentAuditorCommand(ctx *cmdContext, config gossip.Config, agentPreRun f
if err != nil {
log.Fatalf("Failed to start the QED monitor: %v", err)
}

agent, err := gossip.NewAgent(&config, []gossip.Processor{auditor})
metricsServer := metrics.NewServer(config.MetricsAddr)
agent, err := gossip.NewAgent(&config, []gossip.Processor{auditor}, metricsServer)
if err != nil {
log.Fatalf("Failed to start the QED auditor: %v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/agent_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/gossip/monitor"
"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/util"
)

Expand Down Expand Up @@ -57,8 +58,8 @@ func newAgentMonitorCommand(ctx *cmdContext, config gossip.Config, agentPreRun f
if err != nil {
log.Fatalf("Failed to start the QED monitor: %v", err)
}

agent, err := gossip.NewAgent(&config, []gossip.Processor{monitor})
metricsServer := metrics.NewServer(config.MetricsAddr)
agent, err := gossip.NewAgent(&config, []gossip.Processor{monitor}, metricsServer)
if err != nil {
log.Fatalf("Failed to start the QED monitor: %v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/agent_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/gossip/publisher"
"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/util"
)

Expand Down Expand Up @@ -56,8 +57,8 @@ func newAgentPublisherCommand(ctx *cmdContext, config gossip.Config, agentPreRun
if err != nil {
log.Fatalf("Failed to start the QED publisher: %v", err)
}

agent, err := gossip.NewAgent(&config, []gossip.Processor{publisher})
metricsServer := metrics.NewServer(config.MetricsAddr)
agent, err := gossip.NewAgent(&config, []gossip.Processor{publisher}, metricsServer)
if err != nil {
log.Fatalf("Failed to start the QED publisher: %v", err)
}
Expand Down
13 changes: 7 additions & 6 deletions gossip/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/protocol"
"github.com/coocood/freecache"
"github.com/hashicorp/memberlist"
Expand All @@ -38,7 +39,7 @@ type Agent struct {
config *Config
Self *member.Peer

metricsServer *metricsServer
metricsServer *metrics.Server

memberlist *memberlist.Memberlist
broadcasts *memberlist.TransmitLimitedQueue
Expand All @@ -55,11 +56,11 @@ type Agent struct {
quit chan bool
}

func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {
func NewAgent(conf *Config, p []Processor, m *metrics.Server) (agent *Agent, err error) {
log.Infof("New agent %s\n", conf.NodeName)
agent = &Agent{
config: conf,
metricsServer: newMetricsServer(conf.MetricsAddr),
metricsServer: m,
Topology: NewTopology(),
processors: p,
processed: freecache.NewCache(1 << 20),
Expand Down Expand Up @@ -134,11 +135,11 @@ func (a *Agent) ChTimedSend(batch *protocol.BatchSnapshots, ch chan *protocol.Ba
func (a *Agent) start() {

for _, p := range a.processors {
p.RegisterMetrics(a.metricsServer.registry)
p.RegisterMetrics(a.metricsServer)
}

go func() {
a.metricsServer.start()
a.metricsServer.Start()
}()

for {
Expand Down Expand Up @@ -261,7 +262,7 @@ func (a *Agent) Shutdown() error {
a.stateLock.Lock()
defer a.stateLock.Unlock()

a.metricsServer.shutdown()
a.metricsServer.Shutdown()

if a.Self.Status == member.Shutdown {
return nil
Expand Down
10 changes: 7 additions & 3 deletions gossip/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ import (
"github.com/stretchr/testify/require"

"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/metrics"
)

func TestJoin(t *testing.T) {
conf := DefaultConfig()
conf.NodeName = "testNode"
conf.Role = member.Auditor
conf.BindAddr = "127.0.0.1:12345"
a, _ := NewAgent(conf, []Processor{FakeProcessor{}})
metricsServer := metrics.NewServer("127.0.0.2:23464")
a, _ := NewAgent(conf, []Processor{FakeProcessor{}}, metricsServer)

testCases := []struct {
agentState member.Status
Expand Down Expand Up @@ -71,7 +73,8 @@ func TestLeave(t *testing.T) {
conf.NodeName = "testNode"
conf.Role = member.Auditor
conf.BindAddr = "127.0.0.1:12346"
a, _ := NewAgent(conf, []Processor{FakeProcessor{}})
metricsServer := metrics.NewServer("127.0.0.2:13445")
a, _ := NewAgent(conf, []Processor{FakeProcessor{}}, metricsServer)

testCases := []struct {
agentState member.Status
Expand Down Expand Up @@ -119,7 +122,8 @@ func TestShutdown(t *testing.T) {
conf.NodeName = "testNode"
conf.Role = member.Auditor
conf.BindAddr = "127.0.0.1:12347"
a, _ := NewAgent(conf, []Processor{FakeProcessor{}})
metricsServer := metrics.NewServer("127.0.0.2:43512")
a, _ := NewAgent(conf, []Processor{FakeProcessor{}}, metricsServer)

testCases := []struct {
agentState member.Status
Expand Down
5 changes: 3 additions & 2 deletions gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/bbva/qed/client"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/protocol"
"github.com/pkg/errors"

Expand Down Expand Up @@ -123,7 +124,7 @@ func NewAuditor(conf Config) (*Auditor, error) {
return &auditor, nil
}

func (a Auditor) RegisterMetrics(r *prometheus.Registry) {
func (a Auditor) RegisterMetrics(srv *metrics.Server) {
metrics := []prometheus.Collector{
QedAuditorInstancesCount,
QedAuditorBatchesProcessSeconds,
Expand All @@ -132,7 +133,7 @@ func (a Auditor) RegisterMetrics(r *prometheus.Registry) {
}

for _, m := range metrics {
r.Register(m)
srv.Register(m)
}
}

Expand Down
63 changes: 0 additions & 63 deletions gossip/metrics.go

This file was deleted.

5 changes: 3 additions & 2 deletions gossip/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/bbva/qed/client"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/protocol"
"github.com/pkg/errors"

Expand Down Expand Up @@ -120,7 +121,7 @@ func NewMonitor(conf *Config) (*Monitor, error) {
return &monitor, nil
}

func (m Monitor) RegisterMetrics(r *prometheus.Registry) {
func (m Monitor) RegisterMetrics(srv *metrics.Server) {
metrics := []prometheus.Collector{
QedMonitorInstancesCount,
QedMonitorBatchesReceivedTotal,
Expand All @@ -129,7 +130,7 @@ func (m Monitor) RegisterMetrics(r *prometheus.Registry) {
}

for _, m := range metrics {
r.Register(m)
srv.Register(m)
}
}

Expand Down
10 changes: 5 additions & 5 deletions gossip/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,25 @@ import (
"net/http"

"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/protocol"
"github.com/prometheus/client_golang/prometheus"
)

type Processor interface {
Process(*protocol.BatchSnapshots)
RegisterMetrics(*prometheus.Registry)
RegisterMetrics(*metrics.Server)
}

type FakeProcessor struct {
}

func (d FakeProcessor) Process(b *protocol.BatchSnapshots) {}
func (d FakeProcessor) RegisterMetrics(r *prometheus.Registry) {}
func (d FakeProcessor) Process(b *protocol.BatchSnapshots) {}
func (d FakeProcessor) RegisterMetrics(m *metrics.Server) {}

type DummyProcessor struct {
}

func (d DummyProcessor) RegisterMetrics(r *prometheus.Registry) {}
func (d DummyProcessor) RegisterMetrics(m *metrics.Server) {}

func (d DummyProcessor) Process(b *protocol.BatchSnapshots) {
for i := 0; i < len(b.Snapshots); i++ {
Expand Down
5 changes: 3 additions & 2 deletions gossip/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/bbva/qed/log"
"github.com/bbva/qed/metrics"
"github.com/bbva/qed/protocol"
"github.com/coocood/freecache"

Expand Down Expand Up @@ -105,15 +106,15 @@ func NewPublisher(conf Config) (*Publisher, error) {
return &publisher, nil
}

func (p Publisher) RegisterMetrics(r *prometheus.Registry) {
func (p Publisher) RegisterMetrics(srv *metrics.Server) {
metrics := []prometheus.Collector{
QedPublisherInstancesCount,
QedPublisherBatchesReceivedTotal,
QedPublisherBatchesProcessSeconds,
}

for _, m := range metrics {
r.Register(m)
srv.Register(m)
}
}

Expand Down
Loading

0 comments on commit 84d7a7f

Please sign in to comment.