Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[shipper] Make the memory queue accept opaque pointers #31356

Merged
merged 46 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
be59c78
remove no-op struct fields
faec Apr 13, 2022
4a21831
Merge branch 'main' into pipeline-cleanup
faec Apr 13, 2022
7bd9ff5
placate linter
faec Apr 13, 2022
40baa04
placate linter
faec Apr 13, 2022
0c95098
clean up some unneeded helper structs
faec Apr 13, 2022
f39ce4a
Merge branch 'main' into pipeline-cleanup-4
faec Apr 13, 2022
cdba499
add comment
faec Apr 13, 2022
62fc1c1
placate linter
faec Apr 14, 2022
a1bc2f9
remove unused data and code
faec Apr 14, 2022
e8334ed
remove more unused bits
faec Apr 14, 2022
8ef17e7
Merge branch 'main' into memqueue-cleanup
faec Apr 14, 2022
0c56916
remove more occurrences of internal logger
faec Apr 14, 2022
1596934
make linter happy
faec Apr 14, 2022
f49dc95
lint lint lint
faec Apr 14, 2022
825b353
in-progress: converting memqueue internals to be less opinionated abo…
faec Apr 14, 2022
05ba238
Merge branch 'main' into memqueue-cleanup-2
faec Apr 14, 2022
479a875
oops, save before commit
faec Apr 14, 2022
24f8979
working...
faec Apr 14, 2022
c850412
working on switching to queueEntry for memqueue events
faec Apr 19, 2022
e4e21c9
fixed handleGetRequest
faec Apr 19, 2022
c32fb53
more renaming
faec Apr 19, 2022
f8f0522
fix cancelRegion
faec Apr 19, 2022
bb0f7a3
lint lint
faec Apr 19, 2022
ac89d4c
remove
faec Apr 19, 2022
7d5987c
lint
faec Apr 19, 2022
f04d682
debugging
faec Apr 21, 2022
5efed4d
debug debug debug
faec Apr 26, 2022
192bc96
fix tests
faec Apr 26, 2022
00f8cf4
simplify event loop state handling / field names
faec Apr 26, 2022
4c2a579
remove more unused fields
faec Apr 26, 2022
a6395c3
make an ackLoop field local
faec Apr 26, 2022
193dd67
more field renaming
faec Apr 26, 2022
db7732d
remove unused ackState
faec Apr 26, 2022
0a85596
delete more unused stuff
faec Apr 26, 2022
6212e37
more field renaming
faec Apr 26, 2022
a554797
more removed / renamed fields
faec Apr 26, 2022
b08e9fd
remove redundant fields
faec Apr 26, 2022
066410a
removing more redundant event loop fields
faec Apr 26, 2022
5f69df3
removing more redundant event loop fields
faec Apr 26, 2022
33e6290
document more fields
faec Apr 26, 2022
6da4360
lint?
faec Apr 27, 2022
1196848
tidy old comments
faec Apr 27, 2022
afd448d
remove some unused calculations
faec Apr 27, 2022
4b092ec
review fixes
faec Apr 28, 2022
9212cac
fix ringBuffer.Full
faec Apr 28, 2022
88ea56e
oops, fix ringBuffer.Full with the proper parity
faec Apr 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 22 additions & 27 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,49 +24,46 @@ package memqueue
// Producer ACKs are run in the ackLoop go-routine.
type ackLoop struct {
broker *broker
sig chan batchAckMsg
lst chanList

// A list of ACK channels given to queue consumers,
// used to maintain sequencing of event acknowledgements.
ackChans chanList

totalACK uint64

processACK func(chanList, int)
}

func newACKLoop(b *broker, processACK func(chanList, int)) *ackLoop {
l := &ackLoop{broker: b}
l.processACK = processACK
return l
}

