Skip to content

Commit

Permalink
Place metrics in specific metrics package.
Browse files Browse the repository at this point in the history
Add metric-server tests.
Add metric-server start/stop functionality.

Co-authored-by: pancho horrillo <[email protected]>
  • Loading branch information
2 people authored and iknite committed Feb 19, 2019
1 parent 4c045b2 commit 9af664c
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 44 deletions.
29 changes: 6 additions & 23 deletions api/apihttp/apihttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,15 @@ import (
"time"

"github.com/bbva/qed/log"
qedMetrics "github.com/bbva/qed/metrics"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/raftwal"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rcrowley/go-metrics"
)

var (
funcDuration = promauto.NewGauge(prometheus.GaugeOpts{
Name: "example_function_duration_seconds",
Help: "Duration of the last call of an example function.",
})

requestDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "example_request_duration_seconds",
Help: "Histogram for the runtime of a simple example function.",
Buckets: prometheus.LinearBuckets(0.01, 0.01, 10),
})

// Prometheus:
opsProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "qed_healthcheck_ops_total",
Help: "The total number of processed events",
})

// Crowley:
cOpsProcessed = metrics.NewCounter()
)
Expand All @@ -70,17 +53,17 @@ type HealthCheckResponse struct {
// If everything is allright, the HTTP status is 200 and the body contains:
// {"version": "0", "status":"ok"}
func HealthCheckHandler(w http.ResponseWriter, r *http.Request) {
timer := prometheus.NewTimer(requestDuration)
timer := prometheus.NewTimer(qedMetrics.RequestDuration)
defer timer.ObserveDuration()

timer2 := prometheus.NewTimer(prometheus.ObserverFunc(funcDuration.Set))
timer2 := prometheus.NewTimer(prometheus.ObserverFunc(qedMetrics.FuncDuration.Set))
defer timer2.ObserveDuration()

// Do something here that takes time.
time.Sleep(time.Duration(rand.NormFloat64()*10000+50000) * time.Microsecond)

// Prometheus
opsProcessed.Inc()
qedMetrics.OpsProcessed.Inc()

// Crowley
cOpsProcessed.Inc(1)
Expand All @@ -102,9 +85,9 @@ func HealthCheckHandler(w http.ResponseWriter, r *http.Request) {
// In the future we could report back on the status of our DB, or our cache
// (e.g. Redis) by performing a simple PING, and include them in the response.
out := new(bytes.Buffer)
json.Compact(out, resultJson)
_ = json.Compact(out, resultJson)

w.Write(out.Bytes())
_, _ = w.Write(out.Bytes())
}

// Add posts an event into the system:
Expand Down
13 changes: 13 additions & 0 deletions api/metricshttp/metricshttp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package metricshttp

import (
"net/http"

"github.com/prometheus/client_golang/prometheus/promhttp"
)

func NewMetricsHttp() *http.ServeMux {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
return mux
}
24 changes: 24 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package metrics
import (
"expvar"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
Expand All @@ -28,6 +30,11 @@ var (
History *expvar.Map
// BalloonStats has a Map of all the stats relative to Balloon
Balloon *expvar.Map

// Prometheus
FuncDuration prometheus.Gauge
RequestDuration prometheus.Histogram
OpsProcessed prometheus.Counter
)

// Implement expVar.Var interface
Expand All @@ -41,4 +48,21 @@ func init() {
Hyper = expvar.NewMap("hyper_stats")
History = expvar.NewMap("history_stats")
Balloon = expvar.NewMap("balloon_stats")

FuncDuration = promauto.NewGauge(prometheus.GaugeOpts{
Name: "example_function_duration_seconds",
Help: "Duration of the last call of an example function.",
})

RequestDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "example_request_duration_seconds",
Help: "Histogram for the runtime of a simple example function.",
Buckets: prometheus.LinearBuckets(0.01, 0.01, 10),
})

OpsProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "qed_healthcheck_ops_total",
Help: "The total number of processed events",
})

}
70 changes: 49 additions & 21 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"time"

"github.com/bbva/qed/api/apihttp"
"github.com/bbva/qed/api/metricshttp"
"github.com/bbva/qed/api/mgmthttp"
"github.com/bbva/qed/api/tampering"
"github.com/bbva/qed/gossip"
Expand All @@ -46,7 +47,6 @@ import (
"github.com/bbva/qed/util"
metricsprom "github.com/deathowl/go-metrics-prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rcrowley/go-metrics"
)

