Skip to content

Commit

Permalink
Separate broker eventloop
Browse files Browse the repository at this point in the history
  • Loading branch information
urso committed Jul 13, 2017
1 parent 8b994f2 commit c407f56
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 146 deletions.
168 changes: 28 additions & 140 deletions libbeat/publisher/broker/membroker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/broker"
)

Expand All @@ -28,8 +29,6 @@ type Broker struct {
acks chan int
scheduledACKs chan chanList

ackSeq uint

eventer broker.Eventer

// wait group for worker shutdown
Expand Down Expand Up @@ -124,12 +123,13 @@ func NewBroker(
b.minEvents = minEvents
b.idleTimeout = flushTimeout

eventLoop := newEventLoop(b)
ack := &ackLoop{broker: b}

b.wg.Add(2)
go func() {
defer b.wg.Done()
b.eventLoop()
eventLoop.run()
}()
go func() {
defer b.wg.Done()
Expand Down Expand Up @@ -161,143 +161,7 @@ func (b *Broker) Consumer() broker.Consumer {
return newConsumer(b)
}

func (b *Broker) eventLoop() {
var (
timer *time.Timer
idleC <-chan time.Time
forceFlush bool

events = b.events
get chan getRequest

activeEvents int

totalGet uint64
totalACK uint64
batchesGen uint64

// log = b.logger

// Buffer and send pending batches to ackloop.
pendingACKs chanList
schedACKS chan chanList
)

if b.idleTimeout > 0 {
// create initialy 'stopped' timer -> reset will be used
// on timer object, if flush timer becomes active.
timer = time.NewTimer(b.idleTimeout)
if !timer.Stop() {
<-timer.C
}
}

for {
select {
case <-b.done:
return

// receiving new events into the buffer
case req := <-events:
// log.Debugf("push event: %v\t%v\t%p\n", req.event, req.seq, req.state)

avail, ok := b.insert(req)
if !ok {
break
}
if avail == 0 {
// log.Debugf("buffer: all regions full")
events = nil
}

case req := <-b.pubCancel:
// log.Debug("handle cancel request")
var removed int
if st := req.state; st != nil {
st.cancelled = true
removed = b.buf.cancel(st)
}

// signal cancel request being finished
if req.resp != nil {
req.resp <- producerCancelResponse{
removed: removed,
}
}

// re-enable pushRequest if buffer can take new events
if !b.buf.Full() {
events = b.events
}

case <-idleC:
forceFlush = true
idleC = nil

case req := <-get:
start, buf := b.buf.reserve(req.sz)
count := len(buf)
if count == 0 {
panic("empty batch returned")
}

// log.Debug("newACKChan: ", b.ackSeq, count)
ackCH := newACKChan(b.ackSeq, start, count)
b.ackSeq++

activeEvents += ackCH.count
totalGet += uint64(ackCH.count)
batchesGen++
// log.Debug("broker: total events get = ", totalGet)
// log.Debug("broker: total batches generated = ", batchesGen)

req.resp <- getResponse{buf, ackCH}
pendingACKs.append(ackCH)
schedACKS = b.scheduledACKs

// stop flush timer on get
forceFlush = false
if idleC != nil {
idleC = nil
if !timer.Stop() {
<-timer.C
}
}

case schedACKS <- pendingACKs:
schedACKS = nil
pendingACKs = chanList{}

case count := <-b.acks:
// log.Debug("receive buffer ack:", count)

activeEvents -= count
totalACK += uint64(count)
// log.Debug("broker: total events ack = ", totalACK)

b.buf.ack(count)
// after ACK some buffer has been freed up, reenable publisher
events = b.events
}

// update get and idle timer after state machine

get = b.requests
if !forceFlush {
avail := b.buf.Avail()
if avail == 0 || b.buf.TotalAvail() < b.minEvents {
get = nil

if avail > 0 && idleC == nil && timer != nil {
timer.Reset(b.idleTimeout)
idleC = timer.C
}
}
}
}
}

func (b *Broker) insert(req pushRequest) (int, bool) {
func (b *Broker) insert(req *pushRequest) (int, bool) {
var avail int
if req.state == nil {
_, avail = b.buf.insert(req.event, clientState{})
Expand All @@ -324,6 +188,30 @@ func (b *Broker) insert(req pushRequest) (int, bool) {
return avail, true
}

func (b *Broker) get(max int) (startIndex int, events []publisher.Event) {
return b.buf.reserve(max)
}

func (b *Broker) cancel(st *produceState) int {
return b.buf.cancel(st)
}

func (b *Broker) full() bool {
return b.buf.Full()
}

func (b *Broker) avail() int {
return b.buf.Avail()
}

func (b *Broker) totalAvail() int {
return b.buf.TotalAvail()
}

func (b *Broker) cleanACKs(count int) {
b.buf.ack(count)
}

func (b *Broker) reportACK(states []clientState, start, N int) {
{
start := time.Now()
Expand Down
183 changes: 183 additions & 0 deletions libbeat/publisher/broker/membroker/eventloop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package membroker

import (
"time"
)

type eventLoop struct {
broker *Broker

// active broker API channels
events chan pushRequest
get chan getRequest
pubCancel chan producerCancelRequest

// ack handling
acks chan int // ackloop -> eventloop : total number of events ACKed by outputs
schedACKS chan chanList // eventloop -> ackloop : active list of batches to be acked
pendingACKs chanList // ordered list of active batches to be send to the ackloop
ackSeq uint // ack batch sequence number to validate ordering

// buffer flush timer state
timer *time.Timer
idleC <-chan time.Time
forceFlush bool
}

func newEventLoop(b *Broker) *eventLoop {
l := &eventLoop{
broker: b,
events: b.events,
pubCancel: b.pubCancel,
acks: b.acks,
}

if to := b.idleTimeout; to > 0 {
// create initialy 'stopped' timer -> reset will be used
// on timer object, if flush timer becomes active.
l.timer = time.NewTimer(to)
if !l.timer.Stop() {
<-l.timer.C
}
}

return l
}

func (l *eventLoop) run() {
broker := l.broker
for {
select {
case <-broker.done:
return

case req := <-l.events: // producer pushing new event
l.handleInsert(&req)

case req := <-l.pubCancel: // producer cancellig active events
l.handleCancel(&req)

case req := <-l.get: // consumer asking for next batch
l.handleConsumer(&req)

case <-l.idleC:
// handle flush timer being triggered -> pending events can be forwarded via 'get'
l.enableFlushEvents()

case l.schedACKS <- l.pendingACKs:
// on send complete list of pending batches has been forwarded -> clear list and queue
l.schedACKS = nil
l.pendingACKs = chanList{}

case count := <-l.acks:
l.handleACK(count)

}

// update get and idle timer after state machine
l.get = broker.requests
if !l.forceFlush {
avail := broker.avail()
if avail == 0 || broker.totalAvail() < broker.minEvents {
l.get = nil

if avail > 0 {
l.startFlushTimer()
}
}
}
}
}

func (l *eventLoop) handleInsert(req *pushRequest) {
// log := l.broker.logger
// log.Debugf("push event: %v\t%v\t%p\n", req.event, req.seq, req.state)

if avail, ok := l.broker.insert(req); ok && avail == 0 {
// log.Debugf("buffer: all regions full")

// no more space to accept new events -> unset events queue for time being
l.events = nil
}
}

func (l *eventLoop) handleCancel(req *producerCancelRequest) {
// log := l.broker.logger
// log.Debug("handle cancel request")

var (
removed int
broker = l.broker
)

if st := req.state; st != nil {
st.cancelled = true
removed = broker.cancel(st)
}

// signal cancel request being finished
if req.resp != nil {
req.resp <- producerCancelResponse{
removed: removed,
}
}

// re-enable pushRequest if buffer can take new events
if !broker.full() {
l.events = broker.events
}
}

func (l *eventLoop) handleConsumer(req *getRequest) {
// log := l.broker.logger

start, buf := l.broker.get(req.sz)
count := len(buf)
if count == 0 {
panic("empty batch returned")
}

// log.Debug("newACKChan: ", b.ackSeq, count)
ackCH := newACKChan(l.ackSeq, start, count)
l.ackSeq++

req.resp <- getResponse{buf, ackCH}
l.pendingACKs.append(ackCH)
l.schedACKS = l.broker.scheduledACKs

l.stopFlushTimer()
}

func (l *eventLoop) handleACK(count int) {
// log := l.broker.logger
// log.Debug("receive buffer ack:", count)

// Give broker/buffer a chance to clean up most recent ACKs
// After handling ACKs some buffer has been freed up
// -> always reenable producers
broker := l.broker
broker.cleanACKs(count)
l.events = l.broker.events
}

func (l *eventLoop) enableFlushEvents() {
l.forceFlush = true
l.idleC = nil
}

func (l *eventLoop) stopFlushTimer() {
l.forceFlush = false
if l.idleC != nil {
l.idleC = nil
if !l.timer.Stop() {
<-l.timer.C
}
}
}

func (l *eventLoop) startFlushTimer() {
if l.idleC == nil && l.timer != nil {
l.timer.Reset(l.broker.idleTimeout)
l.idleC = l.timer.C
}
}
Loading

0 comments on commit c407f56

Please sign in to comment.