Skip to content

Commit

Permalink
Add metric server and remove queue code, refactor sendQueue to send a…
Browse files Browse the repository at this point in the history
… single batch
  • Loading branch information
gdiazlo committed Mar 19, 2019
1 parent a59d02e commit cdb526f
Showing 1 changed file with 34 additions and 34 deletions.
68 changes: 34 additions & 34 deletions gossip/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Agent struct {
config *Config
Self *member.Peer

metricsServer *metricsServer

memberlist *memberlist.Memberlist
broadcasts *memberlist.TransmitLimitedQueue

Expand All @@ -56,13 +58,14 @@ type Agent struct {
func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {
log.Infof("New agent %s\n", conf.NodeName)
agent = &Agent{
config: conf,
Topology: NewTopology(),
processors: p,
processed: freecache.NewCache(1 << 20),
In: make(chan *hashedBatch, 1<<16),
Out: make(chan *protocol.BatchSnapshots, 1<<16),
quit: make(chan bool),
config: conf,
metricsServer: newMetricsServer(conf.MetricsAddr),
Topology: NewTopology(),
processors: p,
processed: freecache.NewCache(1 << 20),
In: make(chan *hashedBatch, 1<<16),
Out: make(chan *protocol.BatchSnapshots, 1<<16),
quit: make(chan bool),
}

bindIP, bindPort, err := conf.AddrParts(conf.BindAddr)
Expand All @@ -86,7 +89,6 @@ func NewAgent(conf *Config, p []Processor) (agent *Agent, err error) {
conf.MemberlistConfig.AdvertisePort = advertisePort
conf.MemberlistConfig.Name = conf.NodeName
conf.MemberlistConfig.Logger = log.GetLogger()

// Configure delegates
conf.MemberlistConfig.Delegate = newAgentDelegate(agent)
conf.MemberlistConfig.Events = &eventDelegate{agent}
Expand Down Expand Up @@ -131,6 +133,14 @@ func (a *Agent) ChTimedSend(batch *protocol.BatchSnapshots, ch chan *protocol.Ba

func (a *Agent) start() {

for _, p := range a.processors {
p.RegisterMetrics(a.metricsServer.registry)
}

go func() {
a.metricsServer.start()
}()

for {
select {
case hashedBatch := <-a.In:
Expand All @@ -144,39 +154,27 @@ func (a *Agent) start() {
go p.Process(hashedBatch.batch)
}
a.ChTimedSend(hashedBatch.batch, a.Out)
case <-time.After(a.config.ProcessInterval):
go a.sendOutQueue()
case b := <-a.Out:
go a.send(b)
case <-a.quit:
return
}
}
}

func batchId(b *protocol.BatchSnapshots) string {
return fmt.Sprintf("( ttl %d, lv %d)", b.TTL, b.Snapshots[len(b.Snapshots)-1].Snapshot.Version)
}
func (a *Agent) send(batch *protocol.BatchSnapshots) {

func (a *Agent) sendOutQueue() {
var batch *protocol.BatchSnapshots
for {
select {
case batch = <-a.Out:
default:
return
}

if batch.TTL <= 0 {
continue
}
if batch.TTL <= 0 {
return
}

batch.TTL -= 1
from := batch.From
batch.From = a.Self
msg, _ := batch.Encode()
for _, dst := range a.route(from) {
log.Debugf("Sending %+v to %+v\n", batchId(batch), dst.Name)
a.memberlist.SendReliable(dst, msg)
}
batch.TTL -= 1
from := batch.From
batch.From = a.Self
msg, _ := batch.Encode()
for _, dst := range a.route(from) {
log.Debugf("Sending batch to %+v\n", dst.Name)
a.memberlist.SendReliable(dst, msg)
}
}

Expand Down Expand Up @@ -259,10 +257,12 @@ func (a *Agent) Leave() error {
//
// It is safe to call this method multiple times.
func (a *Agent) Shutdown() error {
log.Info("\nShutting down agent %s", a.config.NodeName)
log.Infof("Shutting down agent %s", a.config.NodeName)
a.stateLock.Lock()
defer a.stateLock.Unlock()

a.metricsServer.shutdown()

if a.Self.Status == member.Shutdown {
return nil
}
Expand Down

0 comments on commit cdb526f

Please sign in to comment.