Skip to content

Commit

Permalink
wip: auditor
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel Díaz López de la Llave committed Nov 29, 2018
1 parent 46e5915 commit 1182af3
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 88 deletions.
12 changes: 10 additions & 2 deletions cmd/agent_auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package cmd

import (
"github.com/bbva/qed/gossip"
"github.com/bbva/qed/gossip/auditor"
"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/log"
"github.com/bbva/qed/util"
Expand All @@ -37,9 +38,16 @@ func newAgentAuditorCommand(ctx *agentContext) *cobra.Command {

agentConfig := ctx.config
agentConfig.Role = member.Auditor
//auditorConfig := auditor.DefaultConfig()
auditorConfig := auditor.DefaultConfig()
auditorConfig.APIKey = apiKey
auditorConfig.QEDEndpoints = qedEndpoints

agent, err := gossip.NewAgent(agentConfig, []gossip.Processor{gossip.DummyProcessor{}})
auditor, err := monitor.NewAuditor(auditorConfig)
if err != nil {
log.Fatalf("Failed to start the QED monitor: %v", err)
}

agent, err := gossip.NewAgent(agentConfig, []gossip.Processor{auditor})
if err != nil {
log.Fatalf("Failed to start the QED auditor: %v", err)
}
Expand Down
135 changes: 132 additions & 3 deletions gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,145 @@

package auditor

import (
"fmt"
"time"

"github.com/bbva/qed/client"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
)

type Config struct {
QEDEndpoints []string
APIKey string
TaskExecutionInterval time.Duration
MaxInFlightTasks int
}

func DefaultConfig() *Config {
return nil
return &Config{
TaskExecutionInterval: 200 * time.Millisecond,
MaxInFlightTasks: 10,
}
}

type Auditor struct {
client *client.HttpClient
conf *Config

taskCh chan *QueryTask
quitCh chan bool
executionTicker *time.Ticker
}

func NewAuditor(conf *Config) (*Auditor, error) {

client := client.NewHttpClient(conf.QEDEndpoints[0], conf.APIKey)

auditor := &Auditor{
client: client,
conf: conf,
taskCh: make(chan *QueryTask, 100),
quitCh: make(chan bool),
}

go auditor.runTaskDispatcher()

return auditor, nil
}

type QuerTask interface {
Do(*protocol.BatchSnapshots)
}

type IncrementalTask struct {
client *client.HttpClient
Start, End uint64
StartSnapshot, EndSnapshot *protocol.Snapshot
}

func (t *IncrementalTask) Do() {
log.Debug("Executing task: %+v", t)
fmt.Printf("Executing task: %+v\n", t)
resp, err := t.client.Incremental(t.Start, t.End)
if err != nil {
// retry
log.Errorf("Error executing incremental query: %v", err)
}
ok := t.client.VerifyIncremental(resp, t.StartSnapshot, t.EndSnapshot, hashing.NewSha256Hasher())
fmt.Printf("Consistency between versions %d and %d: %v\n", t.Start, t.End, ok)
}

type MembershipTask struct {
client *client.HttpClient
S *protocol.SignedSnapshot
}

func (t *MembershipTask) Do() {
log.Debug("Executing task: %+v", t)
fmt.Printf("Executing task: %+v\n", t)
resp, err := t.client.Membership(t.S.Snapshot.EventDigest, t.S.Snapshot.Version)
if err != nil {
// retry
log.Errorf("Error executing incremental query: %v", err)
}
ok := t.client.Verify(resp, t.StartSnapshot, t.S.Snapshot., hashing.NewSha256Hasher())
fmt.Printf("Membership\n", t.Start, t.End, ok)
}

func (m Auditor) Process(b *protocol.BatchSnapshots) {

first := b.Snapshots[0].Snapshot
last := b.Snapshots[len(b.Snapshots)-1].Snapshot

log.Debugf("Processing batch from versions %d to %d", first.Version, last.Version)

task := &QueryTask{
Start: first.Version,
End: last.Version,
StartSnapshot: first,
EndSnapshot: last,
}

m.taskCh <- task
}

func (m *Auditor) runTaskDispatcher() {
m.executionTicker = time.NewTicker(m.conf.TaskExecutionInterval)
for {
select {
case <-m.executionTicker.C:
log.Debug("Dispatching tasks...")
m.dispatchTasks()
case <-m.quitCh:
return
}
}
}

func (m *Auditor) Shutdown() {
m.executionTicker.Stop()
m.quitCh <- true
close(m.quitCh)
close(m.taskCh)
}

func NewAuditor(conf *Config) *Auditor {
return &Auditor{}
func (m *Auditor) dispatchTasks() {
count := 0
var task *QueryTask
defer log.Debugf("%d tasks dispatched", count)
for {
select {
case task = <-m.taskCh:
go task.Do()
count++
default:
return
}
if count >= m.conf.MaxInFlightTasks {
return
}
}
}
20 changes: 20 additions & 0 deletions protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,26 @@ type SignedSnapshot struct {
Signature []byte
}

func (b *SignedSnapshot) Encode() ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(b); err != nil {
log.Errorf("Failed to encode message: %v", err)
return nil, err
}
return buf.Bytes(), nil
}

func (b *SignedSnapshot) Decode(msg []byte) error {
reader := bytes.NewReader(msg)
decoder := codec.NewDecoder(reader, &codec.MsgpackHandle{})
if err := decoder.Decode(b); err != nil {
log.Errorf("Failed to decode snapshots batch: %v", err)
return err
}
return nil
}

type BatchSnapshots struct {
Snapshots []*SignedSnapshot
TTL int
Expand Down
Loading

0 comments on commit 1182af3

Please sign in to comment.