Skip to content

Commit

Permalink
Add monitor skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda committed Nov 23, 2018
1 parent f425b8a commit 1fc45f1
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 15 deletions.
8 changes: 5 additions & 3 deletions cmd/agent_auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package cmd

import (
"github.com/bbva/qed/gossip"
"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/log"
"github.com/bbva/qed/util"
"github.com/spf13/cobra"
Expand All @@ -34,15 +35,16 @@ func newAgentAuditorCommand(ctx *agentContext) *cobra.Command {

log.SetLogger("QedAuditor", logLevel)

config := ctx.config
agentConfig := ctx.config
agentConfig.Role = member.Auditor
//auditorConfig := auditor.DefaultConfig()

agent, err := gossip.NewAgent(config, []gossip.Processor{gossip.DummyProcessor{}})
agent, err := gossip.NewAgent(agentConfig, []gossip.Processor{gossip.DummyProcessor{}})
if err != nil {
log.Fatalf("Failed to start the QED auditor: %v", err)
}

contacted, err := agent.Join(config.StartJoin)
contacted, err := agent.Join(agentConfig.StartJoin)
if err != nil {
log.Fatalf("Failed to join the cluster: %v", err)
}
Expand Down
18 changes: 14 additions & 4 deletions cmd/agent_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package cmd

import (
"github.com/bbva/qed/gossip"
"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/gossip/monitor"
"github.com/bbva/qed/log"
"github.com/bbva/qed/util"
"github.com/spf13/cobra"
Expand All @@ -34,15 +36,23 @@ func newAgentMonitorCommand(ctx *agentContext) *cobra.Command {

log.SetLogger("QedMonitor", logLevel)

config := ctx.config
//monitorConfig := monitor.DefaultConfig()
agentConfig := ctx.config
agentConfig.Role = member.Monitor
monitorConfig := monitor.DefaultConfig()
monitorConfig.APIKey = apiKey
monitorConfig.QEDEndpoints = qedEndpoints

agent, err := gossip.NewAgent(config, []gossip.Processor{gossip.DummyProcessor{}})
monitor, err := monitor.NewMonitor(monitorConfig)
if err != nil {
log.Fatalf("Failed to start the QED monitor: %v", err)
}

contacted, err := agent.Join(config.StartJoin)
agent, err := gossip.NewAgent(agentConfig, []gossip.Processor{monitor})
if err != nil {
log.Fatalf("Failed to start the QED monitor: %v", err)
}

contacted, err := agent.Join(agentConfig.StartJoin)
if err != nil {
log.Fatalf("Failed to join the cluster: %v", err)
}
Expand Down
8 changes: 5 additions & 3 deletions cmd/agent_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package cmd

import (
"github.com/bbva/qed/gossip"
"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/gossip/publisher"
"github.com/bbva/qed/log"
"github.com/bbva/qed/util"
Expand All @@ -36,15 +37,16 @@ func newAgentPublisherCommand(ctx *agentContext) *cobra.Command {

log.SetLogger("QedPublisher", logLevel)

config := ctx.config
agentConfig := ctx.config
agentConfig.Role = member.Publisher
publisherConfig := publisher.NewConfig(&fasthttp.Client{}, endpoints)

agent, err := gossip.NewAgent(config, []gossip.Processor{publisher.NewPublisher(publisherConfig)})
agent, err := gossip.NewAgent(agentConfig, []gossip.Processor{publisher.NewPublisher(publisherConfig)})
if err != nil {
log.Fatalf("Failed to start the QED publisher: %v", err)
}

contacted, err := agent.Join(config.StartJoin)
contacted, err := agent.Join(agentConfig.StartJoin)
if err != nil {
log.Fatalf("Failed to join the cluster: %v", err)
}
Expand Down
107 changes: 104 additions & 3 deletions gossip/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,117 @@

package monitor

import (
"time"

"github.com/bbva/qed/client"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
)

type Config struct {
QEDEndpoints []string
APIKey string
TaskExecutionInterval time.Duration
MaxInFlightTasks int
}

func DefaultConfig() *Config {
return nil
return &Config{
TaskExecutionInterval: 200 * time.Millisecond,
MaxInFlightTasks: 10,
}
}

type Monitor struct {
client *client.HttpClient
conf *Config

taskCh chan *QueryTask
quitCh chan bool
executionTicker *time.Timer
}

func NewMonitor(conf *Config) (*Monitor, error) {

client := client.NewHttpClient(conf.QEDEndpoints[0], conf.APIKey)

monitor := &Monitor{
client: client,
conf: conf,
taskCh: make(chan *QueryTask, 100),
quitCh: make(chan bool),
}

go monitor.runTaskDispatcher()

return monitor, nil
}

type QueryTask struct {
Start, End uint64
StartDigest, EndDigest hashing.Digest
}

func (m Monitor) Process(b *protocol.BatchSnapshots) {

first := b.Snapshots[0].Snapshot
last := b.Snapshots[len(b.Snapshots)-1].Snapshot

log.Debugf("Processing batch from versions %d to %d", first.Version, last.Version)

task := &QueryTask{
Start: first.Version,
End: last.Version,
StartDigest: first.HistoryDigest,
EndDigest: last.HistoryDigest,
}

m.taskCh <- task
}

func (m *Monitor) runTaskDispatcher() {
m.executionTicker = time.NewTimer(m.conf.TaskExecutionInterval)
for {
select {
case <-m.executionTicker.C:
log.Debug("Dispatching tasks...")
m.dispatchTasks()
case <-m.quitCh:
return
}
}
}

func (m *Monitor) Shutdown() {
m.executionTicker.Stop()
m.quitCh <- true
close(m.quitCh)
close(m.taskCh)
}

func (m *Monitor) dispatchTasks() {
count := 0
for i := 0; i < m.conf.MaxInFlightTasks; i++ {
task := <-m.taskCh
go m.executeTask(task)
count++
}
// var task *QueryTask

// for {
// select {
// case task = <-m.taskCh:
// go m.executeTask(task)
// count++
// default:
// return
// }
// }
log.Debugf("%d tasks dispatched")
}

func NewMonitor(conf *Config) *Monitor {
return &Monitor{}
func (m *Monitor) executeTask(task *QueryTask) {
log.Debug("Executing task: %v", task)
}
5 changes: 3 additions & 2 deletions tests/gossip/run_gossip.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,20 @@
# limitations under the License.

master="127.0.0.1:9100"
qed="127.0.0.1:8080"
go run $GOPATH/src/github.com/bbva/qed/main.go start -k key -l debug --node-id server0 --gossip-addr $master --raft-addr 127.0.0.1:9000 -y $HOME/.ssh/id_ed25519 &
pids[0]=$!
sleep 1s

for i in `seq 1 $1`;
do
xterm -hold -e "go run $GOPATH/src/github.com/bbva/qed/main.go agent auditor -k key -l debug --bind 127.0.0.1:910$i --join $master --endpoints $master --node auditor$i" &
xterm -hold -e "go run $GOPATH/src/github.com/bbva/qed/main.go agent auditor -k key -l debug --bind 127.0.0.1:910$i --join $master --endpoints $qed --node auditor$i" &
pids+=($!)
done

for i in `seq 1 $2`;
do
xterm -hold -e "go run $GOPATH/src/github.com/bbva/qed/main.go agent monitor -k key -l debug --bind 127.0.0.1:920$i --join $master --endpoints $master --node monitor$i" &
xterm -hold -e "go run $GOPATH/src/github.com/bbva/qed/main.go agent monitor -k key -l debug --bind 127.0.0.1:920$i --join $master --endpoints $qed --node monitor$i" &
pids+=($!)
done

Expand Down

0 comments on commit 1fc45f1

Please sign in to comment.