Skip to content

Commit

Permalink
Add deduplication of batches in gossip agents, to avoid excessive gos…
Browse files Browse the repository at this point in the history
…siping.
  • Loading branch information
gdiazlo committed Mar 19, 2019
1 parent 8677dbf commit ed8434f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 11 deletions.
35 changes: 26 additions & 9 deletions gossip/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@ import (
"time"

"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/coocood/freecache"
"github.com/hashicorp/memberlist"
)

type hashedBatch struct {
batch *protocol.BatchSnapshots
digest hashing.Digest
}

type Agent struct {
config *Config
Self *member.Peer
Expand All @@ -38,9 +45,10 @@ type Agent struct {

stateLock sync.Mutex

processed *freecache.Cache
processors []Processor

In chan *protocol.BatchSnapshots
In chan *hashedBatch
Out chan *protocol.BatchSnapshots
quit chan bool
}
Expand All @@ -51,7 +59,8 @@ func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {
config: conf,
Topology: NewTopology(),
processors: p,
In: make(chan *protocol.BatchSnapshots, 1<<16),
processed: freecache.NewCache(1 << 20),
In: make(chan *hashedBatch, 1<<16),
Out: make(chan *protocol.BatchSnapshots, 1<<16),
quit: make(chan bool),
}
Expand Down Expand Up @@ -106,10 +115,12 @@ func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {

return agent, nil
}
func chTimedSend(batch *protocol.BatchSnapshots, ch chan *protocol.BatchSnapshots) {

// Send a batch into a queue channel with the agent TimeoutQueues timeout.
func (a *Agent) ChTimedSend(batch *protocol.BatchSnapshots, ch chan *protocol.BatchSnapshots) {
for {
select {
case <-time.After(200 * time.Millisecond):
case <-time.After(a.config.TimeoutQueues):
log.Infof("Agent timed out enqueueing batch in out channel")
return
case ch <- batch:
Expand All @@ -119,15 +130,21 @@ func chTimedSend(batch *protocol.BatchSnapshots, ch chan *protocol.BatchSnapshot
}

func (a *Agent) start() {
outTicker := time.NewTicker(2 * time.Second)

for {
select {
case batch := <-a.In:
case hashedBatch := <-a.In:
_, err := a.processed.Get(hashedBatch.digest)
if err == nil {
continue
}
a.processed.Set(hashedBatch.digest, []byte{0x0}, 0)

for _, p := range a.processors {
go p.Process(*batch)
go p.Process(hashedBatch.batch)
}
chTimedSend(batch, a.Out)
case <-outTicker.C:
a.ChTimedSend(hashedBatch.batch, a.Out)
case <-time.After(a.config.ProcessInterval):
go a.sendOutQueue()
case <-a.quit:
return
Expand Down
17 changes: 15 additions & 2 deletions gossip/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
package gossip

import (
"encoding/json"

"github.com/bbva/qed/gossip/member"
"github.com/bbva/qed/hashing"
"github.com/bbva/qed/log"
"github.com/bbva/qed/protocol"
"github.com/hashicorp/memberlist"
Expand Down Expand Up @@ -81,14 +84,24 @@ func (d *agentDelegate) NodeMeta(limit int) []byte {
// slice may be modified after the call returns, so it should be copied if needed
func (d *agentDelegate) NotifyMsg(msg []byte) {
var batch protocol.BatchSnapshots
err := batch.Decode(msg)

var tmp map[string]*json.RawMessage
err := json.Unmarshal(msg, &tmp)
if err != nil {
log.Errorf("Unable to decode message: %v", err)
return
}

err = batch.Decode(msg)
if err != nil {
log.Errorf("Unable to decode message: %v", err)
return
}
log.Debugf("Notifying batch %+v\n", batchId(&batch))
d.agent.In <- &batch

// hashs the snaapshots to deduplicate processing inside the agent
hash := hashing.NewSha256Hasher().Do(*tmp["Snapshots"])
d.agent.In <- &hashedBatch{&batch, hash}
}

// GetBroadcasts is called when user data messages can be broadcast.
Expand Down

0 comments on commit ed8434f

Please sign in to comment.