func (l *ackLoop) run() {
var (
// log = l.broker.logger

// Buffer up acked event counter in acked. If acked > 0, acks will be set to
// Buffer up event counter in ackCount. If ackCount > 0, acks will be set to
// the broker.acks channel for sending the ACKs while potentially receiving
// new batches from the broker event loop.
// This concurrent bidirectionally communication pattern requiring 'select'
// ensures we can not have any deadlock between the event loop and the ack
// loop, as the ack loop will not block on any channel
acked int
acks chan int
ackCount int
ackChan chan int
sig chan batchAckMsg
)

for {
select {
case <-l.broker.done:
return

case acks <- acked:
acks, acked = nil, 0
case ackChan <- ackCount:
ackChan, ackCount = nil, 0

case lst := <-l.broker.scheduledACKs:
l.lst.concat(&lst)
case chanList := <-l.broker.scheduledACKs:
l.ackChans.concat(&chanList)

case <-l.sig:
acked += l.handleBatchSig()
if acked > 0 {
acks = l.broker.acks
case <-sig:
ackCount += l.handleBatchSig()
if ackCount > 0 {
ackChan = l.broker.ackChan
}
}

Expand All @@ -76,7 +73,7 @@ func (l *ackLoop) run() {
// log.Debug("ackloop: total batches scheduled = ", l.batchesSched)
// log.Debug("ackloop: total batches ack = ", l.batchesACKed)

l.sig = l.lst.channel()
sig = l.ackChans.channel()
// if l.sig == nil {
// log.Debug("ackloop: no ack scheduled")
// } else {
Expand Down Expand Up @@ -119,17 +116,15 @@ func (l *ackLoop) handleBatchSig() int {
func (l *ackLoop) collectAcked() chanList {
lst := chanList{}

acks := l.lst.pop()
l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)
acks := l.ackChans.pop()
lst.append(acks)

done := false
for !l.lst.empty() && !done {
acks := l.lst.front()
for !l.ackChans.empty() && !done {
acks := l.ackChans.front()
select {
case <-acks.ch:
l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)
lst.append(l.lst.pop())
case <-acks.ackChan:
lst.append(l.ackChans.pop())

default:
done = true
Expand Down
43 changes: 18 additions & 25 deletions libbeat/publisher/queue/memqueue/batchbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,42 @@ package memqueue

import "github.com/elastic/beats/v7/libbeat/publisher"

type queueEntry struct {
event interface{}
client clientState
}

type batchBuffer struct {
next *batchBuffer
flushed bool
events []publisher.Event
clients []clientState
entries []queueEntry
}

func newBatchBuffer(sz int) *batchBuffer {
b := &batchBuffer{}
b.init(sz)
b.entries = make([]queueEntry, 0, sz)
return b
}

func (b *batchBuffer) init(sz int) {
b.events = make([]publisher.Event, 0, sz)
b.clients = make([]clientState, 0, sz)
}

func (b *batchBuffer) add(event publisher.Event, st clientState) {
b.events = append(b.events, event)
b.clients = append(b.clients, st)
func (b *batchBuffer) add(event *publisher.Event, st clientState) {
b.entries = append(b.entries, queueEntry{event, st})
}

func (b *batchBuffer) length() int {
return len(b.events)
return len(b.entries)
}

func (b *batchBuffer) cancel(st *produceState) int {
events := b.events[:0]
clients := b.clients[:0]
entries := b.entries[:0]

removed := 0
for i := range b.clients {
if b.clients[i].state == st {
removed++
removedCount := 0
for _, entry := range b.entries {
if entry.client.state == st {
removedCount++
continue
}

events = append(events, b.events[i])
clients = append(clients, b.clients[i])
entries = append(entries, entry)
}

b.events = events
b.clients = clients
return removed
b.entries = entries
return removedCount
}
86 changes: 57 additions & 29 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,40 @@ type broker struct {

bufSize int

///////////////////////////
// api channels
events chan pushRequest
requests chan getRequest
pubCancel chan producerCancelRequest

// Producers send requests to pushChan to add events to the queue.
pushChan chan pushRequest

// Consumers send requests to getChan to read events from the queue.
getChan chan getRequest

// Producers send requests to cancelChan to cancel events they've
// sent so far that have not yet reached a consumer.
cancelChan chan producerCancelRequest

///////////////////////////
// internal channels
acks chan int

// When ackLoop receives events ACKs from a consumer, it sends the number
// of ACKed events to ackChan to notify the event loop that those
// events can be removed from the queue.
ackChan chan int

// When events are sent to consumers, the ACK channels for their batches
// are collected into chanLists and sent to scheduledACKs.
// These are then read by ackLoop and concatenated to its internal
// chanList of all outstanding ACK channels.
scheduledACKs chan chanList

// A listener that should be notified when ACKs are processed.
// ackLoop calls this listener's OnACK function when it advances
// the consumer ACK position.
// Right now this listener always points at the Pipeline associated with
// this queue. Pipeline.OnACK then forwards the notification to
// Pipeline.observer.queueACKed(), which updates the beats registry
// if needed.
ackListener queue.ACKListener

// wait group for worker shutdown
Expand All @@ -62,17 +87,19 @@ type Settings struct {
InputQueueSize int
}

type ackChan struct {
next *ackChan
ch chan batchAckMsg
seq uint
// batchACKState stores the metadata associated with a batch of events sent to
// a consumer. When the consumer ACKs that batch, a batchAckMsg is sent on
// ackChan and received by
type batchACKState struct {
next *batchACKState
ackChan chan batchAckMsg
start, count int // number of events waiting for ACK
states []clientState
entries []queueEntry
}

type chanList struct {
head *ackChan
tail *ackChan
head *batchACKState
tail *batchACKState
}

func init() {
Expand Down Expand Up @@ -141,12 +168,12 @@ func NewQueue(
logger: logger,

// broker API channels
events: make(chan pushRequest, chanSize),
requests: make(chan getRequest),
pubCancel: make(chan producerCancelRequest, 5),
pushChan: make(chan pushRequest, chanSize),
getChan: make(chan getRequest),
cancelChan: make(chan producerCancelRequest, 5),

// internal broker and ACK handler channels
acks: make(chan int),
ackChan: make(chan int),
scheduledACKs: make(chan chanList),

ackListener: settings.ACKListener,
Expand All @@ -164,7 +191,9 @@ func NewQueue(
}

b.bufSize = sz
ack := newACKLoop(b, eventLoop.processACK)
ackLoop := &ackLoop{
broker: b,
processACK: eventLoop.processACK}

b.wg.Add(2)
go func() {
Expand All @@ -173,7 +202,7 @@ func NewQueue(
}()
go func() {
defer b.wg.Done()
ack.run()
ackLoop.run()
}()

return b
Expand All @@ -200,29 +229,28 @@ func (b *broker) Consumer() queue.Consumer {

var ackChanPool = sync.Pool{
New: func() interface{} {
return &ackChan{
ch: make(chan batchAckMsg, 1),
return &batchACKState{
ackChan: make(chan batchAckMsg, 1),
}
},
}

func newACKChan(seq uint, start, count int, states []clientState) *ackChan {
func newBatchACKState(start, count int, entries []queueEntry) *batchACKState {
//nolint: errcheck // Return value doesn't need to be checked before conversion.
ch := ackChanPool.Get().(*ackChan)
ch := ackChanPool.Get().(*batchACKState)
ch.next = nil
ch.seq = seq
ch.start = start
ch.count = count
ch.states = states
ch.entries = entries
return ch
}

func releaseACKChan(c *ackChan) {
func releaseACKChan(c *batchACKState) {
c.next = nil
ackChanPool.Put(c)
}

func (l *chanList) prepend(ch *ackChan) {
func (l *chanList) prepend(ch *batchACKState) {
ch.next = l.head
l.head = ch
if l.tail == nil {
Expand All @@ -244,7 +272,7 @@ func (l *chanList) concat(other *chanList) {
l.tail = other.tail
}

func (l *chanList) append(ch *ackChan) {
func (l *chanList) append(ch *batchACKState) {
if l.head == nil {
l.head = ch
} else {
Expand All @@ -257,18 +285,18 @@ func (l *chanList) empty() bool {
return l.head == nil
}

func (l *chanList) front() *ackChan {
func (l *chanList) front() *batchACKState {
return l.head
}

func (l *chanList) channel() chan batchAckMsg {
if l.head == nil {
return nil
}
return l.head.ch
return l.head.ackChan
}

func (l *chanList) pop() *ackChan {
func (l *chanList) pop() *batchACKState {
ch := l.head
if ch != nil {
l.head = ch.next
Expand Down
Loading