Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #5148 to 6.0: Add specialized buffers to memqueue #5164

Merged
merged 1 commit into from
Sep 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 41 additions & 20 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,22 @@ package memqueue
// Producer ACKs are run in the ackLoop go-routine.
type ackLoop struct {
broker *Broker
sig chan batchAckRequest
sig chan batchAckMsg
lst chanList

totalACK uint64
totalSched uint64

batchesSched uint64
batchesACKed uint64

processACK func(chanList, int)
}

func newACKLoop(b *Broker, processACK func(chanList, int)) *ackLoop {
l := &ackLoop{broker: b}
l.processACK = processACK
return l
}

func (l *ackLoop) run() {
Expand Down Expand Up @@ -72,38 +80,51 @@ func (l *ackLoop) run() {
// handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig
// is run by the ackLoop.
func (l *ackLoop) handleBatchSig() int {
lst := l.collectAcked()

count := 0
for current := lst.front(); current != nil; current = current.next {
count += current.count
}

if e := l.broker.eventer; e != nil {
e.OnACK(count)
}

// report acks to waiting clients
l.processACK(lst, count)

for !lst.empty() {
releaseACKChan(lst.pop())
}

// return final ACK to EventLoop, in order to clean up internal buffer
l.broker.logger.Debug("ackloop: return ack to broker loop:", count)

l.totalACK += uint64(count)
l.broker.logger.Debug("ackloop: done send ack")
return count
}

func (l *ackLoop) collectAcked() chanList {
lst := chanList{}

acks := l.lst.pop()
l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)
start := acks.start
count := acks.count
l.batchesACKed++
releaseACKChan(acks)
lst.append(acks)

done := false
// collect pending ACKs
for !l.lst.empty() && !done {
acks := l.lst.front()
select {
case <-acks.ch:
l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)

count += acks.count
l.batchesACKed++
releaseACKChan(l.lst.pop())
lst.append(l.lst.pop())

default:
done = true
}
}

// report acks to waiting clients
states := l.broker.buf.buf.clients
l.broker.reportACK(states, start, count)

// return final ACK to EventLoop, in order to clean up internal buffer
l.broker.logger.Debug("ackloop: return ack to broker loop:", count)

l.totalACK += uint64(count)
l.broker.logger.Debug("ackloop: done send ack")
return count
return lst
}
65 changes: 65 additions & 0 deletions libbeat/publisher/queue/memqueue/batchbuf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package memqueue

import "github.com/elastic/beats/libbeat/publisher"

type batchBuffer struct {
next *batchBuffer
flushed bool
events []publisher.Event
clients []clientState
}

func newBatchBuffer(sz int) *batchBuffer {
b := &batchBuffer{}
b.init(sz)
return b
}

func (b *batchBuffer) init(sz int) {
b.events = make([]publisher.Event, 0, sz)
b.clients = make([]clientState, 0, sz)
}

func (b *batchBuffer) initWith(sz int, old batchBuffer) {
events, clients := old.events, old.clients
L := len(events)

b.events = make([]publisher.Event, L, sz)
b.clients = make([]clientState, L, sz)

copy(b.events, events)
copy(b.clients, clients)
}

func (b *batchBuffer) add(event publisher.Event, st clientState) {
b.events = append(b.events, event)
b.clients = append(b.clients, st)
}

func (b *batchBuffer) length() int {
return len(b.events)
}

func (b *batchBuffer) capacity() int {
return cap(b.events)
}

func (b *batchBuffer) cancel(st *produceState) int {
events := b.events[:0]
clients := b.clients[:0]

removed := 0
for i := range b.clients {
if b.clients[i].state == st {
removed++
continue
}

events = append(events, b.events[i])
clients = append(clients, b.clients[i])
}

b.events = events
b.clients = clients
return removed
}
165 changes: 33 additions & 132 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package memqueue

import (
"fmt"
"math"
"sync"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/queue"
)

