Skip to content

Commit

Permalink
Include qed incremental requests into monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda committed Nov 28, 2018
1 parent 46ad21b commit b196787
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 24 deletions.
52 changes: 29 additions & 23 deletions gossip/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package monitor

import (
"fmt"
"time"

"github.com/bbva/qed/client"
Expand Down Expand Up @@ -62,8 +63,8 @@ func NewMonitor(conf *Config) (*Monitor, error) {
}

type QueryTask struct {
Start, End uint64
StartDigest, EndDigest hashing.Digest
Start, End uint64
StartSnapshot, EndSnapshot *protocol.Snapshot
}

func (m Monitor) Process(b *protocol.BatchSnapshots) {
Expand All @@ -74,10 +75,10 @@ func (m Monitor) Process(b *protocol.BatchSnapshots) {
log.Debugf("Processing batch from versions %d to %d", first.Version, last.Version)

task := &QueryTask{
Start: first.Version,
End: last.Version,
StartDigest: first.HistoryDigest,
EndDigest: last.HistoryDigest,
Start: first.Version,
End: last.Version,
StartSnapshot: first,
EndSnapshot: last,
}

m.taskCh <- task
Expand Down Expand Up @@ -105,25 +106,30 @@ func (m *Monitor) Shutdown() {

func (m *Monitor) dispatchTasks() {
count := 0
for i := 0; i < m.conf.MaxInFlightTasks; i++ {
task := <-m.taskCh
go m.executeTask(task)
count++
var task *QueryTask
defer log.Debugf("%d tasks dispatched", count)
for {
select {
case task = <-m.taskCh:
go m.executeTask(task)
count++
default:
return
}
if count >= m.conf.MaxInFlightTasks {
return
}
}
// var task *QueryTask

// for {
// select {
// case task = <-m.taskCh:
// go m.executeTask(task)
// count++
// default:
// return
// }
// }
log.Debugf("%d tasks dispatched")
}

func (m *Monitor) executeTask(task *QueryTask) {
log.Debug("Executing task: %v", task)
log.Debug("Executing task: %+v", task)
fmt.Printf("Executing task: %+v\n", task)
resp, err := m.client.Incremental(task.Start, task.End)
if err != nil {
// retry
log.Errorf("Error executing incremental query: %v", err)
}
ok := m.client.VerifyIncremental(resp, task.StartSnapshot, task.EndSnapshot, hashing.NewSha256Hasher())
fmt.Printf("Consistency between versions %d and %d: %v\n", task.Start, task.End, ok)
}
2 changes: 1 addition & 1 deletion tests/gossip/run_gossip.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# limitations under the License.

master="127.0.0.1:9100"
qed="127.0.0.1:8080"
qed="http://127.0.0.1:8080"
go run $GOPATH/src/github.com/bbva/qed/main.go start -k key -l silent --node-id server0 --gossip-addr $master --raft-addr 127.0.0.1:9000 -y $HOME/.ssh/id_ed25519 &
pids[0]=$!
sleep 1s
Expand Down

0 comments on commit b196787

Please sign in to comment.