Skip to content

Commit

Permalink
Add default configurations for simple implementations of notifier, store and taskmanager
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Apr 10, 2019
1 parent d36dc3d commit 42a152f
Show file tree
Hide file tree
Showing 6 changed files with 423 additions and 161 deletions.
170 changes: 163 additions & 7 deletions cmd/agent_auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,176 @@ package cmd

import (
"context"
"fmt"

"github.com/bbva/qed/client"
"github.com/bbva/qed/gossip"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/bbva/qed/util"
"github.com/octago/sflags/gen/gpflag"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
)

func newAgentAuditorCommand(ctx context.Context, args []string) *cobra.Command {
var (
QedAuditorInstancesCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "qed_auditor_instances_count",
Help: "Number of auditor agents running.",
},
)

QedAuditorBatchesProcessSeconds = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "qed_auditor_batches_process_seconds",
Help: "Duration of Auditor batch processing",
},
)

cmd := &cobra.Command{
Use: "auditor",
Short: "Start a QED auditor",
Long: `Start a QED auditor that reacts to snapshot batches propagated by QED servers and periodically executes membership queries to verify the inclusion of events`,
Run: func(cmd *cobra.Command, args []string) {
QedAuditorBatchesReceivedTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "qed_auditor_batches_received_total",
Help: "Number of batches received by auditors.",
},
)

QedAuditorGetMembershipProofErrTotal = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "qed_auditor_get_membership_proof_err_total",
Help: "Number of errors trying to get membership proofs by auditors.",
},
)
)

// agentAuditorCmd is the "auditor" cobra command. Executing it calls
// runAgentAuditor; init attaches it to agentCmd.
var agentAuditorCmd = &cobra.Command{
	Use:   "auditor",
	RunE:  runAgentAuditor,
	Short: "Provides access to the QED gossip auditor agent",
	Long: `Start a QED auditor that reacts to snapshot batches propagated
by QED servers and periodically executes membership queries to verify
the inclusion of events`,
}

// agentAuditorCtx is the context produced by configAuditor during init.
// runAgentAuditor reads the "agent.config" and "auditor.config" values from it.
var agentAuditorCtx context.Context

// init builds the auditor context and attaches the auditor command to agentCmd.
// NOTE(review): configAuditor derives its context from agentCtx, so this
// presumably relies on agentCtx being set before this init runs — confirm the
// package initialisation order.
func init() {
agentAuditorCtx = configAuditor()
agentCmd.AddCommand(agentAuditorCmd)
}

// auditorConfig groups the configuration of the components that
// runAgentAuditor builds. configAuditor hands it to gpflag.ParseTo together
// with the auditor command's persistent flags.
type auditorConfig struct {
// Qed is passed to client.NewHTTPClientFromConfig.
Qed *client.Config
// Notifier is passed to gossip.NewSimpleNotifierFromConfig.
Notifier *gossip.SimpleNotifierConfig
// Store is passed to gossip.NewRestSnapshotStoreFromConfig.
Store *gossip.RestSnapshotStoreConfig
// Tasks is passed to gossip.NewSimpleTasksManagerFromConfig.
Tasks *gossip.SimpleTasksManagerConfig
}

// newAuditorConfig returns an auditorConfig in which every section is
// populated with the default configuration of its component.
func newAuditorConfig() *auditorConfig {
	conf := new(auditorConfig)
	conf.Qed = client.DefaultConfig()
	conf.Notifier = gossip.DefaultSimpleNotifierConfig()
	conf.Store = gossip.DefaultRestSnapshotStoreConfig()
	conf.Tasks = gossip.DefaultSimpleTasksManagerConfig()
	return conf
}

// configAuditor creates the default auditor configuration, passes it to
// gpflag.ParseTo with the auditor command's persistent flags, and returns a
// context derived from agentCtx that carries the configuration under the
// "auditor.config" key. A ParseTo failure is reported through log.Fatalf.
func configAuditor() context.Context {
	conf := newAuditorConfig()
	if err := gpflag.ParseTo(conf, agentAuditorCmd.PersistentFlags()); err != nil {
		log.Fatalf("err: %v", err)
	}

	return context.WithValue(agentCtx, k("auditor.config"), conf)
}

