Skip to content

Commit

Permalink
add auditor code to execute the membership against qed using the data…
Browse files Browse the repository at this point in the history
… from the snapshot store
  • Loading branch information
gdiazlo committed Dec 3, 2018
1 parent ecb95a4 commit 507e17c
Showing 1 changed file with 57 additions and 52 deletions.
109 changes: 57 additions & 52 deletions gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package auditor

import (
"encoding/base64"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/bbva/qed/client"
Expand All @@ -26,16 +29,9 @@ import (
"github.com/bbva/qed/protocol"
)

type QueryTask struct {
Start, End uint64
StartSnapshot, EndSnapshot *protocol.Snapshot
}

func (q *QueryTask) Do() {
}

type Config struct {
QEDEndpoints []string
QEDUrls []string
PubUrls []string
APIKey string
TaskExecutionInterval time.Duration
MaxInFlightTasks int
Expand All @@ -49,22 +45,19 @@ func DefaultConfig() *Config {
}

type Auditor struct {
client *client.HttpClient
conf *Config
qed *client.HttpClient
conf *Config

taskCh chan *QueryTask
taskCh chan Task
quitCh chan bool
executionTicker *time.Ticker
}

func NewAuditor(conf *Config) (*Auditor, error) {

client := client.NewHttpClient(conf.QEDEndpoints[0], conf.APIKey)

auditor := &Auditor{
client: client,
qed: client.NewHttpClient(conf.QEDUrls[0], conf.APIKey),
conf: conf,
taskCh: make(chan *QueryTask, 100),
taskCh: make(chan Task, 100),
quitCh: make(chan bool),
}

Expand All @@ -73,57 +66,69 @@ func NewAuditor(conf *Config) (*Auditor, error) {
return auditor, nil
}

type QuerTasker interface {
Do(*protocol.BatchSnapshots)
}

type IncrementalTask struct {
client *client.HttpClient
Start, End uint64
StartSnapshot, EndSnapshot *protocol.Snapshot
type Task interface {
Do()
}

func (t *IncrementalTask) Do() {
log.Debug("Executing task: %+v", t)
fmt.Printf("Executing task: %+v\n", t)
resp, err := t.client.Incremental(t.Start, t.End)
func (t *MembershipTask) getSnapshot(version uint64) (*protocol.SignedSnapshot, error) {
resp, err := http.Get(fmt.Sprintf("%s/snapshot?v=%d", t.pubUrl, version))
if err != nil {
// retry
log.Errorf("Error executing incremental query: %v", err)
return nil, fmt.Errorf("Error getting snapshot from the store: %v", err)
}
defer resp.Body.Close()
buf, err := ioutil.ReadAll(resp.Body)
sDec, err := base64.StdEncoding.DecodeString(string(buf))
if err != nil {
fmt.Println("##########################", buf)
return nil, fmt.Errorf("Error decoding signed snapshot %d base64", t.s.Snapshot.Version)
}
var s protocol.SignedSnapshot
err = s.Decode(sDec)
if err != nil {
return nil, fmt.Errorf("Error decoding signed snapshot %d codec", t.s.Snapshot.Version)
}
ok := t.client.VerifyIncremental(resp, t.StartSnapshot, t.EndSnapshot, hashing.NewSha256Hasher())
fmt.Printf("Consistency between versions %d and %d: %v\n", t.Start, t.End, ok)
return &s, nil
}

type MembershipTask struct {
client *client.HttpClient
S *protocol.SignedSnapshot
qed *client.HttpClient
pubUrl string
taskCh chan Task
s *protocol.SignedSnapshot
}

func (t *MembershipTask) Do() {
log.Debug("Executing task: %+v", t)
fmt.Printf("Executing task: %+v\n", t)
resp, err := t.client.Membership(t.S.Snapshot.EventDigest, t.S.Snapshot.Version)
proof, err := t.qed.Membership(t.s.Snapshot.EventDigest, t.s.Snapshot.Version)
if err != nil {
// retry
log.Errorf("Error executing incremental query: %v", err)
}
ok := t.client.Verify(resp, t.S.Snapshot, hashing.NewSha256Hasher)
fmt.Printf("Membership\n" /*t.Start, t.End,*/, ok)

snap, err := t.getSnapshot(proof.CurrentVersion)
if err != nil {
log.Infof("Unable to get snapshot from storage, try later: %v", err)
t.taskCh <- t
return
}
checkSnap := &protocol.Snapshot{
HistoryDigest: t.s.Snapshot.HistoryDigest,
HyperDigest: snap.Snapshot.HyperDigest,
Version: t.s.Snapshot.Version,
EventDigest: t.s.Snapshot.EventDigest,
}
ok := t.qed.Verify(proof, checkSnap, hashing.NewSha256Hasher)
if !ok {
log.Errorf("Unable to verify snapshot %v", t.s.Snapshot)
}
log.Infof("MembershipTask.Do(): Snapshot %v has been verified by QED", t.s.Snapshot)
}

func (m Auditor) Process(b *protocol.BatchSnapshots) {

first := b.Snapshots[0].Snapshot
last := b.Snapshots[len(b.Snapshots)-1].Snapshot

log.Debugf("Processing batch from versions %d to %d", first.Version, last.Version)

task := &QueryTask{
Start: first.Version,
End: last.Version,
StartSnapshot: first,
EndSnapshot: last,
task := &MembershipTask{
qed: m.qed,
pubUrl: m.conf.PubUrls[0],
taskCh: m.taskCh,
s: b.Snapshots[0],
}

m.taskCh <- task
Expand Down Expand Up @@ -151,7 +156,7 @@ func (m *Auditor) Shutdown() {

func (m *Auditor) dispatchTasks() {
count := 0
var task *QueryTask
var task Task
defer log.Debugf("%d tasks dispatched", count)
for {
select {
Expand Down

0 comments on commit 507e17c

Please sign in to comment.