Skip to content

Commit

Permalink
refactor: agents to use the same task interface, increase channel siz…
Browse files Browse the repository at this point in the history
…e in main agent
  • Loading branch information
gdiazlo committed Mar 6, 2019
1 parent 8e0c5e5 commit 7159599
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 44 deletions.
4 changes: 2 additions & 2 deletions gossip/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {
config: conf,
Topology: NewTopology(),
processors: p,
In: make(chan *protocol.BatchSnapshots, 1000),
Out: make(chan *protocol.BatchSnapshots, 1000),
In: make(chan *protocol.BatchSnapshots, 1<<16),
Out: make(chan *protocol.BatchSnapshots, 1<<16),
quit: make(chan bool),
}

Expand Down
53 changes: 30 additions & 23 deletions gossip/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Monitor struct {
metricsServer *http.Server
prometheusRegistry *prometheus.Registry

taskCh chan QueryTask
taskCh chan Task
quitCh chan bool
executionTicker *time.Ticker
}
Expand All @@ -75,7 +75,7 @@ func NewMonitor(conf Config) (*Monitor, error) {
Insecure: false,
}),
conf: conf,
taskCh: make(chan QueryTask, 100),
taskCh: make(chan Task, 100),
quitCh: make(chan bool),
}

Expand Down Expand Up @@ -103,9 +103,8 @@ func NewMonitor(conf Config) (*Monitor, error) {
return &monitor, nil
}