// runAgentAuditor is the RunE handler of the auditor command.
//
// It reads the agent and auditor configurations from agentAuditorCtx, builds
// the notifier, QED HTTP client, tasks manager and snapshot store, and creates
// the agent from them. It then subscribes a batch processor (printer and
// membership tasks) to batch messages, starts the agent and blocks in
// util.AwaitTermSignal, which is given agent.Shutdown.
//
// It returns the error from creating the QED client or the agent, if any, and
// nil otherwise. The type assertions on the context values panic if the
// expected configuration is missing.
func runAgentAuditor(cmd *cobra.Command, args []string) error {
	agentConfig := agentAuditorCtx.Value(k("agent.config")).(*gossip.Config)
	conf := agentAuditorCtx.Value(k("auditor.config")).(*auditorConfig)

	log.SetLogger("auditor", agentConfig.Log)

	notifier := gossip.NewSimpleNotifierFromConfig(conf.Notifier)
	qed, err := client.NewHTTPClientFromConfig(conf.Qed)
	if err != nil {
		return err
	}
	tm := gossip.NewSimpleTasksManagerFromConfig(conf.Tasks)
	store := gossip.NewRestSnapshotStoreFromConfig(conf.Store)

	agent, err := gossip.NewDefaultAgent(agentConfig, qed, store, tm, notifier)
	if err != nil {
		return err
	}

	// A stray `return cmd` used to sit here: it does not type-check against
	// the error result and left everything below unreachable.
	bp := gossip.NewBatchProcessor(agent, []gossip.TaskFactory{gossip.PrinterFactory{}, membershipFactory{}})
	agent.In.Subscribe(gossip.BatchMessageType, bp, 255)
	defer bp.Stop()

	agent.Start()
	util.AwaitTermSignal(agent.Shutdown)
	return nil
}

// membershipFactory is the stateless task factory the auditor registers with
// the batch processor; its New method builds the membership verification task.
type membershipFactory struct{}

// Metrics returns the four auditor Prometheus collectors, in declaration order.
func (m membershipFactory) Metrics() []prometheus.Collector {
	collectors := make([]prometheus.Collector, 0, 4)
	collectors = append(collectors,
		QedAuditorInstancesCount,
		QedAuditorBatchesProcessSeconds,
		QedAuditorBatchesReceivedTotal,
		QedAuditorGetMembershipProofErrTotal,
	)
	return collectors
}