Expand All @@ -16,9 +13,10 @@ type Broker struct {

logger logger

buf brokerBuffer
minEvents int
idleTimeout time.Duration
bufSize int
// buf brokerBuffer
// minEvents int
// idleTimeout time.Duration

// api channels
events chan pushRequest
Expand Down Expand Up @@ -46,9 +44,10 @@ type Settings struct {

type ackChan struct {
next *ackChan
ch chan batchAckRequest
ch chan batchAckMsg
seq uint
start, count int // number of events waiting for ACK
states []clientState
}

type chanList struct {
Expand Down Expand Up @@ -119,12 +118,20 @@ func NewBroker(

eventer: settings.Eventer,
}
b.buf.init(logger, sz)
b.minEvents = minEvents
b.idleTimeout = flushTimeout

eventLoop := newEventLoop(b)
ack := &ackLoop{broker: b}
var eventLoop interface {
run()
processACK(chanList, int)
}

if minEvents > 1 {
eventLoop = newBufferingEventLoop(b, sz, minEvents, flushTimeout)
} else {
eventLoop = newDirectEventLoop(b, sz)
}

b.bufSize = sz
ack := newACKLoop(b, eventLoop.processACK)

b.wg.Add(2)
go func() {
Expand All @@ -149,7 +156,7 @@ func (b *Broker) Close() error {

func (b *Broker) BufferConfig() queue.BufferConfig {
return queue.BufferConfig{
Events: b.buf.Size(),
Events: b.bufSize,
}
}

Expand All @@ -161,136 +168,21 @@ func (b *Broker) Consumer() queue.Consumer {
return newConsumer(b)
}

func (b *Broker) insert(req *pushRequest) (int, bool) {
var avail int
if req.state == nil {
_, avail = b.buf.insert(req.event, clientState{})
} else {
st := req.state
if st.cancelled {
b.logger.Debugf("cancelled producer - ignore event: %v\t%v\t%p", req.event, req.seq, req.state)

// do not add waiting events if producer did send cancel signal

if cb := st.dropCB; cb != nil {
cb(req.event.Content)
}

return -1, false
}

_, avail = b.buf.insert(req.event, clientState{
seq: req.seq,
state: st,
})
}

return avail, true
}

func (b *Broker) get(max int) (startIndex int, events []publisher.Event) {
return b.buf.reserve(max)
}

func (b *Broker) cancel(st *produceState) int {
return b.buf.cancel(st)
}

func (b *Broker) full() bool {
return b.buf.Full()
}

func (b *Broker) avail() int {
return b.buf.Avail()
}

func (b *Broker) totalAvail() int {
return b.buf.TotalAvail()
}

func (b *Broker) cleanACKs(count int) {
b.buf.ack(count)
}

func (b *Broker) reportACK(states []clientState, start, N int) {
{
start := time.Now()
b.logger.Debug("handle ACKs: ", N)
defer func() {
b.logger.Debug("handle ACK took: ", time.Since(start))
}()
}

if e := b.eventer; e != nil {
e.OnACK(N)
}

// TODO: global boolean to check if clients will need an ACK
// no need to report ACKs if no client is interested in ACKs

idx := start + N - 1
if idx >= len(states) {
idx -= len(states)
}

total := 0
for i := N - 1; i >= 0; i-- {
if idx < 0 {
idx = len(states) - 1
}

st := &states[idx]
b.logger.Debugf("try ack index: (idx=%v, i=%v, seq=%v)\n", idx, i, st.seq)

idx--
if st.state == nil {
b.logger.Debug("no state set")
continue
}

count := (st.seq - st.state.lastACK)
if count == 0 || count > math.MaxUint32/2 {
// seq number comparison did underflow. This happens only if st.seq has
// allready been acknowledged
// b.logger.Debug("seq number already acked: ", st.seq)

st.state = nil
continue
}

b.logger.Debugf("broker ACK events: count=%v, start-seq=%v, end-seq=%v\n",
count,
st.state.lastACK+1,
st.seq,
)

total += int(count)
if total > N {
panic(fmt.Sprintf("Too many events acked (expected=%v, total=%v)",
count, total,
))
}

st.state.cb(int(count))
st.state.lastACK = st.seq
st.state = nil
}
}

var ackChanPool = sync.Pool{
New: func() interface{} {
return &ackChan{
ch: make(chan batchAckRequest, 1),
ch: make(chan batchAckMsg, 1),
}
},
}

func newACKChan(seq uint, start, count int) *ackChan {
func newACKChan(seq uint, start, count int, states []clientState) *ackChan {
ch := ackChanPool.Get().(*ackChan)
ch.next = nil
ch.seq = seq
ch.start = start
ch.count = count
ch.states = states
return ch
}

Expand Down Expand Up @@ -342,7 +234,7 @@ func (l *chanList) front() *ackChan {
return l.head
}

func (l *chanList) channel() chan batchAckRequest {
func (l *chanList) channel() chan batchAckMsg {
if l.head == nil {
return nil
}
Expand All @@ -361,3 +253,12 @@ func (l *chanList) pop() *ackChan {
ch.next = nil
return ch
}

func (l *chanList) reverse() {
tmp := *l
*l = chanList{}

for !tmp.empty() {
l.prepend(tmp.pop())
}
}
Loading