diff --git a/gossip/agent.go b/gossip/agent.go index a476db049..d896273b3 100644 --- a/gossip/agent.go +++ b/gossip/agent.go @@ -16,45 +16,17 @@ package gossip import ( - "bytes" "fmt" "net" - "net/http" "sync" "time" "github.com/bbva/qed/gossip/member" "github.com/bbva/qed/log" "github.com/bbva/qed/protocol" - "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/memberlist" ) -type Alert struct { - id string - msg string -} - -func (b *Alert) Encode() ([]byte, error) { - var buf bytes.Buffer - encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{}) - if err := encoder.Encode(b); err != nil { - log.Errorf("Failed to encode alert into message: %v", err) - return nil, err - } - return buf.Bytes(), nil -} - -func (b *Alert) Decode(msg []byte) error { - reader := bytes.NewReader(msg) - decoder := codec.NewDecoder(reader, &codec.MsgpackHandle{}) - if err := decoder.Decode(b); err != nil { - log.Errorf("Failed to decode alert: %v", err) - return err - } - return nil -} - type Agent struct { config *Config Self *member.Peer @@ -68,10 +40,9 @@ type Agent struct { processors []Processor - In chan *protocol.BatchSnapshots - Out chan *protocol.BatchSnapshots - Alerts chan Alert - quit chan bool + In chan *protocol.BatchSnapshots + Out chan *protocol.BatchSnapshots + quit chan bool } func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) { @@ -82,7 +53,6 @@ func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) { processors: p, In: make(chan *protocol.BatchSnapshots, 1000), Out: make(chan *protocol.BatchSnapshots, 1000), - Alerts: make(chan Alert, 100), quit: make(chan bool), } @@ -139,8 +109,6 @@ func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) { func (a *Agent) start() { outTicker := time.NewTicker(2 * time.Second) - alertTicker := time.NewTicker(1 * time.Second) - for { select { case batch := <-a.In: @@ -150,35 +118,12 @@ func (a *Agent) start() { a.Out <- batch case <-outTicker.C: go a.sendOutQueue() - case <-alertTicker.C: - go a.processAlertQueue() case <-a.quit: return } } } -func (a *Agent) processAlertQueue() { - url := fmt.Sprintf("%s/alerts", a.config.AlertsUrls[0]) - var al Alert - for { - select { - case al = <-a.Alerts: - default: - return - } - buf, err := al.Encode() - if err != nil { - log.Errorf("Error encoding alert!", err) - } - - resp, err := http.Post(url, "application/octet-stream", bytes.NewBuffer(buf)) - if err != nil || resp.StatusCode != http.StatusOK { - log.Errorf("Error posting alert!", err) - } - } -} - func batchId(b *protocol.BatchSnapshots) string { return fmt.Sprintf("( ttl %d, lv %d)", b.TTL, b.Snapshots[len(b.Snapshots)-1].Snapshot.Version) } diff --git a/gossip/auditor/auditor.go b/gossip/auditor/auditor.go index f96207da9..4d3bd3ad7 100644 --- a/gossip/auditor/auditor.go +++ b/gossip/auditor/auditor.go @@ -17,6 +17,7 @@ package auditor import ( + "bytes" "encoding/base64" "fmt" "io/ioutil" @@ -117,11 +118,20 @@ func (t *MembershipTask) Do() { } ok := t.qed.Verify(proof, checkSnap, hashing.NewSha256Hasher) if !ok { - log.Errorf("Unable to verify snapshot %v", t.s.Snapshot) + t.sendAlert(fmt.Sprintf("Unable to verify snapshot %v", t.s.Snapshot)) + log.Infof("Unable to verify snapshot %v", t.s.Snapshot) } log.Infof("MembershipTask.Do(): Snapshot %v has been verified by QED", t.s.Snapshot) } +func (t *MembershipTask) sendAlert(msg string) { + + go func() { + http.Post(fmt.Sprintf("%s/alert", t.pubUrl), "application/octet-stream", bytes.NewBufferString(msg)) + }() + +} + func (m Auditor) Process(b *protocol.BatchSnapshots) { task := &MembershipTask{ diff --git a/tests/e2e/test_service.go b/tests/e2e/test_service.go index cb1996471..eccf901d5 100644 --- a/tests/e2e/test_service.go +++ b/tests/e2e/test_service.go @@ -25,26 +25,24 @@ import ( "sync/atomic" "time" - "github.com/bbva/qed/gossip" "github.com/bbva/qed/protocol" ) type alertStore struct { sync.Mutex - d []*gossip.Alert + d []string } -func (a *alertStore) Append(n *gossip.Alert) { +func (a *alertStore) Append(msg string) { a.Lock() defer a.Unlock() - fmt.Println("Storing alert: ", n) - a.d = append(a.d, n) + a.d = append(a.d, msg) } -func (a *alertStore) GetAll() []*gossip.Alert { +func (a *alertStore) GetAll() []string { a.Lock() defer a.Unlock() - n := make([]*gossip.Alert, len(a.d)) + n := make([]string, len(a.d)) copy(n, a.d) return n } @@ -74,11 +72,12 @@ const ( STAT int = iota SNAP ALERT + RPS ) type statStore struct { sync.Mutex - count [3]uint64 + count [4]uint64 batch map[string][]int } @@ -120,6 +119,7 @@ func (s *Service) statHandler() func(http.ResponseWriter, *http.Request) { func (s *Service) postBatchHandler() func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { + atomic.AddUint64(&s.stats.count[RPS], 1) atomic.AddUint64(&s.stats.count[SNAP], 1) if r.Method == "POST" { // Decode batch to get signed snapshots and batch version. @@ -149,6 +149,7 @@ func (s *Service) postBatchHandler() func(http.ResponseWriter, *http.Request) { func (s *Service) getSnapshotHandler() func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { + atomic.AddUint64(&s.stats.count[RPS], 1) atomic.AddUint64(&s.stats.count[SNAP], 1) if r.Method == "GET" { q := r.URL.Query() @@ -174,6 +175,7 @@ func (s *Service) getSnapshotHandler() func(http.ResponseWriter, *http.Request) } func (s *Service) alertHandler() func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { + atomic.AddUint64(&s.stats.count[RPS], 1) atomic.AddUint64(&s.stats.count[ALERT], 1) if r.Method == "GET" { b, err := json.Marshal(s.alerts.GetAll()) @@ -187,24 +189,12 @@ func (s *Service) alertHandler() func(http.ResponseWriter, *http.Request) { } return } else if r.Method == "POST" { - var b gossip.Alert buf, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - sDec, err := base64.StdEncoding.DecodeString(string(buf)) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - err = b.Decode(sDec) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - s.alerts.Append(&b) + s.alerts.Append(string(buf)) return } http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) @@ -223,7 +213,7 @@ func NewService() *Service { var stats statStore snaps.d = make(map[uint64]*protocol.SignedSnapshot, 0) stats.batch = make(map[string][]int, 0) - alerts.d = make([]*gossip.Alert, 0) + alerts.d = make([]string, 0) return &Service{ snaps: &snaps, alerts: &alerts, @@ -233,13 +223,14 @@ func NewService() *Service { func (s *Service) Start() { go func() { - ticker := time.NewTicker(2 * time.Second) + ticker := time.NewTicker(1 * time.Second) for { select { case <-ticker.C: - c := atomic.LoadUint64(&s.stats.count[STAT]) - fmt.Println("Request per second: ", c/2) - atomic.StoreUint64(&s.stats.count[STAT], 0) + c := atomic.LoadUint64(&s.stats.count[RPS]) + fmt.Println("Request per second: ", c) + fmt.Println("Counters ", s.stats.count) + atomic.StoreUint64(&s.stats.count[RPS], 0) } } }()