-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
* memqueue cleanup Move the buffer and message handling from memqueue broker type to the actual go routines handling the messaging. * Add specialized buffers to memqueue Use specialized buffer types and eventloops depending on the memqueue configuration. If flushing is disabled, a region based ring buffer is used, as loads of small batches can be generated and we want to minimize additional allocations of small objects. If flushing is enabled a list of active and flushed buffers is managed by the queue. If a buffer is flushed by timeout, but not yet processed (and not full), additional events being published are added to the already flushed buffer. * ensure flush list does not contain empty buffers (cherry picked from commit ffd23a9)
- Loading branch information
1 parent
211421e
commit e0d65b5
Showing
9 changed files
with
909 additions
and
505 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
package memqueue | ||
|
||
import "github.com/elastic/beats/libbeat/publisher" | ||
|
||
type batchBuffer struct { | ||
next *batchBuffer | ||
flushed bool | ||
events []publisher.Event | ||
clients []clientState | ||
} | ||
|
||
func newBatchBuffer(sz int) *batchBuffer { | ||
b := &batchBuffer{} | ||
b.init(sz) | ||
return b | ||
} | ||
|
||
func (b *batchBuffer) init(sz int) { | ||
b.events = make([]publisher.Event, 0, sz) | ||
b.clients = make([]clientState, 0, sz) | ||
} | ||
|
||
func (b *batchBuffer) initWith(sz int, old batchBuffer) { | ||
events, clients := old.events, old.clients | ||
L := len(events) | ||
|
||
b.events = make([]publisher.Event, L, sz) | ||
b.clients = make([]clientState, L, sz) | ||
|
||
copy(b.events, events) | ||
copy(b.clients, clients) | ||
} | ||
|
||
func (b *batchBuffer) add(event publisher.Event, st clientState) { | ||
b.events = append(b.events, event) | ||
b.clients = append(b.clients, st) | ||
} | ||
|
||
func (b *batchBuffer) length() int { | ||
return len(b.events) | ||
} | ||
|
||
func (b *batchBuffer) capacity() int { | ||
return cap(b.events) | ||
} | ||
|
||
func (b *batchBuffer) cancel(st *produceState) int { | ||
events := b.events[:0] | ||
clients := b.clients[:0] | ||
|
||
removed := 0 | ||
for i := range b.clients { | ||
if b.clients[i].state == st { | ||
removed++ | ||
continue | ||
} | ||
|
||
events = append(events, b.events[i]) | ||
clients = append(clients, b.clients[i]) | ||
} | ||
|
||
b.events = events | ||
b.clients = clients | ||
return removed | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.