From d08fad8daab7c15fa07f969997e12c874460f4a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20D=C3=ADaz?= Date: Thu, 11 Apr 2019 14:31:44 +0200 Subject: [PATCH] register taskFactory metrics in agent metrics server if it present --- gossip/processor.go | 5 ++++ gossip/processor_test.go | 50 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/gossip/processor.go b/gossip/processor.go index ce91689ad..bc4774cf1 100644 --- a/gossip/processor.go +++ b/gossip/processor.go @@ -103,6 +103,11 @@ func (d *BatchProcessor) wasProcessed(b *protocol.BatchSnapshots) bool { func (d *BatchProcessor) Subscribe(id int, ch <-chan *Message) { d.id = id + + if d.a.metrics != nil { + d.a.metrics.MustRegister(d.metrics...) + } + go func() { for { select { diff --git a/gossip/processor_test.go b/gossip/processor_test.go index 8d4fa031a..94f778916 100644 --- a/gossip/processor_test.go +++ b/gossip/processor_test.go @@ -17,11 +17,16 @@ package gossip import ( + "context" + "io/ioutil" + "net/http" + "strings" "sync" "testing" "time" "github.com/bbva/qed/protocol" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -103,3 +108,48 @@ func TestBatchProcessorWasProcessed(t *testing.T) { // dropped by the wasProcessed function require.Equal(t, 1, len(ts.ch), "Output queue must be 1, duplicate event must be dropped by processor") } + +type fakeTaskFactory struct{} + +func (f fakeTaskFactory) Metrics() []prometheus.Collector { + return []prometheus.Collector{ + prometheus.NewCounter(prometheus.CounterOpts{Name: "fakeCounterMetric"}), + } +} + +func (f fakeTaskFactory) New(c context.Context) Task { + return func() error { + return nil + } +} + +func TestBatchProcessorRegisterMetrics(t *testing.T) { + + conf := DefaultConfig() + conf.NodeName = "testNode" + conf.Role = "auditor" + conf.BindAddr = "127.0.0.1:12345" + conf.MetricsAddr = "127.0.0.1:12346" + + a, err := NewAgentFromConfig(conf) + require.NoError(t, err, "Error creating agent!") + a.Start() + defer a.Shutdown() + // wait for agent to start + // all services + time.Sleep(3 * time.Second) + + p := NewBatchProcessor(a, []TaskFactory{&fakeTaskFactory{}}) + a.In.Subscribe(BatchMessageType, p, 0) + defer p.Stop() + + resp, err := http.Get("http://" + conf.MetricsAddr + "/metrics") + if err != nil { + panic(err) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + found := strings.Index(string(body), "fakeCounterMetric") + + require.True(t, found > 0, "Metric not found!") +}