Skip to content

Commit

Permalink
Clean processor messages
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Mar 19, 2019
1 parent 5929023 commit b833486
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 82 deletions.
52 changes: 26 additions & 26 deletions gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"io"
"io/ioutil"
"net/http"
"strings"
"time"

"github.com/bbva/qed/api/metricshttp"
Expand All @@ -41,6 +40,7 @@ import (
type Config struct {
QEDUrls []string
PubUrls []string
AlertsUrls []string
APIKey string
TaskExecutionInterval time.Duration
MaxInFlightTasks int
Expand All @@ -66,6 +66,10 @@ type Auditor struct {
executionTicker *time.Ticker
}

type Task interface {
Do()
}

func NewAuditor(conf Config) (*Auditor, error) {

metrics.QedAuditorInstancesCount.Inc()
Expand Down Expand Up @@ -96,16 +100,15 @@ func NewAuditor(conf Config) (*Auditor, error) {
auditor.prometheusRegistry = r
metricsMux := metricshttp.NewMetricsHTTP(r)

addr := strings.Split(conf.MetricsAddr, ":")
auditor.metricsServer = &http.Server{
Addr: ":1" + addr[1],
Addr: conf.MetricsAddr,
Handler: metricsMux,
}

go func() {
log.Debugf(" * Starting metrics HTTP server in addr: %s", conf.MetricsAddr)
log.Debugf(" * Auditor starting metrics HTTP server in addr: %s", conf.MetricsAddr)
if err := auditor.metricsServer.ListenAndServe(); err != http.ErrServerClosed {
log.Errorf("Can't start metrics HTTP server: %s", err)
log.Errorf("Auditor can't start metrics HTTP server: %s", err)
}
}()

Expand Down Expand Up @@ -145,49 +148,46 @@ func (a Auditor) dispatchTasks() {
}
}

func (a Auditor) Process(b protocol.BatchSnapshots) {
func (a Auditor) Process(b *protocol.BatchSnapshots) {
// Metrics
metrics.QedAuditorBatchesReceivedTotal.Inc()
timer := prometheus.NewTimer(metrics.QedAuditorBatchesProcessSeconds)
defer timer.ObserveDuration()

task := &MembershipTask{
qed: a.qed,
pubUrl: a.conf.PubUrls[0],
taskCh: a.taskCh,
retries: 2,
s: *b.Snapshots[0],
qed: a.qed,
pubUrl: a.conf.PubUrls[0],
alertsUrl: a.conf.AlertsUrls[0],
taskCh: a.taskCh,
retries: 2,
s: *b.Snapshots[0],
}

a.taskCh <- task
}

func (a *Auditor) Shutdown() {
// Metrics
metrics.QedAuditorInstancesCount.Dec()

log.Debugf("Metrics enabled: stopping server...")
if err := a.metricsServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil
log.Error(err)
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
if err := a.metricsServer.Shutdown(ctx); err != nil {
log.Infof("Auditor metrics http server shutdown process with error: %v", err)
}
log.Debugf("Done.\n")

a.executionTicker.Stop()
a.quitCh <- true
close(a.quitCh)
close(a.taskCh)
}

type Task interface {
Do()
log.Debugf("Auditor stopped.")
}

type MembershipTask struct {
qed *client.HTTPClient
pubUrl string
taskCh chan Task
retries int
s protocol.SignedSnapshot
qed *client.HTTPClient
pubUrl string
alertsUrl string
taskCh chan Task
retries int
s protocol.SignedSnapshot
}

func (t *MembershipTask) Do() {
Expand Down Expand Up @@ -256,7 +256,7 @@ func (t MembershipTask) getSnapshot(version uint64) (*protocol.SignedSnapshot, e
}

func (t MembershipTask) sendAlert(msg string) {
resp, err := http.Post(fmt.Sprintf("%s/alert", t.pubUrl), "application/json",
resp, err := http.Post(t.alertsUrl, "application/json",
bytes.NewBufferString(msg))
if err != nil {
log.Infof("Error saving batch in alertStore: %v", err)
Expand Down
55 changes: 25 additions & 30 deletions gossip/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"io"
"io/ioutil"
"net/http"
"strings"
"time"

"github.com/bbva/qed/api/metricshttp"
Expand All @@ -40,7 +39,7 @@ import (

type Config struct {
QEDUrls []string
PubUrls []string
AlertsUrls []string
APIKey string
TaskExecutionInterval time.Duration
MaxInFlightTasks int
Expand All @@ -56,7 +55,7 @@ func DefaultConfig() *Config {

type Monitor struct {
client *client.HTTPClient
conf Config
conf *Config

metricsServer *http.Server
prometheusRegistry *prometheus.Registry
Expand All @@ -66,7 +65,11 @@ type Monitor struct {
executionTicker *time.Ticker
}

func NewMonitor(conf Config) (*Monitor, error) {
type Task interface {
Do()
}

func NewMonitor(conf *Config) (*Monitor, error) {
// Metrics
metrics.QedMonitorInstancesCount.Inc()

Expand Down Expand Up @@ -96,16 +99,15 @@ func NewMonitor(conf Config) (*Monitor, error) {
monitor.prometheusRegistry = r
metricsMux := metricshttp.NewMetricsHTTP(r)

addr := strings.Split(conf.MetricsAddr, ":")
monitor.metricsServer = &http.Server{
Addr: ":1" + addr[1],
Addr: conf.MetricsAddr,
Handler: metricsMux,
}

go func() {
log.Debugf(" * Starting metrics HTTP server in addr: %s", conf.MetricsAddr)
log.Debugf(" * Monitor starting metrics HTTP server in addr: %s", conf.MetricsAddr)
if err := monitor.metricsServer.ListenAndServe(); err != http.ErrServerClosed {
log.Errorf("Can't start metrics HTTP server: %s", err)
log.Errorf("Monitor can't start metrics HTTP server: %s", err)
}
}()

Expand All @@ -115,24 +117,19 @@ func NewMonitor(conf Config) (*Monitor, error) {
return &monitor, nil
}

type Task interface {
Do()
}

func (m Monitor) Process(b protocol.BatchSnapshots) {
// Metrics
func (m Monitor) Process(b *protocol.BatchSnapshots) {
metrics.QedMonitorBatchesReceivedTotal.Inc()
timer := prometheus.NewTimer(metrics.QedMonitorBatchesProcessSeconds)
defer timer.ObserveDuration()

first := b.Snapshots[0].Snapshot
last := b.Snapshots[len(b.Snapshots)-1].Snapshot

log.Debugf("Processing batch from versions %d to %d", first.Version, last.Version)
log.Debugf("Monitor processing batch from versions %d to %d", first.Version, last.Version)

task := QueryTask{
client: m.client,
pubUrl: m.conf.PubUrls[0],
alertsUrl: m.conf.AlertsUrls[0],
Start: first.Version,
End: last.Version,
StartSnapshot: *first,
Expand All @@ -155,20 +152,18 @@ func (m Monitor) runTaskDispatcher() {
}

func (m *Monitor) Shutdown() {
// Metrics
metrics.QedMonitorInstancesCount.Dec()

log.Debugf("Metrics enabled: stopping server...")
// TODO include timeout instead nil
if err := m.metricsServer.Shutdown(context.Background()); err != nil {
log.Error(err)
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
if err := m.metricsServer.Shutdown(ctx); err != nil {
log.Infof("Monitor metrics http server shutdown process with error: %v", err)
}
log.Debugf("Done.\n")

m.executionTicker.Stop()
m.quitCh <- true
close(m.quitCh)
close(m.taskCh)
log.Debugf("Monitor stopped.")
}

func (m Monitor) dispatchTasks() {
Expand All @@ -195,23 +190,23 @@ func (m Monitor) dispatchTasks() {

type QueryTask struct {
client *client.HTTPClient
pubUrl string
alertsUrl string
taskCh chan Task
Start, End uint64
StartSnapshot, EndSnapshot protocol.Snapshot
}

func (q QueryTask) sendAlert(msg string) {
resp, err := http.Post(fmt.Sprintf("%s/alert", q.pubUrl), "application/json", bytes.NewBufferString(msg))
resp, err := http.Post(q.alertsUrl+"/alert", "application/json", bytes.NewBufferString(msg))
if err != nil {
log.Infof("Error saving batch in alertStore (task re-enqueued): %v", err)
log.Infof("Monitor had an error saving batch in alertStore (task re-enqueued): %v", err)
q.taskCh <- q
return
}
defer resp.Body.Close()
_, err = io.Copy(ioutil.Discard, resp.Body)
if err != nil {
log.Infof("Error getting response from alertStore saving a batch: %v", err)
log.Infof("MOnitor had an error from alertStore saving a batch: %v", err)
}
}

Expand All @@ -220,13 +215,13 @@ func (q QueryTask) Do() {
resp, err := q.client.Incremental(q.Start, q.End)
if err != nil {
metrics.QedMonitorGetIncrementalProofErrTotal.Inc()
log.Infof("Unable to get incremental proof from QED server: %s", err.Error())
log.Infof("Monitor is unable to get incremental proof from QED server: %s", err.Error())
return
}
ok := q.client.VerifyIncremental(resp, &q.StartSnapshot, &q.EndSnapshot, hashing.NewSha256Hasher())
if !ok {
q.sendAlert(fmt.Sprintf("Unable to verify incremental proof from %d to %d", q.StartSnapshot.Version, q.EndSnapshot.Version))
log.Infof("Unable to verify incremental proof from %d to %d", q.StartSnapshot.Version, q.EndSnapshot.Version)
q.sendAlert(fmt.Sprintf("Monitor is unable to verify incremental proof from %d to %d", q.StartSnapshot.Version, q.EndSnapshot.Version))
log.Infof("Monitor is unable to verify incremental proof from %d to %d", q.StartSnapshot.Version, q.EndSnapshot.Version)
}
log.Debugf("Consistency between versions %d and %d: %v\n", q.Start, q.End, ok)
log.Debugf("Monitor verified a consistency proof between versions %d and %d: %v\n", q.Start, q.End, ok)
}
Loading

0 comments on commit b833486

Please sign in to comment.