Skip to content

Commit

Permalink
Add 1 event. Check that it has been published without alerts
Browse files Browse the repository at this point in the history
Co-authored-by: iknite <[email protected]
  • Loading branch information
Jose Luis Lucas authored and iknite committed Dec 18, 2018
1 parent 16be795 commit 7be3930
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 15 deletions.
11 changes: 7 additions & 4 deletions cmd/agent_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

func newAgentMonitorCommand(ctx *agentContext) *cobra.Command {

var qedEndpoints []string
var qedUrls, pubUrls []string

cmd := &cobra.Command{
Use: "monitor",
Expand All @@ -40,7 +40,8 @@ func newAgentMonitorCommand(ctx *agentContext) *cobra.Command {
agentConfig.Role = member.Monitor
monitorConfig := monitor.DefaultConfig()
monitorConfig.APIKey = apiKey
monitorConfig.QEDEndpoints = qedEndpoints
monitorConfig.QEDEndpoints = qedUrls
monitorConfig.PubUrls = pubUrls

monitor, err := monitor.NewMonitor(monitorConfig)
if err != nil {
Expand All @@ -63,8 +64,10 @@ func newAgentMonitorCommand(ctx *agentContext) *cobra.Command {
},
}

cmd.Flags().StringSliceVarP(&qedEndpoints, "endpoints", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which a monitor can make queries")
cmd.MarkFlagRequired("endpoints")
cmd.Flags().StringSliceVarP(&qedUrls, "qedUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which a monitor can make queries")
cmd.Flags().StringSliceVarP(&pubUrls, "pubUrls", "", []string{}, "Comma-delimited list of QED servers ([host]:port), through which an auditor can make queries")
cmd.MarkFlagRequired("qedUrls")
cmd.MarkFlagRequired("pubUrls")

return cmd
}
2 changes: 1 addition & 1 deletion gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (t *MembershipTask) Do() {
Version: t.s.Snapshot.Version,
EventDigest: t.s.Snapshot.EventDigest,
}
ok := t.qed.Verify(proof, checkSnap, hashing.NewSha256Hasher)
ok := t.qed.DigestVerify(proof, checkSnap, hashing.NewSha256Hasher)
if !ok {
t.sendAlert(fmt.Sprintf("Unable to verify snapshot %v", t.s.Snapshot))
log.Infof("Unable to verify snapshot %v", t.s.Snapshot)
Expand Down
30 changes: 28 additions & 2 deletions gossip/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package monitor

import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/bbva/qed/client"
Expand All @@ -27,7 +30,8 @@ import (
)

type Config struct {
QEDEndpoints []string
QedUrls []string
PubUrls []string
APIKey string
TaskExecutionInterval time.Duration
MaxInFlightTasks int
Expand All @@ -51,7 +55,7 @@ type Monitor struct {

func NewMonitor(conf *Config) (*Monitor, error) {

client := client.NewHttpClient(conf.QEDEndpoints[0], conf.APIKey)
client := client.NewHttpClient(conf.QedUrls[0], conf.APIKey)

monitor := &Monitor{
client: client,
Expand Down Expand Up @@ -125,6 +129,22 @@ func (m *Monitor) dispatchTasks() {
}
}

func (m *Monitor) sendAlert(msg string) {

go func() {
resp, err := http.Post(fmt.Sprintf("%s/alert", m.conf.PubUrls), "application/json",
bytes.NewBufferString(msg))
if err != nil {
log.Infof("Error saving batch in alertStore: %v", err)
}
defer resp.Body.Close()
_, err = ioutil.ReadAll(resp.Body)
if err != nil {
log.Infof("Error getting response from alertStore saving a batch: %v", err)
}
}()
}

func (m *Monitor) executeTask(task *QueryTask) {
log.Debug("Executing task: %+v", task)
fmt.Printf("Executing task: %+v\n", task)
Expand All @@ -134,5 +154,11 @@ func (m *Monitor) executeTask(task *QueryTask) {
log.Errorf("Error executing incremental query: %v", err)
}
ok := m.client.VerifyIncremental(resp, task.StartSnapshot, task.EndSnapshot, hashing.NewSha256Hasher())
if !ok {
m.sendAlert(fmt.Sprintf("Unable to verify incremental proof from %d to %d",
task.StartSnapshot.Version, task.EndSnapshot.Version))
log.Infof("Unable to verify incremental proof from %d to %d",
task.StartSnapshot.Version, task.EndSnapshot.Version)
}
fmt.Printf("Consistency between versions %d and %d: %v\n", task.Start, task.End, ok)
}
19 changes: 13 additions & 6 deletions tests/e2e/agents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,18 @@ func TestAgents(t *testing.T) {
bStore, aStore := setupStore(t)
bServer, aServer := setupServer(0, "", t)
bAuditor, aAuditor := setupAuditor(0, t)
// bMonitor, aMonitor := setupMonitor(0, t)
bMonitor, aMonitor := setupMonitor(0, t)
bPublisher, aPublisher := setupPublisher(0, t)

scenario, let := scope.Scope(t,
merge(bStore, bServer, bAuditor /* bMonitor*/, bPublisher),
merge(aAuditor /*,aMonitor*/, aPublisher, aServer, aStore),
merge(bStore, bServer, bAuditor, bMonitor, bPublisher),
merge(aAuditor, aMonitor, aPublisher, aServer, aStore),
)

client := getClient(0)
event := rand.RandomString(10)

scenario("Add one event and check that it has been published", func() {
scenario("Add one event and check that it has been published without alerts", func() {
var snapshot *protocol.Snapshot
var ss *protocol.SignedSnapshot
var err error
Expand All @@ -96,10 +96,17 @@ func TestAgents(t *testing.T) {
let("Check Auditor do not create any alert", func(t *testing.T) {
time.Sleep(1 * time.Second)
alerts, err := getAlert()
fmt.Println(string(alerts))
assert.NoError(t, err)
assert.False(t, strings.Contains(string(alerts), "Unable to verify"), "Must not exist alerts")
assert.False(t, strings.Contains(string(alerts), "Unable to verify snapshot"), "Must not exist alerts")
})

let("Check Monitor do not create any alert", func(t *testing.T) {
time.Sleep(1 * time.Second)
alerts, err := getAlert()
assert.NoError(t, err)
assert.False(t, strings.Contains(string(alerts), "Unable to verify incremental"), "Must not exist alerts")
})

})

}
3 changes: 2 additions & 1 deletion tests/e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ func setupMonitor(id int, t *testing.T) (scope.TestF, scope.TestF) {

before := func(t *testing.T) {
monitorConf := monitor.DefaultConfig()
monitorConf.QEDEndpoints = []string{QEDUrl}
monitorConf.QedUrls = []string{QEDUrl}
monitorConf.PubUrls = []string{StoreUrl}
monitorConf.APIKey = APIKey

mn, err = monitor.NewMonitor(monitorConf)
Expand Down
2 changes: 1 addition & 1 deletion tests/gossip/run_gossip.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ done

for i in `seq 1 $2`;
do
xterm -hold -e "$QED agent --alertsUrls $alertsEndpoint monitor -k key -l info --bind 127.0.0.1:920$i --join $masterEndpoint --endpoints $qedEndpoint --node monitor$i" &
xterm -hold -e "$QED agent --alertsUrls $alertsEndpoint monitor -k key -l info --bind 127.0.0.1:920$i --join $masterEndpoint --endpoints $qedEndpoint --pubUrls $publisherEndpoint --node monitor$i" &
pids+=($!)
done

Expand Down

0 comments on commit 7be3930

Please sign in to comment.