diff --git a/libbeat/publisher/broker/membroker/broker.go b/libbeat/publisher/broker/membroker/broker.go index 286336c90f3..c041d46b5e3 100644 --- a/libbeat/publisher/broker/membroker/broker.go +++ b/libbeat/publisher/broker/membroker/broker.go @@ -7,6 +7,7 @@ import ( "time" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/libbeat/publisher/broker" ) @@ -28,8 +29,6 @@ type Broker struct { acks chan int scheduledACKs chan chanList - ackSeq uint - eventer broker.Eventer // wait group for worker shutdown @@ -124,12 +123,13 @@ func NewBroker( b.minEvents = minEvents b.idleTimeout = flushTimeout + eventLoop := newEventLoop(b) ack := &ackLoop{broker: b} b.wg.Add(2) go func() { defer b.wg.Done() - b.eventLoop() + eventLoop.run() }() go func() { defer b.wg.Done() @@ -161,143 +161,7 @@ func (b *Broker) Consumer() broker.Consumer { return newConsumer(b) } -func (b *Broker) eventLoop() { - var ( - timer *time.Timer - idleC <-chan time.Time - forceFlush bool - - events = b.events - get chan getRequest - - activeEvents int - - totalGet uint64 - totalACK uint64 - batchesGen uint64 - - // log = b.logger - - // Buffer and send pending batches to ackloop. - pendingACKs chanList - schedACKS chan chanList - ) - - if b.idleTimeout > 0 { - // create initialy 'stopped' timer -> reset will be used - // on timer object, if flush timer becomes active. - timer = time.NewTimer(b.idleTimeout) - if !timer.Stop() { - <-timer.C - } - } - - for { - select { - case <-b.done: - return - - // receiving new events into the buffer - case req := <-events: - // log.Debugf("push event: %v\t%v\t%p\n", req.event, req.seq, req.state) - - avail, ok := b.insert(req) - if !ok { - break - } - if avail == 0 { - // log.Debugf("buffer: all regions full") - events = nil - } - - case req := <-b.pubCancel: - // log.Debug("handle cancel request") - var removed int - if st := req.state; st != nil { - st.cancelled = true - removed = b.buf.cancel(st) - } - - // signal cancel request being finished - if req.resp != nil { - req.resp <- producerCancelResponse{ - removed: removed, - } - } - - // re-enable pushRequest if buffer can take new events - if !b.buf.Full() { - events = b.events - } - - case <-idleC: - forceFlush = true - idleC = nil - - case req := <-get: - start, buf := b.buf.reserve(req.sz) - count := len(buf) - if count == 0 { - panic("empty batch returned") - } - - // log.Debug("newACKChan: ", b.ackSeq, count) - ackCH := newACKChan(b.ackSeq, start, count) - b.ackSeq++ - - activeEvents += ackCH.count - totalGet += uint64(ackCH.count) - batchesGen++ - // log.Debug("broker: total events get = ", totalGet) - // log.Debug("broker: total batches generated = ", batchesGen) - - req.resp <- getResponse{buf, ackCH} - pendingACKs.append(ackCH) - schedACKS = b.scheduledACKs - - // stop flush timer on get - forceFlush = false - if idleC != nil { - idleC = nil - if !timer.Stop() { - <-timer.C - } - } - - case schedACKS <- pendingACKs: - schedACKS = nil - pendingACKs = chanList{} - - case count := <-b.acks: - // log.Debug("receive buffer ack:", count) - - activeEvents -= count - totalACK += uint64(count) - // log.Debug("broker: total events ack = ", totalACK) - - b.buf.ack(count) - // after ACK some buffer has been freed up, reenable publisher - events = b.events - } - - // update get and idle timer after state machine - - get = b.requests - if !forceFlush { - avail := b.buf.Avail() - if avail == 0 || b.buf.TotalAvail() < b.minEvents { - get = nil - - if avail > 0 && idleC == nil && timer != nil { - timer.Reset(b.idleTimeout) - idleC = timer.C - } - } - } - } -} - -func (b *Broker) insert(req pushRequest) (int, bool) { +func (b *Broker) insert(req *pushRequest) (int, bool) { var avail int if req.state == nil { _, avail = b.buf.insert(req.event, clientState{}) @@ -324,6 +188,30 @@ func (b *Broker) insert(req pushRequest) (int, bool) { return avail, true } +func (b *Broker) get(max int) (startIndex int, events []publisher.Event) { + return b.buf.reserve(max) +} + +func (b *Broker) cancel(st *produceState) int { + return b.buf.cancel(st) +} + +func (b *Broker) full() bool { + return b.buf.Full() +} + +func (b *Broker) avail() int { + return b.buf.Avail() +} + +func (b *Broker) totalAvail() int { + return b.buf.TotalAvail() +} + +func (b *Broker) cleanACKs(count int) { + b.buf.ack(count) +} + func (b *Broker) reportACK(states []clientState, start, N int) { { start := time.Now() diff --git a/libbeat/publisher/broker/membroker/eventloop.go b/libbeat/publisher/broker/membroker/eventloop.go new file mode 100644 index 00000000000..e1e73d95fad --- /dev/null +++ b/libbeat/publisher/broker/membroker/eventloop.go @@ -0,0 +1,183 @@ +package membroker + +import ( + "time" +) + +type eventLoop struct { + broker *Broker + + // active broker API channels + events chan pushRequest + get chan getRequest + pubCancel chan producerCancelRequest + + // ack handling + acks chan int // ackloop -> eventloop : total number of events ACKed by outputs + schedACKS chan chanList // eventloop -> ackloop : active list of batches to be acked + pendingACKs chanList // ordered list of active batches to be send to the ackloop + ackSeq uint // ack batch sequence number to validate ordering + + // buffer flush timer state + timer *time.Timer + idleC <-chan time.Time + forceFlush bool +} + +func newEventLoop(b *Broker) *eventLoop { + l := &eventLoop{ + broker: b, + events: b.events, + pubCancel: b.pubCancel, + acks: b.acks, + } + + if to := b.idleTimeout; to > 0 { + // create initialy 'stopped' timer -> reset will be used + // on timer object, if flush timer becomes active. + l.timer = time.NewTimer(to) + if !l.timer.Stop() { + <-l.timer.C + } + } + + return l +} + +func (l *eventLoop) run() { + broker := l.broker + for { + select { + case <-broker.done: + return + + case req := <-l.events: // producer pushing new event + l.handleInsert(&req) + + case req := <-l.pubCancel: // producer cancellig active events + l.handleCancel(&req) + + case req := <-l.get: // consumer asking for next batch + l.handleConsumer(&req) + + case <-l.idleC: + // handle flush timer being triggered -> pending events can be forwarded via 'get' + l.enableFlushEvents() + + case l.schedACKS <- l.pendingACKs: + // on send complete list of pending batches has been forwarded -> clear list and queue + l.schedACKS = nil + l.pendingACKs = chanList{} + + case count := <-l.acks: + l.handleACK(count) + + } + + // update get and idle timer after state machine + l.get = broker.requests + if !l.forceFlush { + avail := broker.avail() + if avail == 0 || broker.totalAvail() < broker.minEvents { + l.get = nil + + if avail > 0 { + l.startFlushTimer() + } + } + } + } +} + +func (l *eventLoop) handleInsert(req *pushRequest) { + // log := l.broker.logger + // log.Debugf("push event: %v\t%v\t%p\n", req.event, req.seq, req.state) + + if avail, ok := l.broker.insert(req); ok && avail == 0 { + // log.Debugf("buffer: all regions full") + + // no more space to accept new events -> unset events queue for time being + l.events = nil + } +} + +func (l *eventLoop) handleCancel(req *producerCancelRequest) { + // log := l.broker.logger + // log.Debug("handle cancel request") + + var ( + removed int + broker = l.broker + ) + + if st := req.state; st != nil { + st.cancelled = true + removed = broker.cancel(st) + } + + // signal cancel request being finished + if req.resp != nil { + req.resp <- producerCancelResponse{ + removed: removed, + } + } + + // re-enable pushRequest if buffer can take new events + if !broker.full() { + l.events = broker.events + } +} + +func (l *eventLoop) handleConsumer(req *getRequest) { + // log := l.broker.logger + + start, buf := l.broker.get(req.sz) + count := len(buf) + if count == 0 { + panic("empty batch returned") + } + + // log.Debug("newACKChan: ", b.ackSeq, count) + ackCH := newACKChan(l.ackSeq, start, count) + l.ackSeq++ + + req.resp <- getResponse{buf, ackCH} + l.pendingACKs.append(ackCH) + l.schedACKS = l.broker.scheduledACKs + + l.stopFlushTimer() +} + +func (l *eventLoop) handleACK(count int) { + // log := l.broker.logger + // log.Debug("receive buffer ack:", count) + + // Give broker/buffer a chance to clean up most recent ACKs + // After handling ACKs some buffer has been freed up + // -> always reenable producers + broker := l.broker + broker.cleanACKs(count) + l.events = l.broker.events +} + +func (l *eventLoop) enableFlushEvents() { + l.forceFlush = true + l.idleC = nil +} + +func (l *eventLoop) stopFlushTimer() { + l.forceFlush = false + if l.idleC != nil { + l.idleC = nil + if !l.timer.Stop() { + <-l.timer.C + } + } +} + +func (l *eventLoop) startFlushTimer() { + if l.idleC == nil && l.timer != nil { + l.timer.Reset(l.broker.idleTimeout) + l.idleC = l.timer.C + } +} diff --git a/libbeat/publisher/broker/membroker/produce.go b/libbeat/publisher/broker/membroker/produce.go index 8eede3647d9..8dc1ea1f63d 100644 --- a/libbeat/publisher/broker/membroker/produce.go +++ b/libbeat/publisher/broker/membroker/produce.go @@ -52,11 +52,11 @@ func newProducer(b *Broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel } func (p *forgetfullProducer) Publish(event publisher.Event) bool { - return publish(p.makeRequest(event), &p.openState) + return p.openState.publish(p.makeRequest(event)) } func (p *forgetfullProducer) TryPublish(event publisher.Event) bool { - return tryPublish(p.makeRequest(event), &p.openState) + return p.openState.tryPublish(p.makeRequest(event)) } func (p *forgetfullProducer) makeRequest(event publisher.Event) pushRequest { @@ -69,11 +69,11 @@ func (p *forgetfullProducer) Cancel() int { } func (p *ackProducer) Publish(event publisher.Event) bool { - return publish(p.makeRequest(event), &p.openState) + return p.openState.publish(p.makeRequest(event)) } func (p *ackProducer) TryPublish(event publisher.Event) bool { - return tryPublish(p.makeRequest(event), &p.openState) + return p.openState.tryPublish(p.makeRequest(event)) } func (p *ackProducer) makeRequest(event publisher.Event) pushRequest { @@ -108,7 +108,7 @@ func (st *openState) Close() { close(st.done) } -func publish(req pushRequest, st *openState) bool { +func (st *openState) publish(req pushRequest) bool { select { case st.events <- req: return true @@ -118,7 +118,7 @@ func publish(req pushRequest, st *openState) bool { } } -func tryPublish(req pushRequest, st *openState) bool { +func (st *openState) tryPublish(req pushRequest) bool { select { case st.events <- req: return true