Expand All @@ -55,15 +55,17 @@ type Server struct {
conf *Config
bootstrap bool // Set bootstrap to true when bringing up the first node as a master

httpServer *http.Server
mgmtServer *http.Server
raftBalloon *raftwal.RaftBalloon
tamperingServer *http.Server
profilingServer *http.Server
signer sign.Signer
sender *sender.Sender
agent *gossip.Agent
agentsQueue chan *protocol.Snapshot
httpServer *http.Server
mgmtServer *http.Server
raftBalloon *raftwal.RaftBalloon
tamperingServer *http.Server
profilingServer *http.Server
metricsServer *http.Server
signer sign.Signer
sender *sender.Sender
agent *gossip.Agent
agentsQueue chan *protocol.Snapshot
metricsUpdaterControl chan bool
}

func serverInfo(conf *Config) http.HandlerFunc {
Expand Down Expand Up @@ -170,7 +172,9 @@ func NewServer(conf *Config) (*Server, error) {
server.profilingServer = newHTTPServer("localhost:6060", nil)
}
if conf.EnableMetrics {
http.Handle("/metrics", promhttp.Handler())
server.metricsUpdaterControl = make(chan bool)
metricsMux := metricshttp.NewMetricsHttp()
server.metricsServer = newHTTPServer("localhost:9990", metricsMux)
}

return server, nil
Expand All @@ -186,7 +190,7 @@ func join(joinAddr, raftAddr, nodeID string) error {
return err
}
defer resp.Body.Close()
io.Copy(ioutil.Discard, resp.Body)
_, _ = io.Copy(ioutil.Discard, resp.Body)

return nil
}
Expand All @@ -203,18 +207,29 @@ func (s *Server) Start() error {
}

if s.conf.EnableMetrics {
pClient := metricsprom.NewPrometheusProvider(
metrics.DefaultRegistry,
"qed",
"crowley",
prometheus.DefaultRegisterer,
1*time.Second,
)
go pClient.UpdatePrometheusMetrics()
go func() {
pClient := metricsprom.NewPrometheusProvider(
metrics.DefaultRegistry,
"qed",
"crowley",
prometheus.DefaultRegisterer,
1*time.Second,
)

ticker := time.NewTicker(pClient.FlushInterval)
for {
select {
case <-ticker.C:
_ = pClient.UpdatePrometheusMetricsOnce()
case <-s.metricsUpdaterControl:
return
}
}
}()

go func() {
log.Debugf(" * Starting metrics HTTP server in addr: localhost:9990")
if err := http.ListenAndServe("localhost:9990", nil); err != http.ErrServerClosed {
if err := s.metricsServer.ListenAndServe(); err != http.ErrServerClosed {
log.Errorf("Can't start metrics HTTP server: %s", err)
}
}()
Expand Down Expand Up @@ -292,6 +307,19 @@ func (s *Server) Start() error {
// Stop will close all the channels from the mux servers.
func (s *Server) Stop() error {
fmt.Printf("\nShutting down QED server %s", s.conf.NodeID)

if s.conf.EnableMetrics {
log.Debugf("Metrics enabled: stopping server...")
if err := s.metricsServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil
log.Error(err)
return err
}
log.Debugf("Done.\n")

s.metricsUpdaterControl <- true
close(s.metricsUpdaterControl)
}

if s.tamperingServer != nil {
log.Debugf("Tampering enabled: stopping server...")
if err := s.tamperingServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil
Expand Down
19 changes: 19 additions & 0 deletions tests/e2e/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

const (
QEDProfilingURL = "http://localhost:6060/debug/pprof"
QEDMetricsURL = "http://localhost:9990/metrics"
)

// FIXME: This function should also include testing for the other servers, not
Expand Down Expand Up @@ -68,4 +69,22 @@ func TestStart(t *testing.T) {
assert.Error(t, err, "Subprocess must exit with non-zero status")
})
})

scenario("Test availability of metrics server", func() {
let("Query metrics endpoint", func(t *testing.T) {
cmd := exec.Command("curl",
"--fail",
"-sS",
"-XGET",
"-H", fmt.Sprintf("Api-Key:%s", APIKey),
"-H", "Content-type: application/json",
QEDMetricsURL,
)

_, err := cmd.CombinedOutput()
assert.NoError(t, err, "Subprocess must not exit with non-zero status")
})

})

}

0 comments on commit 9af664c

Please sign in to comment.