Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rename queue.Batch.ACK -> queue.Batch.Done #31903

Merged
merged 2 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Remove `common.MapStr` and use `mapstr.M` from `github.com/elastic/elastic-agent-libs` instead. {pull}31420[31420]
- Remove `queue.Consumer`. Queues can now be read via a `Get` call directly on the queue object. {pull}31502[31502]
- The `queue.Batch` API now provides access to individual events instead of an array. {pull}31699[31699]
- Rename `queue.Batch.ACK()` to `queue.Batch.Done()`. {pull}31903[31903]

==== Bugfixes

Expand Down
8 changes: 4 additions & 4 deletions libbeat/publisher/pipeline/ttl_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type retryer interface {
type ttlBatch struct {
// The callback to inform the queue (and possibly the producer)
// that this batch has been acknowledged.
ack func()
done func()

// The internal hook back to the eventConsumer, used to implement the
// publisher.Batch retry interface.
Expand Down Expand Up @@ -61,7 +61,7 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch {
}

b := &ttlBatch{
ack: original.ACK,
done: original.Done,
retryer: retryer,
ttl: ttl,
events: events,
Expand All @@ -74,11 +74,11 @@ func (b *ttlBatch) Events() []publisher.Event {
}

func (b *ttlBatch) ACK() {
b.ack()
b.done()
}

func (b *ttlBatch) Drop() {
b.ack()
b.done()
}

func (b *ttlBatch) Retry() {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_siz
if err != nil {
return err
}
batch.ACK()
batch.Done()
received = received + batch.Count()
if received == num_events {
break
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,6 @@ func (batch *diskQueueBatch) Event(i int) interface{} {
return batch.frames[i].event
}

func (batch *diskQueueBatch) ACK() {
func (batch *diskQueueBatch) Done() {
batch.queue.acks.addFrames(batch.frames)
}
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (l *ackLoop) collectAcked() chanList {
for !l.ackChans.empty() && !done {
acks := l.ackChans.front()
select {
case <-acks.ackChan:
case <-acks.doneChan:
lst.append(l.ackChans.pop())

default:
Expand Down
24 changes: 12 additions & 12 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,17 @@ type Settings struct {
}

type batch struct {
queue *broker
entries []queueEntry
ackChan chan batchAckMsg
queue *broker
entries []queueEntry
doneChan chan batchDoneMsg
}

// batchACKState stores the metadata associated with a batch of events sent to
// a consumer. When the consumer ACKs that batch, a batchAckMsg is sent on
// ackChan and received by
type batchACKState struct {
next *batchACKState
ackChan chan batchAckMsg
doneChan chan batchDoneMsg
start, count int // number of events waiting for ACK
entries []queueEntry
}
Expand Down Expand Up @@ -250,9 +250,9 @@ func (b *broker) Get(count int) (queue.Batch, error) {
// if request has been sent, we have to wait for a response
resp := <-responseChan
return &batch{
queue: b,
entries: resp.entries,
ackChan: resp.ackChan,
queue: b,
entries: resp.entries,
doneChan: resp.ackChan,
}, nil
}

Expand All @@ -277,7 +277,7 @@ func (b *broker) Metrics() (queue.Metrics, error) {
var ackChanPool = sync.Pool{
New: func() interface{} {
return &batchACKState{
ackChan: make(chan batchAckMsg, 1),
doneChan: make(chan batchDoneMsg, 1),
}
},
}
Expand Down Expand Up @@ -335,11 +335,11 @@ func (l *chanList) front() *batchACKState {
return l.head
}

func (l *chanList) nextBatchChannel() chan batchAckMsg {
func (l *chanList) nextBatchChannel() chan batchDoneMsg {
if l.head == nil {
return nil
}
return l.head.ackChan
return l.head.doneChan
}

func (l *chanList) pop() *batchACKState {
Expand Down Expand Up @@ -384,6 +384,6 @@ func (b *batch) Event(i int) interface{} {
return b.entries[i].event
}

func (b *batch) ACK() {
b.ackChan <- batchAckMsg{}
func (b *batch) Done() {
b.doneChan <- batchDoneMsg{}
}
4 changes: 2 additions & 2 deletions libbeat/publisher/queue/memqueue/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (l *directEventLoop) handleGetRequest(req *getRequest) {

ackCH := newBatchACKState(start, count, l.buf.entries)

req.responseChan <- getResponse{ackCH.ackChan, buf}
req.responseChan <- getResponse{ackCH.doneChan, buf}
l.pendingACKs.append(ackCH)
}

Expand Down Expand Up @@ -422,7 +422,7 @@ func (l *bufferingEventLoop) handleGetRequest(req *getRequest) {
entries := buf.entries[:count]
acker := newBatchACKState(0, count, entries)

req.responseChan <- getResponse{acker.ackChan, entries}
req.responseChan <- getResponse{acker.doneChan, entries}
l.pendingACKs.append(acker)

l.unackedEventCount += len(entries)
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/queue/memqueue/internal_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ type getRequest struct {
}

type getResponse struct {
ackChan chan batchAckMsg
ackChan chan batchDoneMsg
entries []queueEntry
}

type batchAckMsg struct{}
type batchDoneMsg struct{}

// Metrics API

Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func queueTestWithSettings(t *testing.T, settings Settings, eventsToTest int, te
queueMetricsAreValid(t, testQueue, 5, settings.Events, 5, fmt.Sprintf("%s - Producer Getting events, no ACK", testName))

// Test metrics after ack
batch.ACK()
batch.Done()

queueMetricsAreValid(t, testQueue, 0, settings.Events, 0, fmt.Sprintf("%s - Producer Getting events, no ACK", testName))

Expand Down
6 changes: 3 additions & 3 deletions libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ type Producer interface {
Cancel() int
}

// Batch of events to be returned to Consumers. The `ACK` method will send the
// ACK signal to the queue.
// Batch of events to be returned to Consumers. The `Done` method will tell the
// queue that the batch has been consumed and its events can be discarded.
type Batch interface {
Count() int
Event(i int) interface{}
ACK()
Done()
}
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/queuetest/producer_cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) {
for i := 0; i < batch.Count(); i++ {
events = append(events, batch.Event(i))
}
batch.ACK()
batch.Done()
}

// verify
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/queuetest/queuetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory {
for j := 0; j < batch.Count(); j++ {
events.Done()
}
batch.ACK()
batch.Done()
}
}()
}
Expand Down