Skip to content

Commit

Permalink
Add metrics to snapshot/alerts store.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jose Luis Lucas authored and iknite committed Feb 19, 2019
1 parent 8032e29 commit 7f6d34c
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 8 deletions.
3 changes: 2 additions & 1 deletion tests/e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ func setupStore(t *testing.T) (scope.TestF, scope.TestF) {
var s *Service
before := func(t *testing.T) {
s = NewService()
s.Start()
foreground := false
s.Start(foreground)
}

after := func(t *testing.T) {
Expand Down
98 changes: 92 additions & 6 deletions tests/e2e/test_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,56 @@ import (
"sync/atomic"
"time"

"github.com/bbva/qed/api/metricshttp"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"

"github.com/prometheus/client_golang/prometheus"
)

var (
// Prometheus
Qed_store_instances_count = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "qed_store_instances_count",
Help: "Amount of Stores instanciated",
},
)

Qed_store_snapshots_received_total = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "qed_store_snapshots_received_total",
Help: "Amount of snapshots received.",
},
)

Qed_store_alerts_received_total = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "qed_store_alerts_received_total",
Help: "Duration of alerts received.",
},
)

metricsList = []prometheus.Collector{
Qed_store_instances_count,
Qed_store_snapshots_received_total,
Qed_store_alerts_received_total,
}
)

var registerMetrics sync.Once

// Register all metrics.
func Register(r *prometheus.Registry) {
registerMetrics.Do(
func() {
for _, metric := range metricsList {
r.MustRegister(metric)
}
},
)
}

type alertStore struct {
sync.Mutex
d []string
Expand Down Expand Up @@ -121,6 +167,7 @@ func (s *Service) postBatchHandler() func(http.ResponseWriter, *http.Request) {
atomic.AddUint64(&s.stats.count[RPS], 1)
atomic.AddUint64(&s.stats.count[SNAP], 1)
if r.Method == "POST" {
Qed_store_snapshots_received_total.Inc()
// Decode batch to get signed snapshots and batch version.
var b protocol.BatchSnapshots
buf, err := ioutil.ReadAll(r.Body)
Expand Down Expand Up @@ -188,6 +235,8 @@ func (s *Service) alertHandler() func(http.ResponseWriter, *http.Request) {
}
return
} else if r.Method == "POST" {
Qed_store_alerts_received_total.Inc()

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -204,6 +253,11 @@ type Service struct {
snaps *snapStore
alerts *alertStore
stats *statStore

metricsServer *http.Server
prometheusRegistry *prometheus.Registry
httpServer *http.Server

quitCh chan bool
}

Expand All @@ -223,14 +277,31 @@ func NewService() *Service {
}
}

func (s *Service) Start() {
func (s *Service) Start(foreground bool) {
// Metrics server.
r := prometheus.NewRegistry()
Register(r)
s.prometheusRegistry = r
metricsMux := metricshttp.NewMetricsHTTP(r)
s.metricsServer = &http.Server{Addr: ":18888", Handler: metricsMux}

Qed_store_instances_count.Inc()

go func() {
log.Debugf(" * Starting metrics HTTP server ")
if err := s.metricsServer.ListenAndServe(); err != http.ErrServerClosed {
log.Errorf("Can't start metrics HTTP server: %s", err)
}
}()

// Snapshot/alert store server.
router := http.NewServeMux()
router.HandleFunc("/stat", s.statHandler())
router.HandleFunc("/batch", s.postBatchHandler())
router.HandleFunc("/snapshot", s.getSnapshotHandler())
router.HandleFunc("/alert", s.alertHandler())

httpServer := &http.Server{Addr: "127.0.0.1:8888", Handler: router}
s.httpServer = &http.Server{Addr: "127.0.0.1:8888", Handler: router}
fmt.Println("Starting test service...")
go func() {
ticker := time.NewTicker(1 * time.Second)
Expand All @@ -243,20 +314,35 @@ func (s *Service) Start() {
atomic.StoreUint64(&s.stats.count[RPS], 0)
case <-s.quitCh:
log.Debugf("\nShutting down the server...")
_ = httpServer.Shutdown(context.Background())
_ = s.httpServer.Shutdown(context.Background())
return
}
}
}()

go (func() {
if err := httpServer.ListenAndServe(); err != http.ErrServerClosed {
if foreground {
if err := s.httpServer.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
})()
} else {
go (func() {
if err := s.httpServer.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
})()
}
}

func (s *Service) Shutdown() {
// Metrics
Qed_store_instances_count.Dec()

log.Debugf("Metrics enabled: stopping server...")
if err := s.metricsServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil
log.Error(err)
}
log.Debugf("Done.\n")

s.quitCh <- true
close(s.quitCh)
}
3 changes: 2 additions & 1 deletion tests/gossip/test_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ import "github.com/bbva/qed/tests/e2e"

func main() {
s := e2e.NewService()
s.Start()
foreground := true
s.Start(foreground)
}

0 comments on commit 7f6d34c

Please sign in to comment.