// New builds the membership verification task for the batch stored in ctx.
//
// ctx must carry the *gossip.Agent under "agent" and the
// *protocol.BatchSnapshots under "batch"; the type assertions panic otherwise.
// Only the first snapshot of the batch is audited. An empty batch yields a
// task that logs and returns nil instead of panicking on the index.
//
// The returned task:
//   - asks QED for the membership proof of the snapshot's event digest and
//     returns the error if that fails (alerting or counting it, see below);
//   - fetches the stored snapshot for the proof's current version and returns
//     the error if that fails;
//   - verifies the proof against the gossiped snapshot combined with the
//     stored HyperDigest. A failed verification is alerted and logged, and the
//     task still returns nil, as before.
func (m membershipFactory) New(ctx context.Context) gossip.Task {
	a := ctx.Value("agent").(*gossip.Agent)
	b := ctx.Value("batch").(*protocol.BatchSnapshots)

	if len(b.Snapshots) == 0 {
		return func() error {
			log.Infof("Auditor received an empty batch: nothing to verify")
			return nil
		}
	}

	s := b.Snapshots[0]

	// alert sends a notification and logs, rather than drops, a delivery error.
	alert := func(msg string) {
		if err := a.Notifier.Alert(msg); err != nil {
			log.Infof("Auditor had an error sending a notification: %v", err)
		}
	}

	return func() error {
		timer := prometheus.NewTimer(QedAuditorBatchesProcessSeconds)
		defer timer.ObserveDuration()

		proof, err := a.Qed.MembershipDigest(s.Snapshot.EventDigest, s.Snapshot.Version)
		if err != nil {
			log.Infof("Auditor is unable to get membership proof from QED server: %v", err)

			// Errors whose dynamic type prints as *errors.errorString raise
			// an alert; every other error type only increments the counter.
			switch fmt.Sprintf("%T", err) {
			case "*errors.errorString":
				alert(fmt.Sprintf("Auditor is unable to get membership proof from QED server: %v", err))
			default:
				QedAuditorGetMembershipProofErrTotal.Inc()
			}

			return err
		}

		storedSnap, err := a.SnapshotStore.GetSnapshot(proof.CurrentVersion)
		if err != nil {
			log.Infof("Unable to get snapshot from storage: %v", err)
			return err
		}

		checkSnap := &protocol.Snapshot{
			HistoryDigest: s.Snapshot.HistoryDigest,
			HyperDigest:   storedSnap.Snapshot.HyperDigest,
			Version:       s.Snapshot.Version,
			EventDigest:   s.Snapshot.EventDigest,
		}

		ok := a.Qed.DigestVerify(proof, checkSnap, hashing.NewSha256Hasher)
		if !ok {
			alert(fmt.Sprintf("Unable to verify snapshot %v", s.Snapshot))
			log.Infof("Unable to verify snapshot %v", s.Snapshot)
			// Stop here so the success message below is not logged for a
			// snapshot that failed verification.
			return nil
		}

		log.Infof("MembershipTask.Do(): Snapshot %v has been verified by QED", s.Snapshot)
		return nil
	}
}
119 changes: 98 additions & 21 deletions cmd/agent_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cmd
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/bbva/qed/client"
Expand Down Expand Up @@ -81,29 +82,17 @@ func init() {

// monitorConfig groups the configuration of the components the monitor agent
// builds: the QED client, the notifier, the snapshot store and the tasks
// manager. It mirrors auditorConfig and uses the same Simple* configuration
// types for the notifier and the tasks manager.
type monitorConfig struct {
	// Qed configures the QED HTTP client.
	Qed *client.Config
	// Notifier configures the simple notifier.
	Notifier *gossip.SimpleNotifierConfig
	// Store configures the REST snapshot store.
	Store *gossip.RestSnapshotStoreConfig
	// Tasks configures the simple tasks manager.
	Tasks *gossip.SimpleTasksManagerConfig
}

// newMonitorConfig returns a monitorConfig in which every section holds the
// default configuration provided by its component, replacing the timeouts and
// limits that were previously hard-coded here.
func newMonitorConfig() *monitorConfig {
	return &monitorConfig{
		Qed:      client.DefaultConfig(),
		Notifier: gossip.DefaultSimpleNotifierConfig(),
		Store:    gossip.DefaultRestSnapshotStoreConfig(),
		Tasks:    gossip.DefaultSimpleTasksManagerConfig(),
	}
}

Expand All @@ -125,20 +114,23 @@ func runAgentMonitor(cmd *cobra.Command, args []string) error {

log.SetLogger("monitor", agentConfig.Log)

notifier := gossip.NewDefaultNotifierFromConfig(conf.Notifier)
notifier := gossip.NewSimpleNotifierFromConfig(conf.Notifier)
qed, err := client.NewHTTPClientFromConfig(conf.Qed)
if err != nil {
return err
}
tm := gossip.NewDefaultTasksManagerFromConfig(conf.Tasks)
tm := gossip.NewSimpleTasksManagerFromConfig(conf.Tasks)
store := gossip.NewRestSnapshotStoreFromConfig(conf.Store)

agent, err := gossip.NewDefaultAgent(agentConfig, qed, store, tm, notifier)
if err != nil {
return err
}

bp := gossip.NewBatchProcessor(agent, []gossip.TaskFactory{gossip.PrinterFactory{}, incrementalFactory{}})
lagf := newLagFactory(1 * time.Second)
lagf.start()
defer lagf.stop()
bp := gossip.NewBatchProcessor(agent, []gossip.TaskFactory{gossip.PrinterFactory{}, incrementalFactory{}, lagf})
agent.In.Subscribe(gossip.BatchMessageType, bp, 255)
defer bp.Stop()

Expand Down Expand Up @@ -172,6 +164,7 @@ func (i incrementalFactory) New(ctx context.Context) gossip.Task {
resp, err := a.Qed.Incremental(first.Version, last.Version)
if err != nil {
QedMonitorGetIncrementalProofErrTotal.Inc()
a.Notifier.Alert(fmt.Sprintf("Monitor is unable to get incremental proof from QED server: %s", err.Error()))
log.Infof("Monitor is unable to get incremental proof from QED server: %s", err.Error())
return err
}
Expand All @@ -185,3 +178,87 @@ func (i incrementalFactory) New(ctx context.Context) gossip.Task {
}
}

// lagFactory is a task factory that tracks how far the gossiped snapshots and
// the snapshot store lag behind, relative to the rate of snapshots received
// per ticker period.
//
// lastVersion, rate and counter are read and written with sync/atomic. They
// are the first fields of the struct; sync/atomic requires 64-bit alignment
// for such values on 32-bit platforms, so keep them first.
type lagFactory struct {
// lastVersion is the version of the last snapshot recorded by a lag task.
lastVersion uint64
// rate is the value counter had at the most recent tick.
rate uint64
// counter accumulates the number of snapshots received; it is reset to 0 on every tick.
counter uint64
// ticker drives the periodic rate computation performed by start.
ticker *time.Ticker
// quit is closed by stop to end the goroutine launched by start.
quit chan struct{}
}

// newLagFactory creates a lagFactory whose ticker fires every t and whose quit
// channel is open. The counters start at zero; call start to begin computing
// the rate.
func newLagFactory(t time.Duration) *lagFactory {
	factory := new(lagFactory)
	factory.ticker = time.NewTicker(t)
	factory.quit = make(chan struct{})
	return factory
}

// stop closes the quit channel, which makes the goroutine launched by start
// stop the ticker and return. Because it closes a channel, calling stop a
// second time panics.
func (l *lagFactory) stop() {
close(l.quit)
}

// start launches the goroutine that computes the rate: on every tick it
// atomically swaps counter with 0 and publishes the previous value as rate.
// The goroutine stops the ticker and exits once quit is closed.
func (l *lagFactory) start() {
	loop := func() {
		defer l.ticker.Stop()
		for {
			select {
			case <-l.quit:
				return
			case <-l.ticker.C:
				received := atomic.SwapUint64(&l.counter, 0)
				atomic.StoreUint64(&l.rate, received)
			}
		}
	}
	go loop()
}

// Metrics returns the collectors owned by the lag factory; it has none, so the
// result is an empty, non-nil slice.
//
// The receiver is a pointer, like every other lagFactory method. A value
// receiver would copy the struct, reading lastVersion, rate and counter
// non-atomically while the ticker goroutine and running tasks update them.
func (l *lagFactory) Metrics() []prometheus.Collector {
	return []prometheus.Collector{}
}

// New builds the lag-measuring task for the batch stored in ctx.
//
// ctx must carry the *gossip.Agent under "agent" and the
// *protocol.BatchSnapshots under "batch"; the type assertions panic otherwise.
// An empty batch yields a no-op task instead of panicking on the index of the
// last snapshot.
//
// When New is called it adds the batch size to counter and takes a snapshot of
// lastVersion. The returned task then:
//   - computes the local lag as the distance between that lastVersion and the
//     version of the last snapshot in the batch, and records the new version;
//   - logs when the local lag exceeds the current rate;
//   - reads the snapshot store count (returning the error on failure) and
//     computes the store lag as lastVersion minus that count;
//   - alerts and logs when the store lag exceeds the current rate.
func (l *lagFactory) New(ctx context.Context) gossip.Task {
	a := ctx.Value("agent").(*gossip.Agent)
	b := ctx.Value("batch").(*protocol.BatchSnapshots)

	if len(b.Snapshots) == 0 {
		return func() error { return nil }
	}

	counter := atomic.AddUint64(&l.counter, uint64(len(b.Snapshots)))
	lastVersion := atomic.LoadUint64(&l.lastVersion)

	return func() error {
		timer := prometheus.NewTimer(QedMonitorBatchesProcessSeconds)
		defer timer.ObserveDuration()

		last := b.Snapshots[len(b.Snapshots)-1].Snapshot
		localLag := uint64(0)

		if lastVersion < last.Version {
			localLag = last.Version - lastVersion
			// Only ever move lastVersion forward: a task holding an older
			// batch may run after one that already recorded a newer version,
			// and a plain store would regress the value.
			for {
				current := atomic.LoadUint64(&l.lastVersion)
				if current >= last.Version ||
					atomic.CompareAndSwapUint64(&l.lastVersion, current, last.Version) {
					break
				}
			}
		}

		rate := atomic.LoadUint64(&l.rate)

		if localLag > rate {
			log.Infof("Gossip lag %d > Rate %d", localLag, rate)
		}

		count, err := a.SnapshotStore.Count()
		if err != nil {
			return err
		}

		storeLag := uint64(0)
		if lastVersion > count {
			storeLag = lastVersion - count
		}

		if storeLag > rate {
			if err := a.Notifier.Alert(fmt.Sprintf("Lag between gossip and snapshot store: %d", storeLag)); err != nil {
				log.Infof("LagTask had an error sending a notification: %v", err)
			}
			log.Infof("Lag between gossip and snapshot store: last seen version %d - store count %d = %d", lastVersion, count, storeLag)
		}
		log.Infof("Lag status: Rate: %d Counter: %d, Local Lag: %d Store Lag: %d", rate, counter, localLag, storeLag)
		return nil
	}
}
Loading

0 comments on commit 42a152f

Please sign in to comment.