diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 2c04e5965d3..8852863ee98 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -63,6 +63,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Remove `common.MapStr` and use `mapstr.M` from `github.com/elastic/elastic-agent-libs` instead. {pull}31420[31420] - Remove `queue.Consumer`. Queues can now be read via a `Get` call directly on the queue object. {pull}31502[31502] - The `queue.Batch` API now provides access to individual events instead of an array. {pull}31699[31699] +- Rename `queue.Batch.ACK()` to `queue.Batch.Done()`. {pull}31903[31903] ==== Bugfixes diff --git a/libbeat/publisher/pipeline/ttl_batch.go b/libbeat/publisher/pipeline/ttl_batch.go index 897c0756719..39155442ba0 100644 --- a/libbeat/publisher/pipeline/ttl_batch.go +++ b/libbeat/publisher/pipeline/ttl_batch.go @@ -29,7 +29,7 @@ type retryer interface { type ttlBatch struct { // The callback to inform the queue (and possibly the producer) // that this batch has been acknowledged. - ack func() + done func() // The internal hook back to the eventConsumer, used to implement the // publisher.Batch retry interface. @@ -61,7 +61,7 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch { } b := &ttlBatch{ - ack: original.ACK, + done: original.Done, retryer: retryer, ttl: ttl, events: events, @@ -74,11 +74,11 @@ func (b *ttlBatch) Events() []publisher.Event { } func (b *ttlBatch) ACK() { - b.ack() + b.done() } func (b *ttlBatch) Drop() { - b.ack() + b.done() } func (b *ttlBatch) Retry() { diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go index 62d791f53b0..719237d512f 100644 --- a/libbeat/publisher/queue/diskqueue/benchmark_test.go +++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go @@ -101,7 +101,7 @@ func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_siz if err != nil { return err } - batch.ACK() + batch.Done() received = received + batch.Count() if received == num_events { break diff --git a/libbeat/publisher/queue/diskqueue/consumer.go b/libbeat/publisher/queue/diskqueue/consumer.go index 2380fd93717..1e7d6361383 100644 --- a/libbeat/publisher/queue/diskqueue/consumer.go +++ b/libbeat/publisher/queue/diskqueue/consumer.go @@ -90,6 +90,6 @@ func (batch *diskQueueBatch) Event(i int) interface{} { return batch.frames[i].event } -func (batch *diskQueueBatch) ACK() { +func (batch *diskQueueBatch) Done() { batch.queue.acks.addFrames(batch.frames) } diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go index 1b1979dc834..6d467e287a9 100644 --- a/libbeat/publisher/queue/memqueue/ackloop.go +++ b/libbeat/publisher/queue/memqueue/ackloop.go @@ -109,7 +109,7 @@ func (l *ackLoop) collectAcked() chanList { for !l.ackChans.empty() && !done { acks := l.ackChans.front() select { - case <-acks.ackChan: + case <-acks.doneChan: lst.append(l.ackChans.pop()) default: diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index cf043b02714..ed4c91565bb 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -96,9 +96,9 @@ type Settings struct { } type batch struct { - queue *broker - entries []queueEntry - ackChan chan batchAckMsg + queue *broker + entries []queueEntry + doneChan chan batchDoneMsg } // batchACKState stores the metadata associated with a batch of events sent to @@ -106,7 +106,7 @@ type batch struct { // ackChan and received by type batchACKState struct { next *batchACKState - ackChan chan batchAckMsg + doneChan chan batchDoneMsg start, count int // number of events waiting for ACK entries []queueEntry } @@ -250,9 +250,9 @@ func (b *broker) Get(count int) (queue.Batch, error) { // if request has been sent, we have to wait for a response resp := <-responseChan return &batch{ - queue: b, - entries: resp.entries, - ackChan: resp.ackChan, + queue: b, + entries: resp.entries, + doneChan: resp.ackChan, }, nil } @@ -277,7 +277,7 @@ func (b *broker) Metrics() (queue.Metrics, error) { var ackChanPool = sync.Pool{ New: func() interface{} { return &batchACKState{ - ackChan: make(chan batchAckMsg, 1), + doneChan: make(chan batchDoneMsg, 1), } }, } @@ -335,11 +335,11 @@ func (l *chanList) front() *batchACKState { return l.head } -func (l *chanList) nextBatchChannel() chan batchAckMsg { +func (l *chanList) nextBatchChannel() chan batchDoneMsg { if l.head == nil { return nil } - return l.head.ackChan + return l.head.doneChan } func (l *chanList) pop() *batchACKState { @@ -384,6 +384,6 @@ func (b *batch) Event(i int) interface{} { return b.entries[i].event } -func (b *batch) ACK() { - b.ackChan <- batchAckMsg{} +func (b *batch) Done() { + b.doneChan <- batchDoneMsg{} } diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go index c7e8ec07f12..b6dc4af470d 100644 --- a/libbeat/publisher/queue/memqueue/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -185,7 +185,7 @@ func (l *directEventLoop) handleGetRequest(req *getRequest) { ackCH := newBatchACKState(start, count, l.buf.entries) - req.responseChan <- getResponse{ackCH.ackChan, buf} + req.responseChan <- getResponse{ackCH.doneChan, buf} l.pendingACKs.append(ackCH) } @@ -422,7 +422,7 @@ func (l *bufferingEventLoop) handleGetRequest(req *getRequest) { entries := buf.entries[:count] acker := newBatchACKState(0, count, entries) - req.responseChan <- getResponse{acker.ackChan, entries} + req.responseChan <- getResponse{acker.doneChan, entries} l.pendingACKs.append(acker) l.unackedEventCount += len(entries) diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go index 4059b71aadd..c924afc785c 100644 --- a/libbeat/publisher/queue/memqueue/internal_api.go +++ b/libbeat/publisher/queue/memqueue/internal_api.go @@ -42,11 +42,11 @@ type getRequest struct { } type getResponse struct { - ackChan chan batchAckMsg + ackChan chan batchDoneMsg entries []queueEntry } -type batchAckMsg struct{} +type batchDoneMsg struct{} // Metrics API diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index a6c448a032c..5c810ac18ab 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -120,7 +120,7 @@ func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, te queueMetricsAreValid(t, testQueue, 5, settings.Events, 5, fmt.Sprintf("%s - Producer Getting events, no ACK", testName)) // Test metrics after ack - batch.ACK() + batch.Done() queueMetricsAreValid(t, testQueue, 0, settings.Events, 0, fmt.Sprintf("%s - Producer Getting events, no ACK", testName)) diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index 11ec4aec818..1ca5fda6e14 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -131,10 +131,10 @@ type Producer interface { Cancel() int } -// Batch of events to be returned to Consumers. The `ACK` method will send the -// ACK signal to the queue. +// Batch of events to be returned to Consumers. The `Done` method will tell the +// queue that the batch has been consumed and its events can be discarded. type Batch interface { Count() int Event(i int) interface{} - ACK() + Done() } diff --git a/libbeat/publisher/queue/queuetest/producer_cancel.go b/libbeat/publisher/queue/queuetest/producer_cancel.go index 2b44f538854..9b5671e1fee 100644 --- a/libbeat/publisher/queue/queuetest/producer_cancel.go +++ b/libbeat/publisher/queue/queuetest/producer_cancel.go @@ -84,7 +84,7 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) { for i := 0; i < batch.Count(); i++ { events = append(events, batch.Event(i)) } - batch.ACK() + batch.Done() } // verify diff --git a/libbeat/publisher/queue/queuetest/queuetest.go b/libbeat/publisher/queue/queuetest/queuetest.go index 0fb95b1dfbe..e71843d2de5 100644 --- a/libbeat/publisher/queue/queuetest/queuetest.go +++ b/libbeat/publisher/queue/queuetest/queuetest.go @@ -327,7 +327,7 @@ func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory { for j := 0; j < batch.Count(); j++ { events.Done() } - batch.ACK() + batch.Done() } }() }