type QueryTask struct {
Start, End uint64
StartSnapshot, EndSnapshot protocol.Snapshot
type Task interface {
Do()
}

func (m Monitor) Process(b protocol.BatchSnapshots) {
Expand All @@ -120,6 +119,8 @@ func (m Monitor) Process(b protocol.BatchSnapshots) {
log.Debugf("Processing batch from versions %d to %d", first.Version, last.Version)

task := QueryTask{
client: m.client,
pubUrl: m.conf.PubUrls[0],
Start: first.Version,
End: last.Version,
StartSnapshot: *first,
Expand Down Expand Up @@ -147,7 +148,8 @@ func (m *Monitor) Shutdown() {
metrics.QedMonitorInstancesCount.Dec()

log.Debugf("Metrics enabled: stopping server...")
if err := m.metricsServer.Shutdown(context.Background()); err != nil { // TODO include timeout instead nil
// TODO include timeout instead nil
if err := m.metricsServer.Shutdown(context.Background()); err != nil {
log.Error(err)
}
log.Debugf("Done.\n")
Expand All @@ -160,7 +162,7 @@ func (m *Monitor) Shutdown() {

func (m Monitor) dispatchTasks() {
count := 0
var task QueryTask
var task Task
var ok bool
defer log.Debugf("%d tasks dispatched", count)
for {
Expand All @@ -169,7 +171,7 @@ func (m Monitor) dispatchTasks() {
if !ok {
return
}
go m.executeTask(task)
go task.Do()
count++
default:
return
Expand All @@ -180,11 +182,19 @@ func (m Monitor) dispatchTasks() {
}
}

func (m Monitor) sendAlert(msg string) {
resp, err := http.Post(fmt.Sprintf("%s/alert", m.conf.PubUrls[0]), "application/json",
bytes.NewBufferString(msg))
type QueryTask struct {
client *client.HTTPClient
pubUrl string
taskCh chan Task
Start, End uint64
StartSnapshot, EndSnapshot protocol.Snapshot
}

func (q QueryTask) sendAlert(msg string) {
resp, err := http.Post(fmt.Sprintf("%s/alert", q.pubUrl), "application/json", bytes.NewBufferString(msg))
if err != nil {
log.Infof("Error saving batch in alertStore: %v", err)
log.Infof("Error saving batch in alertStore (task re-enqueued): %v", err)
q.taskCh <- q
return
}
defer resp.Body.Close()
Expand All @@ -194,21 +204,18 @@ func (m Monitor) sendAlert(msg string) {
}
}

func (m Monitor) executeTask(task QueryTask) {
log.Debug("Executing task: %+v", task)
resp, err := m.client.Incremental(task.Start, task.End)
func (q QueryTask) Do() {
log.Debug("Executing task: %+v", q)
resp, err := q.client.Incremental(q.Start, q.End)
if err != nil {
// TODO: retry
metrics.QedMonitorGetIncrementalProofErrTotal.Inc()
log.Infof("Unable to get incremental proof from QED server: %s", err.Error())
log.Infof("Unable to verify incremental proof from %d to %d", q.Start, q.End)
return
}
ok := m.client.VerifyIncremental(resp, &task.StartSnapshot, &task.EndSnapshot, hashing.NewSha256Hasher())
ok := q.client.VerifyIncremental(resp, &q.StartSnapshot, &q.EndSnapshot, hashing.NewSha256Hasher())
if !ok {
m.sendAlert(fmt.Sprintf("Unable to verify incremental proof from %d to %d",
task.StartSnapshot.Version, task.EndSnapshot.Version))
log.Infof("Unable to verify incremental proof from %d to %d",
task.StartSnapshot.Version, task.EndSnapshot.Version)
q.sendAlert(fmt.Sprintf("Unable to verify incremental proof from %d to %d", q.StartSnapshot.Version, q.EndSnapshot.Version))
log.Infof("Unable to verify incremental proof from %d to %d", q.StartSnapshot.Version, q.EndSnapshot.Version)
}
log.Debugf("Consistency between versions %d and %d: %v\n", task.Start, task.End, ok)
log.Debugf("Consistency between versions %d and %d: %v\n", q.Start, q.End, ok)
}
46 changes: 27 additions & 19 deletions gossip/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package publisher
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
Expand All @@ -30,7 +29,6 @@ import (
"github.com/bbva/qed/gossip/metrics"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/valyala/fasthttp"

"github.com/prometheus/client_golang/prometheus"
)
Expand All @@ -56,23 +54,23 @@ func NewConfig(PubUrls []string) *Config {
}

type Publisher struct {
client *fasthttp.Client
conf Config
store *http.Client
conf Config

metricsServer *http.Server
prometheusRegistry *prometheus.Registry

taskCh chan PublishTask
taskCh chan Task
quitCh chan bool
executionTicker *time.Ticker
}

func NewPublisher(conf Config) (*Publisher, error) {
metrics.QedPublisherInstancesCount.Inc()
publisher := Publisher{
client: &fasthttp.Client{},
store: &http.Client{},
conf: conf,
taskCh: make(chan PublishTask, 100),
taskCh: make(chan Task, 100),
quitCh: make(chan bool),
}

Expand Down Expand Up @@ -100,18 +98,17 @@ func NewPublisher(conf Config) (*Publisher, error) {
return &publisher, nil
}

type PublishTask struct {
Batch protocol.BatchSnapshots
}

func (p *Publisher) Process(b protocol.BatchSnapshots) {
// Metrics
metrics.QedPublisherBatchesReceivedTotal.Inc()
timer := prometheus.NewTimer(metrics.QedPublisherBatchesProcessSeconds)
defer timer.ObserveDuration()

task := &PublishTask{
Batch: b,
store: p.store,
pubUrl: p.conf.PubUrls[0],
taskCh: p.taskCh,
batch: b,
}
p.taskCh <- *task
}
Expand Down Expand Up @@ -147,12 +144,12 @@ func (p *Publisher) Shutdown() {

func (p Publisher) dispatchTasks() {
count := 0
var task PublishTask
var task Task
defer log.Debugf("%d tasks dispatched", count)
for {
select {
case task = <-p.taskCh:
go p.executeTask(task)
go task.Do()
count++
default:
return
Expand All @@ -163,17 +160,28 @@ func (p Publisher) dispatchTasks() {
}
}

func (p Publisher) executeTask(task PublishTask) {
log.Debugf("Executing task: %+v", task)
buf, err := task.Batch.Encode()
type Task interface {
Do()
}

type PublishTask struct {
store *http.Client
pubUrl string
batch protocol.BatchSnapshots
taskCh chan Task
}

func (t PublishTask) Do() {
log.Debugf("Executing task: %+v", t)
buf, err := t.batch.Encode()
if err != nil {
log.Debug("Publisher: Error marshalling: %s\n", err.Error())
return
}
resp, err := http.Post(fmt.Sprintf("%s/batch", p.conf.PubUrls[0]),
"application/json", bytes.NewBuffer(buf))
resp, err := t.store.Post(t.pubUrl+"/batch", "application/json", bytes.NewBuffer(buf))
if err != nil {
log.Infof("Error saving batch in snapStore: %v\n", err)
t.taskCh <- t
return
}
defer resp.Body.Close()
Expand Down
73 changes: 73 additions & 0 deletions gossip/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gossip

import (
"runtime"
"sync/atomic"

"github.com/bbva/qed/protocol"
)

const queueSize uint64 = 65535

// Masking is faster than division
const indexMask uint64 = queueSize - 1

type Queue struct {
// The padding members 1 to 5 below are here to ensure each item is on a separate cache line.
// This prevents false sharing and hence improves performance.
padding1 [8]uint64
lastCommittedIndex uint64
padding2 [8]uint64
nextFreeIndex uint64
padding3 [8]uint64
readerIndex uint64
padding4 [8]uint64
contents [queueSize]*protocol.BatchSnapshots
padding5 [8]uint64
}

func NewQueue() *Queue {
return &Queue{
lastCommittedIndex: 0,
nextFreeIndex: 1,
readerIndex: 1,
}
}

func (self *Queue) Write(value *protocol.BatchSnapshots) {
var myIndex = atomic.AddUint64(&self.nextFreeIndex, 1) - 1
//Wait for reader to catch up, so we don't clobber a slot which it is (or will be) reading
for myIndex > (self.readerIndex + queueSize - 2) {
runtime.Gosched()
}
//Write the item into it's slot
self.contents[myIndex&indexMask] = value
//Increment the lastCommittedIndex so the item is available for reading
for !atomic.CompareAndSwapUint64(&self.lastCommittedIndex, myIndex-1, myIndex) {
runtime.Gosched()
}
}

func (self *Queue) Read() *protocol.BatchSnapshots {
var myIndex = atomic.AddUint64(&self.readerIndex, 1) - 1
//If reader has out-run writer, wait for a value to be committed
for myIndex > self.lastCommittedIndex {
runtime.Gosched()
}
return self.contents[myIndex&indexMask]
}

0 comments on commit 7159599

Please sign in to comment.