Skip to content

Commit

Permalink
Refactor publisher, monitor and auditor reference/value params
Browse files Browse the repository at this point in the history
  • Loading branch information
Jose Luis Lucas authored and iknite committed Dec 18, 2018
1 parent c288741 commit fdce2bb
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 55 deletions.
71 changes: 36 additions & 35 deletions gossip/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package auditor
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
Expand Down Expand Up @@ -54,16 +55,48 @@ type Auditor struct {
}

func NewAuditor(conf *Config) (*Auditor, error) {
auditor := &Auditor{
auditor := Auditor{
qed: client.NewHttpClient(conf.QEDUrls[0], conf.APIKey),
conf: conf,
taskCh: make(chan Task, 100),
quitCh: make(chan bool),
}

auditor.executionTicker = time.NewTicker(conf.TaskExecutionInterval)
go auditor.runTaskDispatcher()

return auditor, nil
return &auditor, nil
}

func (a Auditor) runTaskDispatcher() {
for {
select {
case <-a.executionTicker.C:
log.Debug("Dispatching tasks...")
go a.dispatchTasks()
case <-a.quitCh:
a.executionTicker.Stop()
return
}
}
}

func (a Auditor) dispatchTasks() {
count := 0
var task Task
defer log.Debugf("%d tasks dispatched", count)
for {
select {
case task = <-a.taskCh:
go task.Do()
count++
default:
return
}
if count >= a.conf.MaxInFlightTasks {
return
}
}
}

type Task interface {
Expand Down Expand Up @@ -133,14 +166,13 @@ func (t MembershipTask) sendAlert(msg string) {
log.Infof("Error saving batch in alertStore: %v", err)
}
defer resp.Body.Close()
_, err = ioutil.ReadAll(resp.Body)
_, err = io.Copy(ioutil.Discard, resp.Body)
if err != nil {
log.Infof("Error reading request body: %v", err)
}
}

func (a Auditor) Process(b *protocol.BatchSnapshots) {

task := &MembershipTask{
qed: a.qed,
pubUrl: a.conf.PubUrls[0],
Expand All @@ -151,40 +183,9 @@ func (a Auditor) Process(b *protocol.BatchSnapshots) {
a.taskCh <- task
}

func (a *Auditor) runTaskDispatcher() {
a.executionTicker = time.NewTicker(a.conf.TaskExecutionInterval)
for {
select {
case <-a.executionTicker.C:
log.Debug("Dispatching tasks...")
a.dispatchTasks()
case <-a.quitCh:
return
}
}
}

func (a *Auditor) Shutdown() {
a.executionTicker.Stop()
a.quitCh <- true
close(a.quitCh)
close(a.taskCh)
}

func (a *Auditor) dispatchTasks() {
count := 0
var task Task
defer log.Debugf("%d tasks dispatched", count)
for {
select {
case task = <-a.taskCh:
go task.Do()
count++
default:
return
}
if count >= a.conf.MaxInFlightTasks {
return
}
}
}
23 changes: 11 additions & 12 deletions gossip/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package monitor
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
Expand Down Expand Up @@ -54,19 +55,18 @@ type Monitor struct {
}

func NewMonitor(conf *Config) (*Monitor, error) {

client := client.NewHttpClient(conf.QedUrls[0], conf.APIKey)

monitor := &Monitor{
monitor := Monitor{
client: client,
conf: conf,
taskCh: make(chan QueryTask, 100),
quitCh: make(chan bool),
}

monitor.executionTicker = time.NewTicker(conf.TaskExecutionInterval)
go monitor.runTaskDispatcher()

return monitor, nil
return &monitor, nil
}

type QueryTask struct {
Expand All @@ -75,7 +75,6 @@ type QueryTask struct {
}

func (m Monitor) Process(b *protocol.BatchSnapshots) {

first := b.Snapshots[0].Snapshot
last := b.Snapshots[len(b.Snapshots)-1].Snapshot

Expand All @@ -91,14 +90,14 @@ func (m Monitor) Process(b *protocol.BatchSnapshots) {
m.taskCh <- task
}

func (m *Monitor) runTaskDispatcher() {
m.executionTicker = time.NewTicker(m.conf.TaskExecutionInterval)
func (m Monitor) runTaskDispatcher() {
for {
select {
case <-m.executionTicker.C:
log.Debug("Dispatching tasks...")
m.dispatchTasks()
go m.dispatchTasks()
case <-m.quitCh:
m.executionTicker.Stop()
return
}
}
Expand All @@ -111,7 +110,7 @@ func (m *Monitor) Shutdown() {
close(m.taskCh)
}

func (m *Monitor) dispatchTasks() {
func (m Monitor) dispatchTasks() {
count := 0
var task QueryTask
defer log.Debugf("%d tasks dispatched", count)
Expand All @@ -129,20 +128,20 @@ func (m *Monitor) dispatchTasks() {
}
}

func (m *Monitor) sendAlert(msg string) {
func (m Monitor) sendAlert(msg string) {
resp, err := http.Post(fmt.Sprintf("%s/alert", m.conf.PubUrls[0]), "application/json",
bytes.NewBufferString(msg))
if err != nil {
log.Infof("Error saving batch in alertStore: %v", err)
}
defer resp.Body.Close()
_, err = ioutil.ReadAll(resp.Body)
_, err = io.Copy(ioutil.Discard, resp.Body)
if err != nil {
log.Infof("Error getting response from alertStore saving a batch: %v", err)
}
}

func (m *Monitor) executeTask(task QueryTask) {
func (m Monitor) executeTask(task QueryTask) {
log.Debug("Executing task: %+v", task)
resp, err := m.client.Incremental(task.Start, task.End)
if err != nil {
Expand Down
16 changes: 8 additions & 8 deletions gossip/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,17 @@ type Publisher struct {

func NewPublisher(conf *Config) (*Publisher, error) {

publisher := &Publisher{
publisher := Publisher{
client: &fasthttp.Client{},
conf: conf,
taskCh: make(chan PublishTask, 100),
quitCh: make(chan bool),
}

publisher.executionTicker = time.NewTicker(conf.TaskExecutionInterval)
go publisher.runTaskDispatcher()

return publisher, nil
return &publisher, nil
}

type PublishTask struct {
Expand All @@ -82,14 +83,14 @@ func (p *Publisher) Process(b *protocol.BatchSnapshots) {
p.taskCh <- *task
}

func (p *Publisher) runTaskDispatcher() {
p.executionTicker = time.NewTicker(p.conf.TaskExecutionInterval)
func (p Publisher) runTaskDispatcher() {
for {
select {
case <-p.executionTicker.C:
log.Debug("Dispatching tasks...")
p.dispatchTasks()
go p.dispatchTasks()
case <-p.quitCh:
p.executionTicker.Stop()
return
}
}
Expand All @@ -102,7 +103,7 @@ func (p *Publisher) Shutdown() {
close(p.taskCh)
}

func (p *Publisher) dispatchTasks() {
func (p Publisher) dispatchTasks() {
count := 0
var task PublishTask
defer log.Debugf("%d tasks dispatched", count)
Expand All @@ -120,7 +121,7 @@ func (p *Publisher) dispatchTasks() {
}
}

func (p *Publisher) executeTask(task PublishTask) {
func (p Publisher) executeTask(task PublishTask) {
log.Debug("Executing task: %+v\n", task)
buf, err := task.Batch.Encode()
if err != nil {
Expand All @@ -137,5 +138,4 @@ func (p *Publisher) executeTask(task PublishTask) {
if err != nil {
log.Infof("Error getting response from snapStore saving a batch: %v", err)
}

}

0 comments on commit fdce2bb

Please sign in to comment.