Skip to content

Commit

Permalink
Add metrics to each processor
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Mar 19, 2019
1 parent c328890 commit 233b329
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 150 deletions.
118 changes: 58 additions & 60 deletions gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,50 @@ package auditor

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"

"github.com/bbva/qed/api/metricshttp"
"github.com/bbva/qed/client"
"github.com/bbva/qed/gossip/metrics"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/pkg/errors"

"github.com/prometheus/client_golang/prometheus"
)

var (
QedAuditorInstancesCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "qed_auditor_instances_count",
Help: "Number of auditor agents running.",
},
)

QedAuditorBatchesProcessSeconds = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "qed_auditor_batches_process_seconds",
Help: "Duration of Auditor batch processing",
},
)

QedAuditorBatchesReceivedTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "qed_auditor_batches_received_total",
Help: "Number of batches received by auditors.",
},
)

QedAuditorGetMembershipProofErrTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "qed_auditor_get_membership_proof_err_total",
Help: "Number of errors trying to get membership proofs by auditors.",
},
)
)

type Config struct {
QEDUrls []string
PubUrls []string
Expand All @@ -58,9 +83,6 @@ type Auditor struct {
qed *client.HTTPClient
conf Config

metricsServer *http.Server
prometheusRegistry *prometheus.Registry

taskCh chan Task
quitCh chan bool
executionTicker *time.Ticker
Expand All @@ -71,53 +93,37 @@ type Task interface {
}

func NewAuditor(conf Config) (*Auditor, error) {

metrics.QedAuditorInstancesCount.Inc()

// QED client
transport := http.DefaultTransport.(*http.Transport)
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: false}
httpClient := http.DefaultClient
httpClient.Transport = transport
client, err := client.NewHTTPClient(
client.SetHttpClient(httpClient),
client.SetURLs(conf.QEDUrls[0], conf.QEDUrls[1:]...),
client.SetAPIKey(conf.APIKey),
)
if err != nil {
return nil, errors.Wrap(err, "Cannot start http client: ")
}

QedAuditorInstancesCount.Inc()
auditor := Auditor{
qed: client,
qed: client.NewHTTPClient(client.Config{
Endpoints: conf.QEDUrls,
APIKey: conf.APIKey,
Insecure: false,
}),
conf: conf,
taskCh: make(chan Task, 100),
quitCh: make(chan bool),
}

r := prometheus.NewRegistry()
metrics.Register(r)
auditor.prometheusRegistry = r
metricsMux := metricshttp.NewMetricsHTTP(r)

auditor.metricsServer = &http.Server{
Addr: conf.MetricsAddr,
Handler: metricsMux,
}

go func() {
log.Debugf(" * Auditor starting metrics HTTP server in addr: %s", conf.MetricsAddr)
if err := auditor.metricsServer.ListenAndServe(); err != http.ErrServerClosed {
log.Errorf("Auditor can't start metrics HTTP server: %s", err)
}
}()

auditor.executionTicker = time.NewTicker(conf.TaskExecutionInterval)
go auditor.runTaskDispatcher()

return &auditor, nil
}

func (a Auditor) RegisterMetrics(r *prometheus.Registry) {
metrics := []prometheus.Collector{
QedAuditorInstancesCount,
QedAuditorBatchesProcessSeconds,
QedAuditorBatchesReceivedTotal,
QedAuditorGetMembershipProofErrTotal,
}

for _, m := range metrics {
r.Register(m)
}
}

func (a Auditor) runTaskDispatcher() {
for {
select {
Expand Down Expand Up @@ -149,9 +155,8 @@ func (a Auditor) dispatchTasks() {
}

func (a Auditor) Process(b *protocol.BatchSnapshots) {
// Metrics
metrics.QedAuditorBatchesReceivedTotal.Inc()
timer := prometheus.NewTimer(metrics.QedAuditorBatchesProcessSeconds)
QedAuditorBatchesReceivedTotal.Inc()
timer := prometheus.NewTimer(QedAuditorBatchesProcessSeconds)
defer timer.ObserveDuration()

task := &MembershipTask{
Expand All @@ -160,20 +165,14 @@ func (a Auditor) Process(b *protocol.BatchSnapshots) {
alertsUrl: a.conf.AlertsUrls[0],
taskCh: a.taskCh,
retries: 2,
s: *b.Snapshots[0],
s: b.Snapshots[0],
}

a.taskCh <- task
}

func (a *Auditor) Shutdown() {
metrics.QedAuditorInstancesCount.Dec()

ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
if err := a.metricsServer.Shutdown(ctx); err != nil {
log.Infof("Auditor metrics http server shutdown process with error: %v", err)
}

QedAuditorInstancesCount.Dec()
a.executionTicker.Stop()
a.quitCh <- true
close(a.quitCh)
Expand All @@ -187,21 +186,20 @@ type MembershipTask struct {
alertsUrl string
taskCh chan Task
retries int
s protocol.SignedSnapshot
s *protocol.SignedSnapshot
}

func (t *MembershipTask) Do() {

proof, err := t.qed.MembershipDigest(t.s.Snapshot.EventDigest, t.s.Snapshot.Version)
if err != nil {
// TODO: retry
log.Infof("Unable to get membership proof from QED server: %s", err.Error())
log.Infof("Auditor is unable to get membership proof from QED server: %s", err.Error())

switch fmt.Sprintf("%T", err) {
case "*errors.errorString":
t.sendAlert(fmt.Sprintf("Unable to get membership proof from QED server: %s", err.Error()))
t.sendAlert(fmt.Sprintf("Auditor is unable to get membership proof from QED server: %s", err.Error()))
default:
metrics.QedAuditorGetMembershipProofErrTotal.Inc()
QedAuditorGetMembershipProofErrTotal.Inc()
}

return
Expand Down Expand Up @@ -256,7 +254,7 @@ func (t MembershipTask) getSnapshot(version uint64) (*protocol.SignedSnapshot, e
}

func (t MembershipTask) sendAlert(msg string) {
resp, err := http.Post(t.alertsUrl, "application/json",
resp, err := http.Post(t.alertsUrl+"/alert", "application/json",
bytes.NewBufferString(msg))
if err != nil {
log.Infof("Error saving batch in alertStore: %v", err)
Expand Down
111 changes: 56 additions & 55 deletions gossip/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,50 @@ package monitor

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"

"github.com/bbva/qed/api/metricshttp"
"github.com/bbva/qed/client"
"github.com/bbva/qed/gossip/metrics"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/pkg/errors"

"github.com/prometheus/client_golang/prometheus"
)

var (
QedMonitorInstancesCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "qed_monitor_instances_count",
Help: "Number of monitor agents running.",
},
)

QedMonitorBatchesReceivedTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "qed_monitor_batches_received_total",
Help: "Number of batches received by monitors.",
},
)

QedMonitorBatchesProcessSeconds = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "qed_monitor_batches_process_seconds",
Help: "Duration of Monitor batch processing",
},
)

QedMonitorGetIncrementalProofErrTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "qed_monitor_get_incremental_proof_err_total",
Help: "Number of errors trying to get incremental proofs by monitors.",
},
)
)

type Config struct {
QEDUrls []string
AlertsUrls []string
Expand All @@ -54,12 +79,8 @@ func DefaultConfig() *Config {
}

type Monitor struct {
client *client.HTTPClient
conf *Config

metricsServer *http.Server
prometheusRegistry *prometheus.Registry

client *client.HTTPClient
conf *Config
taskCh chan Task
quitCh chan bool
executionTicker *time.Ticker
Expand All @@ -70,56 +91,41 @@ type Task interface {
}

func NewMonitor(conf *Config) (*Monitor, error) {
// Metrics
metrics.QedMonitorInstancesCount.Inc()

// QED client
transport := http.DefaultTransport.(*http.Transport)
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: false}
httpClient := http.DefaultClient
httpClient.Transport = transport
client, err := client.NewHTTPClient(
client.SetHttpClient(httpClient),
client.SetURLs(conf.QEDUrls[0], conf.QEDUrls[1:]...),
client.SetAPIKey(conf.APIKey),
)
if err != nil {
return nil, errors.Wrap(err, "Cannot start http client: ")
}
QedMonitorInstancesCount.Inc()

monitor := Monitor{
client: client,
client: client.NewHTTPClient(client.Config{
Endpoints: conf.QEDUrls,
APIKey: conf.APIKey,
Insecure: false,
}),
conf: conf,
taskCh: make(chan Task, 100),
quitCh: make(chan bool),
}

r := prometheus.NewRegistry()
metrics.Register(r)
monitor.prometheusRegistry = r
metricsMux := metricshttp.NewMetricsHTTP(r)

monitor.metricsServer = &http.Server{
Addr: conf.MetricsAddr,
Handler: metricsMux,
}

go func() {
log.Debugf(" * Monitor starting metrics HTTP server in addr: %s", conf.MetricsAddr)
if err := monitor.metricsServer.ListenAndServe(); err != http.ErrServerClosed {
log.Errorf("Monitor can't start metrics HTTP server: %s", err)
}
}()

monitor.executionTicker = time.NewTicker(conf.TaskExecutionInterval)
go monitor.runTaskDispatcher()

return &monitor, nil
}

func (m Monitor) RegisterMetrics(r *prometheus.Registry) {
metrics := []prometheus.Collector{
QedMonitorInstancesCount,
QedMonitorBatchesReceivedTotal,
QedMonitorBatchesProcessSeconds,
QedMonitorGetIncrementalProofErrTotal,
}

for _, m := range metrics {
r.Register(m)
}
}

func (m Monitor) Process(b *protocol.BatchSnapshots) {
metrics.QedMonitorBatchesReceivedTotal.Inc()
timer := prometheus.NewTimer(metrics.QedMonitorBatchesProcessSeconds)
QedMonitorBatchesReceivedTotal.Inc()
timer := prometheus.NewTimer(QedMonitorBatchesProcessSeconds)
defer timer.ObserveDuration()

first := b.Snapshots[0].Snapshot
Expand Down Expand Up @@ -152,12 +158,7 @@ func (m Monitor) runTaskDispatcher() {
}

func (m *Monitor) Shutdown() {
metrics.QedMonitorInstancesCount.Dec()

ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
if err := m.metricsServer.Shutdown(ctx); err != nil {
log.Infof("Monitor metrics http server shutdown process with error: %v", err)
}
QedMonitorInstancesCount.Dec()

m.executionTicker.Stop()
m.quitCh <- true
Expand Down Expand Up @@ -211,10 +212,10 @@ func (q QueryTask) sendAlert(msg string) {
}

func (q QueryTask) Do() {
log.Debug("Executing task: %+v", q)
log.Debugf("Executing task: %+v", q)
resp, err := q.client.Incremental(q.Start, q.End)
if err != nil {
metrics.QedMonitorGetIncrementalProofErrTotal.Inc()
QedMonitorGetIncrementalProofErrTotal.Inc()
log.Infof("Monitor is unable to get incremental proof from QED server: %s", err.Error())
return
}
Expand Down
Loading

0 comments on commit 233b329

Please sign in to comment.