Skip to content

Commit

Permalink
Remove alerts from agent, immplement them in processors
Browse files Browse the repository at this point in the history
  • Loading branch information
gdiazlo committed Dec 5, 2018
1 parent 9bd3f62 commit ee3c36c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 85 deletions.
61 changes: 3 additions & 58 deletions gossip/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,17 @@
package gossip

import (
"bytes"
"fmt"
"net"
"net/http"
"sync"
"time"

"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/memberlist"
)

type Alert struct {
id string
msg string
}

func (b *Alert) Encode() ([]byte, error) {
var buf bytes.Buffer
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := encoder.Encode(b); err != nil {
log.Errorf("Failed to encode alert into message: %v", err)
return nil, err
}
return buf.Bytes(), nil
}

func (b *Alert) Decode(msg []byte) error {
reader := bytes.NewReader(msg)
decoder := codec.NewDecoder(reader, &codec.MsgpackHandle{})
if err := decoder.Decode(b); err != nil {
log.Errorf("Failed to decode alert: %v", err)
return err
}
return nil
}

type Agent struct {
config *Config
Self *member.Peer
Expand All @@ -68,10 +40,9 @@ type Agent struct {

processors []Processor

In chan *protocol.BatchSnapshots
Out chan *protocol.BatchSnapshots
Alerts chan Alert
quit chan bool
In chan *protocol.BatchSnapshots
Out chan *protocol.BatchSnapshots
quit chan bool
}

func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {
Expand All @@ -82,7 +53,6 @@ func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {
processors: p,
In: make(chan *protocol.BatchSnapshots, 1000),
Out: make(chan *protocol.BatchSnapshots, 1000),
Alerts: make(chan Alert, 100),
quit: make(chan bool),
}

Expand Down Expand Up @@ -139,8 +109,6 @@ func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {

func (a *Agent) start() {
outTicker := time.NewTicker(2 * time.Second)
alertTicker := time.NewTicker(1 * time.Second)

for {
select {
case batch := <-a.In:
Expand All @@ -150,35 +118,12 @@ func (a *Agent) start() {
a.Out <- batch
case <-outTicker.C:
go a.sendOutQueue()
case <-alertTicker.C:
go a.processAlertQueue()
case <-a.quit:
return
}
}
}

func (a *Agent) processAlertQueue() {
url := fmt.Sprintf("%s/alerts", a.config.AlertsUrls[0])
var al Alert
for {
select {
case al = <-a.Alerts:
default:
return
}
buf, err := al.Encode()
if err != nil {
log.Errorf("Error encoding alert!", err)
}

resp, err := http.Post(url, "application/octet-stream", bytes.NewBuffer(buf))
if err != nil || resp.StatusCode != http.StatusOK {
log.Errorf("Error posting alert!", err)
}
}
}

func batchId(b *protocol.BatchSnapshots) string {
return fmt.Sprintf("( ttl %d, lv %d)", b.TTL, b.Snapshots[len(b.Snapshots)-1].Snapshot.Version)
}
Expand Down
12 changes: 11 additions & 1 deletion gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package auditor

import (
"bytes"
"encoding/base64"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -117,11 +118,20 @@ func (t *MembershipTask) Do() {
}
ok := t.qed.Verify(proof, checkSnap, hashing.NewSha256Hasher)
if !ok {
log.Errorf("Unable to verify snapshot %v", t.s.Snapshot)
t.sendAlert(fmt.Sprintf("Unable to verify snapshot %v", t.s.Snapshot))
log.Infof("Unable to verify snapshot %v", t.s.Snapshot)
}
log.Infof("MembershipTask.Do(): Snapshot %v has been verified by QED", t.s.Snapshot)
}

func (t *MembershipTask) sendAlert(msg string) {

go func() {
http.Post(fmt.Sprintf("%s/alert", t.pubUrl), "application/octet-stream", bytes.NewBufferString(msg))
}()

}

func (m Auditor) Process(b *protocol.BatchSnapshots) {

task := &MembershipTask{
Expand Down
43 changes: 17 additions & 26 deletions tests/e2e/test_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,24 @@ import (
"sync/atomic"
"time"

"github.com/bbva/qed/gossip"
"github.com/bbva/qed/protocol"
)

type alertStore struct {
sync.Mutex
d []*gossip.Alert
d []string
}

func (a *alertStore) Append(n *gossip.Alert) {
func (a *alertStore) Append(msg string) {
a.Lock()
defer a.Unlock()
fmt.Println("Storing alert: ", n)
a.d = append(a.d, n)
a.d = append(a.d, msg)
}

func (a *alertStore) GetAll() []*gossip.Alert {
func (a *alertStore) GetAll() []string {
a.Lock()
defer a.Unlock()
n := make([]*gossip.Alert, len(a.d))
n := make([]string, len(a.d))
copy(n, a.d)
return n
}
Expand Down Expand Up @@ -74,11 +72,12 @@ const (
STAT int = iota
SNAP
ALERT
RPS
)

type statStore struct {
sync.Mutex
count [3]uint64
count [4]uint64
batch map[string][]int
}

Expand Down Expand Up @@ -120,6 +119,7 @@ func (s *Service) statHandler() func(http.ResponseWriter, *http.Request) {

func (s *Service) postBatchHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
atomic.AddUint64(&s.stats.count[RPS], 1)
atomic.AddUint64(&s.stats.count[SNAP], 1)
if r.Method == "POST" {
// Decode batch to get signed snapshots and batch version.
Expand Down Expand Up @@ -149,6 +149,7 @@ func (s *Service) postBatchHandler() func(http.ResponseWriter, *http.Request) {

func (s *Service) getSnapshotHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
atomic.AddUint64(&s.stats.count[RPS], 1)
atomic.AddUint64(&s.stats.count[SNAP], 1)
if r.Method == "GET" {
q := r.URL.Query()
Expand All @@ -174,6 +175,7 @@ func (s *Service) getSnapshotHandler() func(http.ResponseWriter, *http.Request)
}
func (s *Service) alertHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
atomic.AddUint64(&s.stats.count[RPS], 1)
atomic.AddUint64(&s.stats.count[ALERT], 1)
if r.Method == "GET" {
b, err := json.Marshal(s.alerts.GetAll())
Expand All @@ -187,24 +189,12 @@ func (s *Service) alertHandler() func(http.ResponseWriter, *http.Request) {
}
return
} else if r.Method == "POST" {
var b gossip.Alert
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
sDec, err := base64.StdEncoding.DecodeString(string(buf))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = b.Decode(sDec)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

s.alerts.Append(&b)
s.alerts.Append(string(buf))
return
}
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
Expand All @@ -223,7 +213,7 @@ func NewService() *Service {
var stats statStore
snaps.d = make(map[uint64]*protocol.SignedSnapshot, 0)
stats.batch = make(map[string][]int, 0)
alerts.d = make([]*gossip.Alert, 0)
alerts.d = make([]string, 0)
return &Service{
snaps: &snaps,
alerts: &alerts,
Expand All @@ -233,13 +223,14 @@ func NewService() *Service {

func (s *Service) Start() {
go func() {
ticker := time.NewTicker(2 * time.Second)
ticker := time.NewTicker(1 * time.Second)
for {
select {
case <-ticker.C:
c := atomic.LoadUint64(&s.stats.count[STAT])
fmt.Println("Request per second: ", c/2)
atomic.StoreUint64(&s.stats.count[STAT], 0)
c := atomic.LoadUint64(&s.stats.count[RPS])
fmt.Println("Request per second: ", c)
fmt.Println("Counters ", s.stats.count)
atomic.StoreUint64(&s.stats.count[RPS], 0)
}
}
}()
Expand Down

0 comments on commit ee3c36c

Please sign in to comment.