diff --git a/libbeat/common/fifo/fifo.go b/libbeat/common/fifo/fifo.go new file mode 100644 index 00000000000..03d220186db --- /dev/null +++ b/libbeat/common/fifo/fifo.go @@ -0,0 +1,84 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package fifo + +type FIFO[T any] struct { + first *node[T] + last *node[T] +} + +type node[T any] struct { + next *node[T] + value T +} + +func (f *FIFO[T]) Add(value T) { + newNode := &node[T]{value: value} + if f.first == nil { + f.first = newNode + } else { + f.last.next = newNode + } + f.last = newNode +} + +func (f *FIFO[T]) Empty() bool { + return f.first == nil +} + +// Return the first value (if present) without removing it from the queue. +// Returns a default value if the queue is empty. To recognize this case, +// check (*FIFO).Empty(). +func (f *FIFO[T]) First() T { + if f.first == nil { + var none T + return none + } + return f.first.value +} + +// Remove the first entry in this FIFO and return it. +func (f *FIFO[T]) ConsumeFirst() T { + result := f.First() + f.Remove() + return result +} + +// Append another FIFO queue to an existing one. Takes ownership of +// the given FIFO's contents. +func (f *FIFO[T]) Concat(list FIFO[T]) { + if list.Empty() { + return + } + if f.Empty() { + *f = list + return + } + f.last.next = list.first + f.last = list.last +} + +// Remove the first entry in the queue. Does nothing if the FIFO is empty. +func (f *FIFO[T]) Remove() { + if f.first != nil { + f.first = f.first.next + if f.first == nil { + f.last = nil + } + } +} diff --git a/libbeat/docs/queueconfig.asciidoc b/libbeat/docs/queueconfig.asciidoc index 499ab9d4667..3bd0d04456a 100644 --- a/libbeat/docs/queueconfig.asciidoc +++ b/libbeat/docs/queueconfig.asciidoc @@ -67,6 +67,17 @@ queue.mem: flush.timeout: 5s ------------------------------------------------------------------------------ +Here is an alternate configuration that measures queue size in bytes rather +than event count. In this case, the output must set `bulk_max_bytes` +instead of `bulk_max_size` to control the batch size: + +[source,yaml] +------------------------------------------------------------------------------ +queue.mem: + bytes: 32MB + flush.timeout: 10s +------------------------------------------------------------------------------ + [float] === Configuration options @@ -80,6 +91,16 @@ Number of events the queue can store. The default value is 3200 events. +[float] +[[queue-mem-bytes-option]] +===== `bytes` + +Number of bytes the queue can store. This option is only available for outputs +that support byte-based event buffers (currently just the Elasticsearch output). +The queue should set either `events` or `bytes` but not both. + +The default is 0, indicating the queue should use the `events` limit instead. 
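+
+For example, a byte-based queue is typically paired with a byte-based batch
+limit on the Elasticsearch output. The values below are illustrative, not
+recommendations:
+
+[source,yaml]
+------------------------------------------------------------------------------
+queue.mem:
+  bytes: 32MB
+output.elasticsearch:
+  hosts: ["https://localhost:9200"]
+  bulk_max_bytes: 10MB
+------------------------------------------------------------------------------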
+ [float] [[queue-mem-flush-min-events-option]] ===== `flush.min_events` @@ -92,6 +113,8 @@ publishing. If 0 or 1, sets the maximum number of events per batch to half the queue size, and sets the queue to synchronous mode (equivalent to `flush.timeout` of 0). +This value is ignored when `bytes` is set. + The default value is 1600. [float] diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index e2f58fb4b16..ac0a08932d8 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -165,9 +165,9 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *conf.C) (report }, queueConfig, outputs.Group{ - Clients: []outputs.Client{outClient}, - BatchSize: windowSize, - Retry: 0, // no retry. Drop event on error. + Clients: []outputs.Client{outClient}, + BatchEvents: windowSize, + Retry: 0, // no retry. Drop event on error. }, pipeline.Settings{ WaitClose: 0, diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index 867316fe3a9..9523e0c6d40 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -85,7 +85,7 @@ func makeConsole( } } - return outputs.Success(config.Queue, config.BatchSize, 0, nil, c) + return outputs.Success(config.Queue, config.BatchSize, 0, 0, nil, c) } func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) { diff --git a/libbeat/outputs/discard/discard.go b/libbeat/outputs/discard/discard.go index bfd1a1c1add..eafda2e2725 100644 --- a/libbeat/outputs/discard/discard.go +++ b/libbeat/outputs/discard/discard.go @@ -56,7 +56,7 @@ func makeDiscard( // disable bulk support in publisher pipeline _ = cfg.SetInt("bulk_max_size", -1, -1) out.log.Infof("Initialized discard output") - return outputs.Success(doConfig.Queue, -1, 0, nil, out) + return outputs.Success(doConfig.Queue, -1, 0, 0, nil, out) } // Implement Outputer diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go index 2f3a325c178..cdd11b84245 100644 --- a/libbeat/outputs/elasticsearch/config.go +++ b/libbeat/outputs/elasticsearch/config.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "github.com/elastic/beats/v7/libbeat/common/cfgtype" "github.com/elastic/beats/v7/libbeat/common/transport/kerberos" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/transport/httpcommon" @@ -39,6 +40,7 @@ type elasticsearchConfig struct { EscapeHTML bool `config:"escape_html"` Kerberos *kerberos.Config `config:"kerberos"` BulkMaxSize int `config:"bulk_max_size"` + BulkMaxBytes cfgtype.ByteSize `config:"bulk_max_bytes"` MaxRetries int `config:"max_retries"` Backoff Backoff `config:"backoff"` NonIndexablePolicy *config.Namespace `config:"non_indexable_policy"` diff --git a/libbeat/outputs/elasticsearch/elasticsearch.go b/libbeat/outputs/elasticsearch/elasticsearch.go index 9bc8498afe4..a4d599ec718 100644 --- a/libbeat/outputs/elasticsearch/elasticsearch.go +++ b/libbeat/outputs/elasticsearch/elasticsearch.go @@ -135,7 +135,14 @@ func makeES( clients[i] = client } - return outputs.SuccessNet(esConfig.Queue, esConfig.LoadBalance, esConfig.BulkMaxSize, esConfig.MaxRetries, encoderFactory, clients) + return outputs.SuccessNet( + esConfig.Queue, + esConfig.LoadBalance, + esConfig.BulkMaxSize, + int(esConfig.BulkMaxBytes), + esConfig.MaxRetries, + encoderFactory, + clients) } func 
buildSelectors( diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index f650ff3f964..93caad3b861 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -67,7 +67,7 @@ func makeFileout( return outputs.Fail(err) } - return outputs.Success(foConfig.Queue, -1, 0, nil, fo) + return outputs.Success(foConfig.Queue, -1, 0, 0, nil, fo) } func (out *fileOutput) init(beat beat.Info, c fileOutConfig) error { diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index cb23823a95a..785a8ab136d 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -84,7 +84,7 @@ func makeKafka( if kConfig.MaxRetries < 0 { retry = -1 } - return outputs.Success(kConfig.Queue, kConfig.BulkMaxSize, retry, nil, client) + return outputs.Success(kConfig.Queue, kConfig.BulkMaxSize, 0, retry, nil, client) } // buildTopicSelector builds the topic selector for standalone Beat and when diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index c4c51ae5437..6a3711e4f4d 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -85,5 +85,5 @@ func makeLogstash( clients[i] = client } - return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, lsConfig.MaxRetries, nil, clients) + return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, 0, lsConfig.MaxRetries, nil, clients) } diff --git a/libbeat/outputs/output_reg.go b/libbeat/outputs/output_reg.go index fdd8e22a663..83b3feb73fa 100644 --- a/libbeat/outputs/output_reg.go +++ b/libbeat/outputs/output_reg.go @@ -56,7 +56,8 @@ type IndexSelector interface { // configuration into the outputs. type Group struct { Clients []Client - BatchSize int + BatchEvents int + BatchBytes int Retry int QueueFactory queue.QueueFactory diff --git a/libbeat/outputs/redis/redis.go b/libbeat/outputs/redis/redis.go index d0cba1e7061..0bff9c9ee1a 100644 --- a/libbeat/outputs/redis/redis.go +++ b/libbeat/outputs/redis/redis.go @@ -165,7 +165,7 @@ func makeRedis( clients[i] = newBackoffClient(client, rConfig.Backoff.Init, rConfig.Backoff.Max) } - return outputs.SuccessNet(rConfig.Queue, rConfig.LoadBalance, rConfig.BulkMaxSize, rConfig.MaxRetries, nil, clients) + return outputs.SuccessNet(rConfig.Queue, rConfig.LoadBalance, rConfig.BulkMaxSize, 0, rConfig.MaxRetries, nil, clients) } func buildKeySelector(cfg *config.C) (outil.Selector, error) { diff --git a/libbeat/outputs/util.go b/libbeat/outputs/util.go index 8b3d96fcaa5..6157c5b1fe6 100644 --- a/libbeat/outputs/util.go +++ b/libbeat/outputs/util.go @@ -35,7 +35,7 @@ func Fail(err error) (Group, error) { return Group{}, err } // instances. The first argument is expected to contain a queue // config.Namespace. The queue config is passed to assign the queue // factory when elastic-agent reloads the output. 
-func Success(cfg config.Namespace, batchSize, retry int, encoderFactory queue.EncoderFactory, clients ...Client) (Group, error) { +func Success(cfg config.Namespace, batchEvents, batchBytes, retry int, encoderFactory queue.EncoderFactory, clients ...Client) (Group, error) { var q queue.QueueFactory if cfg.IsSet() && cfg.Config().Enabled() { switch cfg.Name() { @@ -60,7 +60,8 @@ func Success(cfg config.Namespace, batchSize, retry int, encoderFactory queue.En } return Group{ Clients: clients, - BatchSize: batchSize, + BatchEvents: batchEvents, + BatchBytes: batchBytes, Retry: retry, QueueFactory: q, EncoderFactory: encoderFactory, @@ -80,12 +81,12 @@ func NetworkClients(netclients []NetworkClient) []Client { // The first argument is expected to contain a queue config.Namespace. // The queue config is passed to assign the queue factory when // elastic-agent reloads the output. -func SuccessNet(cfg config.Namespace, loadbalance bool, batchSize, retry int, encoderFactory queue.EncoderFactory, netclients []NetworkClient) (Group, error) { +func SuccessNet(cfg config.Namespace, loadbalance bool, batchEvents, batchBytes, retry int, encoderFactory queue.EncoderFactory, netclients []NetworkClient) (Group, error) { if !loadbalance { - return Success(cfg, batchSize, retry, encoderFactory, NewFailoverClient(netclients)) + return Success(cfg, batchEvents, batchBytes, retry, encoderFactory, NewFailoverClient(netclients)) } clients := NetworkClients(netclients) - return Success(cfg, batchSize, retry, encoderFactory, clients...) + return Success(cfg, batchEvents, batchBytes, retry, encoderFactory, clients...) } diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index af756213a63..f2adb3386ae 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -118,9 +118,9 @@ func (c *client) publish(e beat.Event) { var published bool if c.canDrop { - _, published = c.producer.TryPublish(pubEvent) + published = c.producer.TryPublish(pubEvent) } else { - _, published = c.producer.Publish(pubEvent) + published = c.producer.Publish(pubEvent) } if published { diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go index f729d417ca5..7e1d1c4b3be 100644 --- a/libbeat/publisher/pipeline/client_test.go +++ b/libbeat/publisher/pipeline/client_test.go @@ -88,11 +88,12 @@ func TestClient(t *testing.T) { l := logp.L() // a small in-memory queue with a very short flush interval - q := memqueue.NewQueue(l, nil, memqueue.Settings{ + q, err := memqueue.NewQueue(l, nil, memqueue.Settings{ Events: 5, MaxGetRequest: 1, FlushTimeout: time.Millisecond, }, 5, nil) + require.NoError(t, err, "Queue creation must succeed") // model a processor that we're going to make produce errors after p := &testProcessor{} @@ -114,7 +115,7 @@ func TestClient(t *testing.T) { done := make(chan struct{}) go func() { for { - batch, err := q.Get(2) + batch, err := q.Get(2, 0) if errors.Is(err, io.EOF) { break } @@ -201,7 +202,8 @@ func TestClientWaitClose(t *testing.T) { } logp.TestingSetup() - q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}, 0, nil) + q, err := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}, 0, nil) + require.NoError(t, err, "Queue creation must succeed") pipeline := makePipeline(Settings{}, q) defer pipeline.Close() @@ -306,8 +308,8 @@ func TestMonitoring(t *testing.T) { }) } return "output_name", outputs.Group{ - BatchSize: batchSize, - Clients: clients, + BatchEvents: batchSize, + 
Clients: clients, }, nil }, ) diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go index a7806a3ded2..ec9c11fcbee 100644 --- a/libbeat/publisher/pipeline/consumer.go +++ b/libbeat/publisher/pipeline/consumer.go @@ -58,10 +58,11 @@ type eventConsumer struct { // consumerTarget specifies the queue to read from, the parameters needed // to generate a batch, and the output channel to send batches to. type consumerTarget struct { - queue queue.Queue - ch chan publisher.Batch - timeToLive int - batchSize int + queue queue.Queue + ch chan publisher.Batch + timeToLive int + batchEvents int + batchBytes int } // retryRequest is used by ttlBatch to add itself back to the eventConsumer @@ -134,7 +135,8 @@ outerLoop: c.queueReader.req <- queueReaderRequest{ queue: target.queue, retryer: c, - batchSize: target.batchSize, + eventCount: target.batchEvents, + byteCount: target.batchBytes, timeToLive: target.timeToLive, } } diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index d7e07846e0c..fc928508806 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -159,10 +159,11 @@ func (c *outputController) Set(outGrp outputs.Group) { // Resume consumer targeting the new work queue c.consumer.setTarget( consumerTarget{ - queue: c.queue, - ch: targetChan, - batchSize: outGrp.BatchSize, - timeToLive: outGrp.Retry + 1, + queue: c.queue, + ch: targetChan, + batchEvents: outGrp.BatchEvents, + batchBytes: outGrp.BatchBytes, + timeToLive: outGrp.Retry + 1, }) } @@ -285,7 +286,9 @@ func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) { if err != nil { logger.Errorf("queue creation failed, falling back to default memory queue, check your queue configuration") s, _ := memqueue.SettingsForUserConfig(nil) - queue = memqueue.NewQueue(logger, queueObserver, s, c.inputQueueSize, outGrp.EncoderFactory) + // Memqueue creation can only fail when it's configured for byte-based limits, + // so we don't need to handle the fallback error. + queue, _ = memqueue.NewQueue(logger, queueObserver, s, c.inputQueueSize, outGrp.EncoderFactory) } c.queue = queue @@ -307,12 +310,12 @@ func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) { // a producer for a nonexistent queue. 
type emptyProducer struct{} -func (emptyProducer) Publish(_ queue.Entry) (queue.EntryID, bool) { - return 0, false +func (emptyProducer) Publish(_ queue.Entry) bool { + return false } -func (emptyProducer) TryPublish(_ queue.Entry) (queue.EntryID, bool) { - return 0, false +func (emptyProducer) TryPublish(_ queue.Entry) bool { + return false } func (emptyProducer) Close() { diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go index 934d3c0db3d..71eef324d41 100644 --- a/libbeat/publisher/pipeline/module.go +++ b/libbeat/publisher/pipeline/module.go @@ -155,7 +155,7 @@ func loadOutput( telemetry = monitors.Telemetry.NewRegistry("output") } monitoring.NewString(telemetry, "name").Set(outName) - monitoring.NewInt(telemetry, "batch_size").Set(int64(out.BatchSize)) + monitoring.NewInt(telemetry, "batch_size").Set(int64(out.BatchEvents)) monitoring.NewInt(telemetry, "clients").Set(int64(len(out.Clients))) } diff --git a/libbeat/publisher/pipeline/monitoring.go b/libbeat/publisher/pipeline/monitoring.go index 4a1e5ad76a1..50a32ad13fb 100644 --- a/libbeat/publisher/pipeline/monitoring.go +++ b/libbeat/publisher/pipeline/monitoring.go @@ -74,11 +74,6 @@ type metricsObserverVars struct { eventsTotal, eventsFiltered, eventsPublished, eventsFailed *monitoring.Uint eventsDropped, eventsRetry *monitoring.Uint // (retryer) drop/retry counters activeEvents *monitoring.Uint - - // queue metrics - queueACKed *monitoring.Uint - queueMaxEvents *monitoring.Uint - percentQueueFull *monitoring.Float } func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver { @@ -118,19 +113,6 @@ func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver { // events.dropped counts events that were dropped because errors from // the output workers exceeded the configured maximum retry count. eventsDropped: monitoring.NewUint(reg, "events.dropped"), - - // (Gauge) queue.max_events measures the maximum number of events the - // queue will accept, or 0 if there is none. - queueMaxEvents: monitoring.NewUint(reg, "queue.max_events"), - - // queue.acked counts events that have been acknowledged by the output - // workers. This includes events that were dropped for fatal errors, - // which are also reported in events.dropped. - queueACKed: monitoring.NewUint(reg, "queue.acked"), - - // (Gauge) queue.filled.pct.events measures the fraction (from 0 to 1) - // of the queue's event capacity that is currently filled. 
- percentQueueFull: monitoring.NewFloat(reg, "queue.filled.pct.events"), }, } } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index a5a13a0584e..c4f6ad9a047 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -239,8 +239,9 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { } }, } - - if ackHandler == nil { + if ackHandler != nil { + producerCfg.ACK = ackHandler.ACKEvents + } else { ackHandler = acker.Nil() } diff --git a/libbeat/publisher/pipeline/pipeline_test.go b/libbeat/publisher/pipeline/pipeline_test.go index a8cf34b895a..ef30b1e6a12 100644 --- a/libbeat/publisher/pipeline/pipeline_test.go +++ b/libbeat/publisher/pipeline/pipeline_test.go @@ -98,9 +98,9 @@ func makeDiscardQueue() queue.Queue { // it's also the returned Event ID count := uint64(0) producer := &testProducer{ - publish: func(try bool, event queue.Entry) (queue.EntryID, bool) { + publish: func(try bool, event queue.Entry) bool { count++ - return queue.EntryID(count), true + return true }, cancel: func() { wg.Done() @@ -121,7 +121,7 @@ type testQueue struct { } type testProducer struct { - publish func(try bool, event queue.Entry) (queue.EntryID, bool) + publish func(try bool, event queue.Entry) bool cancel func() } @@ -154,25 +154,25 @@ func (q *testQueue) Producer(cfg queue.ProducerConfig) queue.Producer { return nil } -func (q *testQueue) Get(sz int) (queue.Batch, error) { +func (q *testQueue) Get(sz int, _ int) (queue.Batch, error) { if q.get != nil { return q.get(sz) } return nil, nil } -func (p *testProducer) Publish(event queue.Entry) (queue.EntryID, bool) { +func (p *testProducer) Publish(event queue.Entry) bool { if p.publish != nil { return p.publish(false, event) } - return 0, false + return false } -func (p *testProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) { +func (p *testProducer) TryPublish(event queue.Entry) bool { if p.publish != nil { return p.publish(true, event) } - return 0, false + return false } func (p *testProducer) Close() { @@ -206,7 +206,7 @@ func makeTestQueue() queue.Queue { var producer *testProducer p := blockingProducer(cfg) producer = &testProducer{ - publish: func(try bool, event queue.Entry) (queue.EntryID, bool) { + publish: func(try bool, event queue.Entry) bool { if try { return p.TryPublish(event) } @@ -234,10 +234,10 @@ func blockingProducer(_ queue.ProducerConfig) queue.Producer { waiting := atomic.MakeInt(0) return &testProducer{ - publish: func(_ bool, _ queue.Entry) (queue.EntryID, bool) { + publish: func(_ bool, _ queue.Entry) bool { waiting.Inc() <-sig - return 0, false + return false }, cancel: func() { diff --git a/libbeat/publisher/pipeline/queue_reader.go b/libbeat/publisher/pipeline/queue_reader.go index fa68b83739c..b240fb771a9 100644 --- a/libbeat/publisher/pipeline/queue_reader.go +++ b/libbeat/publisher/pipeline/queue_reader.go @@ -33,7 +33,8 @@ type queueReader struct { type queueReaderRequest struct { queue queue.Queue retryer retryer - batchSize int + eventCount int + byteCount int timeToLive int } @@ -54,7 +55,7 @@ func (qr *queueReader) run(logger *logp.Logger) { logger.Debug("pipeline event consumer queue reader: stop") return } - queueBatch, _ := req.queue.Get(req.batchSize) + queueBatch, _ := req.queue.Get(req.eventCount, req.byteCount) var batch *ttlBatch if queueBatch != nil { batch = newBatch(req.retryer, queueBatch, req.timeToLive) diff --git a/libbeat/publisher/pipeline/stress/out.go 
b/libbeat/publisher/pipeline/stress/out.go index b0e4c3d4e39..027f6d3fd1a 100644 --- a/libbeat/publisher/pipeline/stress/out.go +++ b/libbeat/publisher/pipeline/stress/out.go @@ -67,7 +67,7 @@ func makeTestOutput(_ outputs.IndexManager, beat beat.Info, observer outputs.Obs clients[i] = client } - return outputs.Success(config.Queue, config.BulkMaxSize, config.Retry, nil, clients...) + return outputs.Success(config.Queue, config.BulkMaxSize, 0, config.Retry, nil, clients...) } func (*testOutput) Close() error { return nil } diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go index 7665c4fd780..f32dac087c2 100644 --- a/libbeat/publisher/queue/diskqueue/benchmark_test.go +++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go @@ -99,7 +99,7 @@ func setup(b *testing.B, encrypt bool, compress bool, protobuf bool) (*diskQueue func publishEvents(r *rand.Rand, p queue.Producer, num int) { for i := 0; i < num; i++ { e := makePublisherEvent(r) - _, ok := p.Publish(e) + ok := p.Publish(e) if !ok { panic("didn't publish") } @@ -109,7 +109,7 @@ func publishEvents(r *rand.Rand, p queue.Producer, num int) { func getAndAckEvents(q *diskQueue, num_events int, batch_size int) error { var received int for { - batch, err := q.Get(batch_size) + batch, err := q.Get(batch_size, 0) if err != nil { return err } diff --git a/libbeat/publisher/queue/diskqueue/consumer.go b/libbeat/publisher/queue/diskqueue/consumer.go index 20e6648d927..67aa95cdf05 100644 --- a/libbeat/publisher/queue/diskqueue/consumer.go +++ b/libbeat/publisher/queue/diskqueue/consumer.go @@ -28,7 +28,7 @@ type diskQueueBatch struct { frames []*readFrame } -func (dq *diskQueue) Get(eventCount int) (queue.Batch, error) { +func (dq *diskQueue) Get(eventCount int, _ int) (queue.Batch, error) { // We can always eventually read at least one frame unless the queue or the // consumer is closed. 
frame, ok := <-dq.readerLoop.output diff --git a/libbeat/publisher/queue/diskqueue/consumer_test.go b/libbeat/publisher/queue/diskqueue/consumer_test.go index 80378029be2..4d316f33aa0 100644 --- a/libbeat/publisher/queue/diskqueue/consumer_test.go +++ b/libbeat/publisher/queue/diskqueue/consumer_test.go @@ -38,7 +38,7 @@ func TestQueueGetObserver(t *testing.T) { for i := 0; i < eventCount; i++ { dq.readerLoop.output <- &readFrame{bytesOnDisk: 123} } - _, err := dq.Get(eventCount) + _, err := dq.Get(eventCount, 0) assert.NoError(t, err, "Queue Get call should succeed") assertRegistryUint(t, reg, "queue.consumed.events", eventCount, "Get call should report consumed events") assertRegistryUint(t, reg, "queue.consumed.bytes", eventCount*123, "Get call should report consumed bytes") diff --git a/libbeat/publisher/queue/diskqueue/producer.go b/libbeat/publisher/queue/diskqueue/producer.go index c379ac40637..e16d4806912 100644 --- a/libbeat/publisher/queue/diskqueue/producer.go +++ b/libbeat/publisher/queue/diskqueue/producer.go @@ -49,12 +49,12 @@ type producerWriteRequest struct { // diskQueueProducer implementation of the queue.Producer interface // -func (producer *diskQueueProducer) Publish(event queue.Entry) (queue.EntryID, bool) { - return 0, producer.publish(event, true) +func (producer *diskQueueProducer) Publish(event queue.Entry) bool { + return producer.publish(event, true) } -func (producer *diskQueueProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) { - return 0, producer.publish(event, false) +func (producer *diskQueueProducer) TryPublish(event queue.Entry) bool { + return producer.publish(event, false) } func (producer *diskQueueProducer) publish( diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go index 9432bd5af19..94a02ba2e4a 100644 --- a/libbeat/publisher/queue/memqueue/ackloop.go +++ b/libbeat/publisher/queue/memqueue/ackloop.go @@ -37,7 +37,10 @@ func newACKLoop(broker *broker) *ackLoop { func (l *ackLoop) run() { b := l.broker for { - nextBatchChan := l.pendingBatches.nextBatchChannel() + var nextBatchChan chan batchDoneMsg + if !l.pendingBatches.Empty() { + nextBatchChan = l.pendingBatches.First().doneChan + } select { case <-b.ctx.Done(): @@ -46,7 +49,7 @@ func (l *ackLoop) run() { case chanList := <-b.consumedChan: // New batches have been generated, add them to the pending list - l.pendingBatches.concat(&chanList) + l.pendingBatches.Concat(chanList) case <-nextBatchChan: // The oldest outstanding batch has been acknowledged, advance our @@ -58,43 +61,30 @@ func (l *ackLoop) run() { // handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig // is run by the ackLoop. 
-func (l *ackLoop) handleBatchSig() int { +func (l *ackLoop) handleBatchSig() { ackedBatches := l.collectAcked() - count := 0 - for batch := ackedBatches.front(); batch != nil; batch = batch.next { - count += batch.count - } - - if count > 0 { + if !ackedBatches.Empty() { // report acks to waiting clients - l.processACK(ackedBatches, count) - } - - for !ackedBatches.empty() { - // Release finished batch structs into the shared memory pool - releaseBatch(ackedBatches.pop()) + l.processACK(ackedBatches) } - - // return final ACK to EventLoop, in order to clean up internal buffer - l.broker.logger.Debug("ackloop: return ack to broker loop:", count) - - l.broker.logger.Debug("ackloop: done send ack") - return count } func (l *ackLoop) collectAcked() batchList { ackedBatches := batchList{} - acks := l.pendingBatches.pop() - ackedBatches.append(acks) + // The first batch is always included, since that's what triggered the call + // to collectAcked. + nextBatch := l.pendingBatches.ConsumeFirst() + ackedBatches.Add(nextBatch) done := false - for !l.pendingBatches.empty() && !done { - acks := l.pendingBatches.front() + for !l.pendingBatches.Empty() && !done { + nextBatch = l.pendingBatches.First() select { - case <-acks.doneChan: - ackedBatches.append(l.pendingBatches.pop()) + case <-nextBatch.doneChan: + ackedBatches.Add(nextBatch) + l.pendingBatches.Remove() default: done = true @@ -107,18 +97,24 @@ func (l *ackLoop) collectAcked() batchList { // Called by ackLoop. This function exists to decouple the work of collecting // and running producer callbacks from logical deletion of the events, so // input callbacks can't block the queue by occupying the runLoop goroutine. -func (l *ackLoop) processACK(lst batchList, N int) { +func (l *ackLoop) processACK(lst batchList) { ackCallbacks := []func(){} + batches := []batch{} + for !lst.Empty() { + batches = append(batches, lst.First()) + lst.Remove() + } // First we traverse the entries we're about to remove, collecting any callbacks // we need to run. - lst.reverse() - for !lst.empty() { - batch := lst.pop() + // Traverse entries from last to first, so we can acknowledge the most recent + // ones first and skip repeated producer callbacks. + eventCount := 0 + for batchIndex := len(batches) - 1; batchIndex >= 0; batchIndex-- { + batch := batches[batchIndex] + eventCount += batch.count - // Traverse entries from last to first, so we can acknowledge the most recent - // ones first and skip subsequent producer callbacks. for i := batch.count - 1; i >= 0; i-- { - entry := batch.rawEntry(i) + entry := batch.entry(i) if entry.producer == nil { continue } @@ -136,7 +132,8 @@ func (l *ackLoop) processACK(lst batchList, N int) { } } // Signal runLoop to delete the events - l.broker.deleteChan <- N + l.broker.deleteChan <- eventCount + l.broker.logger.Debug("ackloop: return ack to broker loop:", eventCount) // The events have been removed; notify their listeners. 
 	for _, f := range ackCallbacks {
diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go
index b617bae6110..800b95fa252 100644
--- a/libbeat/publisher/queue/memqueue/broker.go
+++ b/libbeat/publisher/queue/memqueue/broker.go
@@ -19,10 +19,11 @@ package memqueue
 
 import (
 	"context"
+	"errors"
 	"io"
-	"sync"
 	"time"
 
+	"github.com/elastic/beats/v7/libbeat/common/fifo"
 	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 	"github.com/elastic/elastic-agent-libs/logp"
 )
@@ -46,13 +47,6 @@ type broker struct {
 	ctx       context.Context
 	ctxCancel context.CancelFunc
 
-	// The ring buffer backing the queue. All buffer positions should be taken
-	// modulo the size of this array.
-	buf []queueEntry
-
-	// wait group for queue workers (runLoop and ackLoop)
-	wg sync.WaitGroup
-
 	// The factory used to create an event encoder when creating a producer
 	encoderFactory queue.EncoderFactory
 
@@ -88,7 +82,8 @@ type broker struct {
 	///////////////////////////////
 	// internal goroutine state
 
-	// The goroutine that manages the queue's core run state
+	// The goroutine that manages the queue's core run state and owns its
+	// backing buffer.
 	runLoop *runLoop
 
 	// The goroutine that manages ack notifications and callbacks
@@ -96,8 +91,10 @@ type broker struct {
 }
 
 type Settings struct {
-	// The number of events the queue can hold.
+	// The number of events and bytes the queue can hold. <= zero means no limit.
+	// At least one must be greater than zero.
 	Events int
+	Bytes  int
 
 	// The most events that will ever be returned from one Get request.
 	MaxGetRequest int
@@ -110,30 +107,31 @@ type Settings struct {
 
 type queueEntry struct {
 	event     queue.Entry
 	eventSize int
-	id        queue.EntryID
 
 	producer   *ackProducer
 	producerID producerID // The order of this entry within its producer
 }
 
 type batch struct {
-	queue *broker
+	// The queue buffer (at the time that this batch was generated --
+	// only the indices corresponding to this batch's events are guaranteed
+	// to be valid).
+	queueBuf circularBuffer
 
-	// Next batch in the containing batchList
-	next *batch
+	// Position of the batch's events within the queue. This is an absolute
+	// index over the lifetime of the queue; to get the position within the
+	// queue's current circular buffer, use (start % queueBuf.size()).
+	start entryIndex
 
-	// Position and length of the events within the queue buffer
-	start, count int
+	// Number of sequential events in this batch.
+	count int
 
 	// batch.Done() sends to doneChan, where ackLoop reads it and handles
 	// acknowledgment / cleanup.
doneChan chan batchDoneMsg } -type batchList struct { - head *batch - tail *batch -} +type batchList = fifo.FIFO[batch] // FactoryForSettings is a simple wrapper around NewQueue so a concrete // Settings object can be wrapped in a queue-agnostic interface for @@ -145,7 +143,7 @@ func FactoryForSettings(settings Settings) queue.QueueFactory { inputQueueSize int, encoderFactory queue.EncoderFactory, ) (queue.Queue, error) { - return NewQueue(logger, observer, settings, inputQueueSize, encoderFactory), nil + return NewQueue(logger, observer, settings, inputQueueSize, encoderFactory) } } @@ -158,21 +156,16 @@ func NewQueue( settings Settings, inputQueueSize int, encoderFactory queue.EncoderFactory, -) *broker { - b := newQueue(logger, observer, settings, inputQueueSize, encoderFactory) - - // Start the queue workers - b.wg.Add(2) - go func() { - defer b.wg.Done() - b.runLoop.run() - }() - go func() { - defer b.wg.Done() - b.ackLoop.run() - }() - - return b +) (*broker, error) { + b, err := newQueue(logger, observer, settings, inputQueueSize, encoderFactory) + + if err == nil { + // Start the queue workers + go b.runLoop.run() + go b.ackLoop.run() + } + + return b, err } // newQueue does most of the work of creating a queue from the given @@ -185,7 +178,7 @@ func newQueue( settings Settings, inputQueueSize int, encoderFactory queue.EncoderFactory, -) *broker { +) (*broker, error) { if observer == nil { observer = queue.NewQueueObserver(nil) } @@ -200,8 +193,12 @@ func newQueue( settings.MaxGetRequest = (settings.Events + 1) / 2 } + if settings.Bytes > 0 && encoderFactory == nil { + return nil, errors.New("queue.mem.bytes is set but the output doesn't support byte-based event buffers") + } + // Can't request more than the full queue - if settings.MaxGetRequest > settings.Events { + if settings.Events > 0 && settings.MaxGetRequest > settings.Events { settings.MaxGetRequest = settings.Events } @@ -213,8 +210,6 @@ func newQueue( settings: settings, logger: logger, - buf: make([]queueEntry, settings.Events), - encoderFactory: encoderFactory, // broker API channels @@ -234,7 +229,7 @@ func newQueue( observer.MaxEvents(settings.Events) - return b + return b, nil } func (b *broker) Close() error { @@ -252,7 +247,7 @@ func (b *broker) QueueType() string { func (b *broker) BufferConfig() queue.BufferConfig { return queue.BufferConfig{ - MaxEvents: len(b.buf), + MaxEvents: b.settings.Events, } } @@ -267,13 +262,13 @@ func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer { return newProducer(b, cfg.ACK, encoder) } -func (b *broker) Get(count int) (queue.Batch, error) { - responseChan := make(chan *batch, 1) +func (b *broker) Get(count int, bytes int) (queue.Batch, error) { + responseChan := make(chan batch, 1) select { case <-b.ctx.Done(): return nil, io.EOF case b.getChan <- getRequest{ - entryCount: count, responseChan: responseChan}: + entryCount: count, byteCount: bytes, responseChan: responseChan}: } // if request has been sent, we have to wait for a response @@ -281,93 +276,16 @@ func (b *broker) Get(count int) (queue.Batch, error) { return resp, nil } -var batchPool = sync.Pool{ - New: func() interface{} { - return &batch{ - doneChan: make(chan batchDoneMsg, 1), - } - }, +func (b *broker) useByteLimits() bool { + return b.settings.Bytes > 0 } -func newBatch(queue *broker, start, count int) *batch { - batch := batchPool.Get().(*batch) - batch.next = nil - batch.queue = queue - batch.start = start - batch.count = count - return batch -} - -func releaseBatch(b *batch) { - b.next = nil - 
batchPool.Put(b) -} - -func (l *batchList) prepend(b *batch) { - b.next = l.head - l.head = b - if l.tail == nil { - l.tail = b - } -} - -func (l *batchList) concat(other *batchList) { - if other.head == nil { - return - } - - if l.head == nil { - *l = *other - return - } - - l.tail.next = other.head - l.tail = other.tail -} - -func (l *batchList) append(b *batch) { - if l.head == nil { - l.head = b - } else { - l.tail.next = b - } - l.tail = b -} - -func (l *batchList) empty() bool { - return l.head == nil -} - -func (l *batchList) front() *batch { - return l.head -} - -func (l *batchList) nextBatchChannel() chan batchDoneMsg { - if l.head == nil { - return nil - } - return l.head.doneChan -} - -func (l *batchList) pop() *batch { - ch := l.head - if ch != nil { - l.head = ch.next - if l.head == nil { - l.tail = nil - } - } - - ch.next = nil - return ch -} - -func (l *batchList) reverse() { - tmp := *l - *l = batchList{} - - for !tmp.empty() { - l.prepend(tmp.pop()) +func newBatch(queueBuf circularBuffer, start entryIndex, count int) batch { + return batch{ + doneChan: make(chan batchDoneMsg, 1), + queueBuf: queueBuf, + start: start, + count: count, } } @@ -383,21 +301,21 @@ func AdjustInputQueueSize(requested, mainQueueSize int) (actual int) { return actual } -func (b *batch) Count() int { +func (b batch) Count() int { return b.count } // Return a pointer to the queueEntry for the i-th element of this batch -func (b *batch) rawEntry(i int) *queueEntry { - // Indexes wrap around the end of the queue buffer - return &b.queue.buf[(b.start+i)%len(b.queue.buf)] +func (b batch) entry(i int) *queueEntry { + entryIndex := b.start.plus(i) + return b.queueBuf.entry(entryIndex) } // Return the event referenced by the i-th element of this batch -func (b *batch) Entry(i int) queue.Entry { - return b.rawEntry(i).event +func (b batch) Entry(i int) queue.Entry { + return b.entry(i).event } -func (b *batch) Done() { +func (b batch) Done() { b.doneChan <- batchDoneMsg{} } diff --git a/libbeat/publisher/queue/memqueue/circular_buffer.go b/libbeat/publisher/queue/memqueue/circular_buffer.go new file mode 100644 index 00000000000..c66a76e2e69 --- /dev/null +++ b/libbeat/publisher/queue/memqueue/circular_buffer.go @@ -0,0 +1,44 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memqueue + +type circularBuffer struct { + // Do not access this array directly! use (circularBuffer).entry(). 
+ _entries []queueEntry +} + +type entryIndex int + +func newCircularBuffer(size int) circularBuffer { + return circularBuffer{ + _entries: make([]queueEntry, size), + } +} + +func (cb circularBuffer) size() int { + return len(cb._entries) +} + +func (cb circularBuffer) entry(i entryIndex) *queueEntry { + rawIndex := int(i) % len(cb._entries) + return &cb._entries[rawIndex] +} + +func (ei entryIndex) plus(offset int) entryIndex { + return entryIndex(int(ei) + offset) +} diff --git a/libbeat/publisher/queue/memqueue/config.go b/libbeat/publisher/queue/memqueue/config.go index 7d9593b30e3..c282650298b 100644 --- a/libbeat/publisher/queue/memqueue/config.go +++ b/libbeat/publisher/queue/memqueue/config.go @@ -22,32 +22,48 @@ import ( "fmt" "time" + "github.com/elastic/beats/v7/libbeat/common/cfgtype" c "github.com/elastic/elastic-agent-libs/config" ) type config struct { - Events int `config:"events" validate:"min=32"` - // This field is named MaxGetRequest because its logical effect is to give + Events *int `config:"events" validate:"min=32"` + Bytes *cfgtype.ByteSize `config:"bytes"` + + // This field is named MaxGetEvents because its logical effect is to give // a maximum on the number of events a Get request can return, but the // user-exposed name is "flush.min_events" for backwards compatibility, // since it used to control buffer size in the internal buffer chain. - MaxGetRequest int `config:"flush.min_events" validate:"min=0"` - FlushTimeout time.Duration `config:"flush.timeout"` + // Ignored if a byte limit is set in the queue or the get request. + MaxGetEvents int `config:"flush.min_events" validate:"min=0"` + FlushTimeout time.Duration `config:"flush.timeout"` } -var defaultConfig = config{ - Events: 3200, - MaxGetRequest: 1600, - FlushTimeout: 10 * time.Second, -} +const minQueueBytes = 32768 +const minQueueEvents = 32 +const defaultMaxQueueEvents = 3200 func (c *config) Validate() error { - if c.MaxGetRequest > c.Events { - return errors.New("flush.min_events must be less events") + if c.Bytes != nil && *c.Bytes < minQueueBytes { + return fmt.Errorf("queue byte size must be at least %v", minQueueBytes) + } + if c.Events != nil && *c.Events < minQueueEvents { + return fmt.Errorf("queue event size must be at least %v", minQueueEvents) + } + if c.Events != nil && c.Bytes != nil { + return errors.New("memory queue can only have an event limit or a byte limit, not both") + } + if c.Events != nil && c.MaxGetEvents > *c.Events { + return errors.New("flush.min_events must be less than events") } return nil } +var defaultConfig = config{ + MaxGetEvents: 1600, + FlushTimeout: 10 * time.Second, +} + // SettingsForUserConfig unpacks a ucfg config from a Beats queue // configuration and returns the equivalent memqueue.Settings object. func SettingsForUserConfig(cfg *c.C) (Settings, error) { @@ -57,10 +73,20 @@ func SettingsForUserConfig(cfg *c.C) (Settings, error) { return Settings{}, fmt.Errorf("couldn't unpack memory queue config: %w", err) } } - //nolint:gosimple // Actually want this conversion to be explicit since the types aren't definitionally equal. 
- return Settings{ - Events: config.Events, - MaxGetRequest: config.MaxGetRequest, + result := Settings{ + MaxGetRequest: config.MaxGetEvents, FlushTimeout: config.FlushTimeout, - }, nil + } + + if config.Events != nil { + result.Events = *config.Events + } + if config.Bytes != nil { + result.Bytes = int(*config.Bytes) + } + // If no size constraint was given, fall back on the default event cap + if config.Events == nil && config.Bytes == nil { + result.Events = defaultMaxQueueEvents + } + return result, nil } diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go index 0d983de6520..881d2a826ee 100644 --- a/libbeat/publisher/queue/memqueue/internal_api.go +++ b/libbeat/publisher/queue/memqueue/internal_api.go @@ -28,6 +28,11 @@ type pushRequest struct { // early encoding, 0 otherwise. eventSize int + // If the queue doesn't have room for an incoming event and blockIfFull + // is true, the request will be held until there is space in the queue. + // Otherwise, the queue will return failure immediately. + blockIfFull bool + // The producer that generated this event, or nil if this producer does // not require ack callbacks. producer *ackProducer @@ -35,14 +40,20 @@ type pushRequest struct { // The index of the event in this producer only. Used to condense // multiple acknowledgments for a producer to a single callback call. producerID producerID - resp chan queue.EntryID + resp chan bool } // consumer -> broker API type getRequest struct { - entryCount int // request entryCount events from the broker - responseChan chan *batch // channel to send response to + // The number of entries to request, or <= 0 for no limit. + entryCount int + + // The number of (encoded) event bytes to request, or <= 0 for no limit. + byteCount int + + // The channel to send the new batch to. 
+ responseChan chan batch } type batchDoneMsg struct{} diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index 0ecabfe77f0..77b4601604b 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -72,50 +72,55 @@ func newProducer(b *broker, cb ackHandler, encoder queue.Encoder) queue.Producer return &forgetfulProducer{broker: b, openState: openState} } -func (p *forgetfulProducer) makePushRequest(event queue.Entry) pushRequest { - resp := make(chan queue.EntryID, 1) +func (p *forgetfulProducer) makePushRequest( + event queue.Entry, + blockIfFull bool, +) pushRequest { + resp := make(chan bool, 1) return pushRequest{ - event: event, - resp: resp} + event: event, + blockIfFull: blockIfFull, + resp: resp} } -func (p *forgetfulProducer) Publish(event queue.Entry) (queue.EntryID, bool) { - return p.openState.publish(p.makePushRequest(event)) +func (p *forgetfulProducer) Publish(event queue.Entry) bool { + return p.openState.publish(p.makePushRequest(event, true)) } -func (p *forgetfulProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) { - return p.openState.tryPublish(p.makePushRequest(event)) +func (p *forgetfulProducer) TryPublish(event queue.Entry) bool { + return p.openState.publish(p.makePushRequest(event, false)) } func (p *forgetfulProducer) Close() { p.openState.Close() } -func (p *ackProducer) makePushRequest(event queue.Entry) pushRequest { - resp := make(chan queue.EntryID, 1) +func (p *ackProducer) makePushRequest(event queue.Entry, blockIfFull bool) pushRequest { + resp := make(chan bool, 1) return pushRequest{ - event: event, - producer: p, + event: event, + blockIfFull: blockIfFull, + producer: p, // We add 1 to the id so the default lastACK of 0 is a // valid initial state and 1 is the first real id. producerID: producerID(p.producedCount + 1), resp: resp} } -func (p *ackProducer) Publish(event queue.Entry) (queue.EntryID, bool) { - id, published := p.openState.publish(p.makePushRequest(event)) +func (p *ackProducer) Publish(event queue.Entry) bool { + published := p.openState.publish(p.makePushRequest(event, true)) if published { p.producedCount++ } - return id, published + return published } -func (p *ackProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) { - id, published := p.openState.tryPublish(p.makePushRequest(event)) +func (p *ackProducer) TryPublish(event queue.Entry) bool { + published := p.openState.publish(p.makePushRequest(event, false)) if published { p.producedCount++ } - return id, published + return published } func (p *ackProducer) Close() { @@ -126,7 +131,7 @@ func (st *openState) Close() { close(st.done) } -func (st *openState) publish(req pushRequest) (queue.EntryID, bool) { +func (st *openState) publish(req pushRequest) bool { // If we were given an encoder callback for incoming events, apply it before // sending the entry to the queue. if st.encoder != nil { @@ -140,44 +145,12 @@ func (st *openState) publish(req pushRequest) (queue.EntryID, bool) { // shutdown channel. select { case resp := <-req.resp: - return resp, true + return resp case <-st.queueClosing: - st.events = nil - return 0, false } case <-st.done: - st.events = nil - return 0, false case <-st.queueClosing: - st.events = nil - return 0, false - } -} - -func (st *openState) tryPublish(req pushRequest) (queue.EntryID, bool) { - // If we were given an encoder callback for incoming events, apply it before - // sending the entry to the queue. 
- if st.encoder != nil { - req.event, req.eventSize = st.encoder.EncodeEntry(req.event) - } - select { - case st.events <- req: - // The events channel is buffered, which means we may successfully - // write to it even if the queue is shutting down. To avoid blocking - // forever during shutdown, we also have to wait on the queue's - // shutdown channel. - select { - case resp := <-req.resp: - return resp, true - case <-st.queueClosing: - st.events = nil - return 0, false - } - case <-st.done: - st.events = nil - return 0, false - default: - st.log.Debugf("Dropping event, queue is blocked") - return 0, false } + st.events = nil + return false } diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 9cd209bbd51..5f4716a6d42 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -86,12 +86,13 @@ func TestProduceConsumer(t *testing.T) { // than 2 events to it, p.Publish will block, once we call q.Close, // we ensure the 3rd event was not successfully published. func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) { - q := NewQueue(nil, nil, + q, err := NewQueue(nil, nil, Settings{ Events: 2, // Queue size MaxGetRequest: 1, // make sure the queue won't buffer events FlushTimeout: time.Millisecond, }, 0, nil) + require.NoError(t, err, "Queue creation must succeed") p := q.Producer(queue.ProducerConfig{ // We do not read from the queue, so the callbacks are never called @@ -104,14 +105,14 @@ func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) { // Publish 2 events, this will make the queue full, but // both will be accepted for i := 0; i < 2; i++ { - id, ok := p.Publish(fmt.Sprintf("Event %d", i)) + ok := p.Publish(fmt.Sprintf("Event %d", i)) if !ok { - t.Errorf("failed to publish to the queue, event ID: %v", id) + t.Errorf("failed to publish to the queue") return } publishCount.Add(1) } - _, ok := p.Publish("Event 3") + ok := p.Publish("Event 3") if ok { t.Errorf("publishing the 3rd event must fail") return @@ -156,12 +157,13 @@ func TestProducerClosePreservesEventCount(t *testing.T) { var activeEvents atomic.Int64 - q := NewQueue(nil, nil, + q, err := NewQueue(nil, nil, Settings{ Events: 3, // Queue size MaxGetRequest: 2, FlushTimeout: 10 * time.Millisecond, }, 1, nil) + require.NoError(t, err, "Queue creation must succeed") p := q.Producer(queue.ProducerConfig{ ACK: func(count int) { @@ -184,7 +186,7 @@ func TestProducerClosePreservesEventCount(t *testing.T) { // decrement afterwards if it failed (otherwise the event count // could become negative even under correct queue operation). activeEvents.Add(1) - _, ok := p.Publish(event) + ok := p.Publish(event) if !ok { activeEvents.Add(-1) } @@ -210,7 +212,7 @@ func TestProducerClosePreservesEventCount(t *testing.T) { // Get call will block until the queue itself is cancelled. go func() { for i := 0; i < 2; i++ { - batch, err := q.Get(2) + batch, err := q.Get(2, 0) // Only error to worry about is queue closing, which isn't // a test failure. 
if err == nil { @@ -229,11 +231,12 @@ func TestProducerClosePreservesEventCount(t *testing.T) { func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.QueueFactory { return func(_ *testing.T) queue.Queue { - return NewQueue(nil, nil, Settings{ + q, _ := NewQueue(nil, nil, Settings{ Events: sz, MaxGetRequest: minEvents, FlushTimeout: flushTimeout, }, 0, nil) + return q } } diff --git a/libbeat/publisher/queue/memqueue/runloop.go b/libbeat/publisher/queue/memqueue/runloop.go index 397b41a25e8..7c340064ba6 100644 --- a/libbeat/publisher/queue/memqueue/runloop.go +++ b/libbeat/publisher/queue/memqueue/runloop.go @@ -20,6 +20,7 @@ package memqueue import ( "time" + "github.com/elastic/beats/v7/libbeat/common/fifo" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -29,46 +30,61 @@ import ( type runLoop struct { broker *broker + // The buffer backing the queue. Don't access its internal array directly, + // use an entryIndex: buf.entry(entryIndex) returns a pointer to the target + // entry within the buffer. + // Accessing this way handles the modular arithmetic to convert entry index + // to buffer index, in a way that's compatible with dynamically growing the + // underlying array (which is important when the queue has no maximum event + // count). + buf circularBuffer + // observer is a metrics observer used to report internal queue state. observer queue.Observer - // The index of the beginning of the current ring buffer within its backing - // array. If the queue isn't empty, bufPos points to the oldest remaining - // event. - bufPos int + // The index of the oldest entry in the underlying circular buffer. + bufferStart entryIndex - // The total number of events in the queue. + // The current number of events in the queue. eventCount int + // The current number of bytes in the queue. + byteCount int + // The number of consumed events waiting for acknowledgment. The next Get - // request will return events starting at position - // (bufPos + consumedCount) % len(buf). - consumedCount int + // request will return events starting at index + // bufferStart.plus(consumedEventCount). + consumedEventCount int + + // The number of event bytes in the queue corresponding to consumed events. + consumedByteCount int // The list of batches that have been consumed and are waiting to be sent // to ackLoop for acknowledgment handling. (This list doesn't contain all // outstanding batches, only the ones not yet forwarded to ackLoop.) consumedBatches batchList + // pendingPushRequests stores incoming events that can't yet fit in the + // queue. As space in the queue is freed, these requests will be handled + // in order. + pendingPushRequests fifo.FIFO[pushRequest] + // If there aren't enough events ready to fill an incoming get request, - // the queue may block based on its flush settings. When this happens, - // pendingGetRequest stores the request until we're ready to handle it. + // the request may block based on the queue flush settings. When this + // happens, pendingGetRequest stores the request until we can handle it. pendingGetRequest *getRequest - // This timer tracks the configured flush timeout when we will respond - // to a pending getRequest even if we can't fill the requested event count. - // It is active if and only if pendingGetRequest is non-nil. + // When a get request is blocked because the queue doesn't have enough + // events, getTimer stores the flush timer. 
When it expires, the queue + // will respond to the request even if the requested number of events + // and/or bytes is not available. + // getTimer is active if and only if pendingGetRequest is non-nil. getTimer *time.Timer // closing is set when a close request is received. Once closing is true, // the queue will not accept any new events, but will continue responding // to Gets and Acks to allow pending events to complete on shutdown. closing bool - - // TODO (https://github.com/elastic/beats/issues/37893): entry IDs were a - // workaround for an external project that no longer exists. At this point - // they just complicate the API and should be removed. - nextEntryID queue.EntryID } func newRunLoop(broker *broker, observer queue.Observer) *runLoop { @@ -82,10 +98,19 @@ func newRunLoop(broker *broker, observer queue.Observer) *runLoop { <-timer.C } } + + eventBufSize := broker.settings.Events + if broker.useByteLimits() { + // The queue is using byte limits, start with a buffer of 2^10 and + // we will expand it as needed. + eventBufSize = 1 << 10 + } + return &runLoop{ broker: broker, observer: observer, getTimer: timer, + buf: newCircularBuffer(eventBufSize), } } @@ -99,22 +124,22 @@ func (l *runLoop) run() { // standalone helper function to allow testing of loop invariants. func (l *runLoop) runIteration() { var pushChan chan pushRequest - // Push requests are enabled if the queue isn't full or closing. - if l.eventCount < len(l.broker.buf) && !l.closing { + // Push requests are enabled if the queue isn't closing. + if !l.closing { pushChan = l.broker.pushChan } var getChan chan getRequest // Get requests are enabled if the queue has events that weren't yet sent // to consumers, and no existing request is active. - if l.pendingGetRequest == nil && l.eventCount > l.consumedCount { + if l.pendingGetRequest == nil && l.eventCount > l.consumedEventCount { getChan = l.broker.getChan } var consumedChan chan batchList // Enable sending to the scheduled ACKs channel if we have // something to send. - if !l.consumedBatches.empty() { + if !l.consumedBatches.Empty() { consumedChan = l.broker.consumedChan } @@ -127,16 +152,16 @@ func (l *runLoop) runIteration() { select { case <-l.broker.closeChan: l.closing = true - close(l.broker.closingChan) - // Get requests are handled immediately during shutdown + // Get and push requests are handled immediately during shutdown l.maybeUnblockGetRequest() + l.maybeUnblockPushRequests() case <-l.broker.ctx.Done(): // The queue is fully shut down, do nothing return case req := <-pushChan: // producer pushing new event - l.handleInsert(&req) + l.handlePushRequest(req) case req := <-getChan: // consumer asking for next batch l.handleGetRequest(&req) @@ -156,11 +181,12 @@ func (l *runLoop) runIteration() { l.pendingGetRequest = nil } } - func (l *runLoop) handleGetRequest(req *getRequest) { - if req.entryCount <= 0 || req.entryCount > l.broker.settings.MaxGetRequest { + // When using event-based limits, requests are capped by settings.MaxGetRequest. 
+ if !l.broker.useByteLimits() && req.entryCount > l.broker.settings.MaxGetRequest { req.entryCount = l.broker.settings.MaxGetRequest } + if l.getRequestShouldBlock(req) { l.pendingGetRequest = req l.getTimer.Reset(l.broker.settings.FlushTimeout) @@ -174,59 +200,180 @@ func (l *runLoop) getRequestShouldBlock(req *getRequest) bool { // Never block if the flush timeout isn't positive, or during shutdown return false } - eventsAvailable := l.eventCount - l.consumedCount - // Block if the available events aren't enough to fill the request - return eventsAvailable < req.entryCount + + // The entry/byte limits are satisfied if they are <= 0 (indicating no + // limit) or if we have at least the requested number available. + if l.broker.useByteLimits() { + availableBytes := l.byteCount - l.consumedByteCount + return req.byteCount > 0 && availableBytes < req.byteCount + } + availableEntries := l.eventCount - l.consumedEventCount + return req.entryCount > 0 && availableEntries < req.entryCount } // Respond to the given get request without blocking or waiting for more events func (l *runLoop) handleGetReply(req *getRequest) { - eventsAvailable := l.eventCount - l.consumedCount - batchSize := req.entryCount - if eventsAvailable < batchSize { - batchSize = eventsAvailable + entriesAvailable := l.eventCount - l.consumedEventCount + // backwards compatibility: when using event-based limits, batch size + // can't be more than settings.MaxGetRequest. + if l.broker.useByteLimits() { + if entriesAvailable > l.broker.settings.MaxGetRequest { + entriesAvailable = l.broker.settings.MaxGetRequest + } + } + startIndex := l.bufferStart.plus(l.consumedEventCount) + batchEntryCount := 0 + batchByteCount := 0 + + for i := 0; i < entriesAvailable; i++ { + if req.entryCount > 0 && batchEntryCount+1 > req.entryCount { + // This would push us over the requested event limit, stop here. + break + } + eventSize := l.buf.entry(startIndex.plus(batchEntryCount)).eventSize + // Don't apply size checks on the first event: if a single event is + // larger than the configured batch maximum, we'll still try to send it, + // we'll just do it in a "batch" of one event. + if i > 0 && req.byteCount > 0 && batchByteCount+eventSize > req.byteCount { + // This would push us over the requested byte limit, stop here. + break + } + batchEntryCount++ + batchByteCount += eventSize } - startIndex := l.bufPos + l.consumedCount - batch := newBatch(l.broker, startIndex, batchSize) + batch := newBatch(l.buf, startIndex, batchEntryCount) batchBytes := 0 - for i := 0; i < batchSize; i++ { - batchBytes += batch.rawEntry(i).eventSize + for i := 0; i < batchEntryCount; i++ { + batchBytes += batch.entry(i).eventSize } // Send the batch to the caller and update internal state req.responseChan <- batch - l.consumedBatches.append(batch) - l.consumedCount += batchSize - l.observer.ConsumeEvents(batchSize, batchBytes) + l.consumedBatches.Add(batch) + l.consumedEventCount += batchEntryCount + l.consumedByteCount += batchByteCount + l.observer.ConsumeEvents(batchEntryCount, batchByteCount) } -func (l *runLoop) handleDelete(count int) { - byteCount := 0 - for i := 0; i < count; i++ { - entry := l.broker.buf[(l.bufPos+i)%len(l.broker.buf)] - byteCount += entry.eventSize - } +func (l *runLoop) handleDelete(deletedEntryCount int) { // Advance position and counters. Event data was already cleared in - // batch.FreeEntries when the events were vended. 
 	// Advance position and counters. Event data was already cleared in
-	// batch.FreeEntries when the events were vended.
-	l.bufPos = (l.bufPos + count) % len(l.broker.buf)
-	l.eventCount -= count
-	l.consumedCount -= count
-	l.observer.RemoveEvents(count, byteCount)
+	// batch.FreeEntries when the events were vended, so we just need to
+	// compute the byte total being removed.
+	deletedByteCount := 0
+	for i := 0; i < deletedEntryCount; i++ {
+		entryIndex := l.bufferStart.plus(i)
+		deletedByteCount += l.buf.entry(entryIndex).eventSize
+	}
+	l.bufferStart = l.bufferStart.plus(deletedEntryCount)
+	l.eventCount -= deletedEntryCount
+	l.byteCount -= deletedByteCount
+	l.consumedEventCount -= deletedEntryCount
+	l.consumedByteCount -= deletedByteCount
+	l.observer.RemoveEvents(deletedEntryCount, deletedByteCount)
 
 	if l.closing && l.eventCount == 0 {
 		// Our last events were acknowledged during shutdown, signal final shutdown
 		l.broker.ctxCancel()
 	}
+
+	// We just freed up space in the queue, see if this unblocked any
+	// pending inserts.
+	l.maybeUnblockPushRequests()
+}
+
+func (l *runLoop) handlePushRequest(req pushRequest) {
+	// If other inserts are already pending, or we don't have enough room
+	// for the new entry, we need to either reject the request or block
+	// until we can handle it.
+	if !l.pendingPushRequests.Empty() || !l.canFitPushRequest(req) {
+		if req.blockIfFull {
+			// Add this request to the pending list to be handled when there's space.
+			l.pendingPushRequests.Add(req)
+		} else {
+			// Request doesn't want to block, return failure immediately.
+			l.broker.logger.Debugf("queue is full, dropping event")
+			req.resp <- false
+		}
+		return
+	}
+	// There is space, insert the new event and report the result.
+	l.doInsert(req)
+}
+
+// Returns true if the given push request can be added to the queue
+// without exceeding the entry count or byte limit.
+func (l *runLoop) canFitPushRequest(req pushRequest) bool {
+	if l.broker.useByteLimits() {
+		newByteCount := l.byteCount + req.eventSize
+		return newByteCount <= l.broker.settings.Bytes
+	}
+	newEventCount := l.eventCount + 1
+	return newEventCount <= l.broker.settings.Events
 }
 
-func (l *runLoop) handleInsert(req *pushRequest) {
-	l.insert(req, l.nextEntryID)
-	// Send back the new event id.
-	req.resp <- l.nextEntryID
+func (l *runLoop) maybeUnblockPushRequests() {
+	for !l.pendingPushRequests.Empty() {
+		req := l.pendingPushRequests.First()
+		if l.closing {
+			// If the queue is closing, reject all pending requests.
+			req.resp <- false
+			l.pendingPushRequests.Remove()
+			continue
+		}
+		if !l.canFitPushRequest(req) {
+			// We're out of space, the rest of the blocked requests will have
+			// to wait.
+			break
+		}
+		l.doInsert(req)
+		l.pendingPushRequests.Remove()
+	}
+}
+
+// growEventBuffer is called when there is no limit on the queue event
+// count (i.e. the queue size is byte-based) but the queue's event buffer
+// (a circular buffer of queueEntry) is full.
+// For this to be possible, queue indices must be stable when the buffer
+// size changes. Therefore, entry positions are based on a strictly
+// increasing id, so that different events have different positions,
+// even when they occupy the same location in the underlying buffer.
+// The buffer position is the entry's index modulo the buffer size: for
+// a queue with buffer size N, the entries stored in buf[0] will have
+// entry indices 0, N, 2*N, 3*N, ...
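+// Illustrative example: in a buffer of size 4, the entry with index 6 is
+// stored at buf[2]; after growing to size 8, the same entry index maps to
+// buf[6], so indices held by the run loop and outstanding batches stay valid.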
+func (l *runLoop) growEventBuffer() {
+	bufSize := l.buf.size()
+	newBuffer := newCircularBuffer(bufSize * 2)
+	// Copy the elements to the new buffer
+	for i := 0; i < bufSize; i++ {
+		index := l.bufferStart.plus(i)
+		*newBuffer.entry(index) = *l.buf.entry(index)
+	}
+	l.buf = newBuffer
+}
+
+// Insert the given new event without bounds checks, and report the result
+// to the caller via the push request's response channel.
+func (l *runLoop) doInsert(req pushRequest) {
+	// If using byte limits (no hard limit on event count), check if we need to
+	// grow the current queue buffer to fit the new event.
+	if l.broker.useByteLimits() && l.eventCount >= l.buf.size() {
+		l.growEventBuffer()
+	}
+
+	entryIndex := l.bufferStart.plus(l.eventCount)
+	*l.buf.entry(entryIndex) = queueEntry{
+		event:      req.event,
+		eventSize:  req.eventSize,
+		producer:   req.producer,
+		producerID: req.producerID,
+	}
+	l.observer.AddEvent(req.eventSize)
+
+	// Report success to the caller
+	req.resp <- true
 
-	l.nextEntryID++
 	l.eventCount++
+	l.byteCount += req.eventSize
 
 	// See if this gave us enough for a new batch
 	l.maybeUnblockGetRequest()
@@ -246,15 +393,3 @@ func (l *runLoop) maybeUnblockGetRequest() {
 		}
 	}
 }
-
-func (l *runLoop) insert(req *pushRequest, id queue.EntryID) {
-	index := (l.bufPos + l.eventCount) % len(l.broker.buf)
-	l.broker.buf[index] = queueEntry{
-		event:      req.event,
-		eventSize:  req.eventSize,
-		id:         id,
-		producer:   req.producer,
-		producerID: req.producerID,
-	}
-	l.observer.AddEvent(req.eventSize)
-}
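For context on how this push path looks from a producer's side: Publish sends a pushRequest with blockIfFull set, so a full queue parks the request in pendingPushRequests, while TryPublish sends it unset and gets an immediate false when canFitPushRequest fails. A minimal caller-side sketch, assuming an already-created queue q; the produce helper and its event values are illustrative, not part of this change:

    func produce(q queue.Queue, events []queue.Entry) {
        p := q.Producer(queue.ProducerConfig{})
        defer p.Close()
        for _, e := range events {
            // Blocking path: waits for space; false means the queue closed
            // before the entry could be accepted.
            if !p.Publish(e) {
                return
            }
        }
        // Non-blocking path: false means the queue was full and the entry
        // was rejected immediately rather than queued.
        _ = p.TryPublish("extra event")
    }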
diff --git a/libbeat/publisher/queue/memqueue/runloop_test.go b/libbeat/publisher/queue/memqueue/runloop_test.go
index f6c83e8fec0..969aca46665 100644
--- a/libbeat/publisher/queue/memqueue/runloop_test.go
+++ b/libbeat/publisher/queue/memqueue/runloop_test.go
@@ -38,7 +38,7 @@ func TestFlushSettingsDoNotBlockFullBatches(t *testing.T) {
 	// available. This test verifies that Get requests that can be completely
 	// filled do not wait for the flush timer.
 
-	broker := newQueue(
+	broker, err := newQueue(
 		logp.NewLogger("testing"),
 		nil,
 		Settings{
@@ -47,6 +47,7 @@ func TestFlushSettingsDoNotBlockFullBatches(t *testing.T) {
 			FlushTimeout: 10 * time.Second,
 		},
 		10, nil)
+	require.NoError(t, err, "Queue creation must succeed")
 	producer := newProducer(broker, nil, nil)
 
 	rl := broker.runLoop
@@ -54,7 +55,7 @@ func TestFlushSettingsDoNotBlockFullBatches(t *testing.T) {
 		// Pair each publish call with an iteration of the run loop so we
 		// get a response.
 		go rl.runIteration()
-		_, ok := producer.Publish(i)
+		ok := producer.Publish(i)
 		require.True(t, ok, "Queue publish call must succeed")
 	}
 
@@ -65,11 +66,11 @@ func TestFlushSettingsDoNotBlockFullBatches(t *testing.T) {
 	go func() {
 		// Run the Get asynchronously so the test itself doesn't block if
 		// there's a logical error.
-		_, _ = broker.Get(100)
+		_, _ = broker.Get(100, 0)
 	}()
 	rl.runIteration()
 	assert.Nil(t, rl.pendingGetRequest, "Queue should have no pending get request since the request should succeed immediately")
-	assert.Equal(t, 100, rl.consumedCount, "Queue should have a consumedCount of 100 after a consumer requested all its events")
+	assert.Equal(t, 100, rl.consumedEventCount, "Queue should have a consumedCount of 100 after a consumer requested all its events")
 }
 
 func TestFlushSettingsBlockPartialBatches(t *testing.T) {
@@ -77,7 +78,7 @@ func TestFlushSettingsBlockPartialBatches(t *testing.T) {
 	// there are enough events. This one uses the same setup to confirm that
 	// Get requests are delayed if there aren't enough events.
 
-	broker := newQueue(
+	broker, err := newQueue(
 		logp.NewLogger("testing"),
 		nil,
 		Settings{
@@ -86,6 +87,7 @@ func TestFlushSettingsBlockPartialBatches(t *testing.T) {
 			FlushTimeout: 10 * time.Second,
 		},
 		10, nil)
+	require.NoError(t, err, "Queue creation must succeed")
 	producer := newProducer(broker, nil, nil)
 
 	rl := broker.runLoop
@@ -93,7 +95,7 @@ func TestFlushSettingsBlockPartialBatches(t *testing.T) {
 		// Pair each publish call with an iteration of the run loop so we
 		// get a response.
 		go rl.runIteration()
-		_, ok := producer.Publish("some event")
+		ok := producer.Publish("some event")
 		require.True(t, ok, "Queue publish call must succeed")
 	}
 
@@ -102,19 +104,19 @@ func TestFlushSettingsBlockPartialBatches(t *testing.T) {
 	go func() {
 		// Run the Get asynchronously so the test itself doesn't block if
 		// there's a logical error.
-		_, _ = broker.Get(101)
+		_, _ = broker.Get(101, 0)
 	}()
 	rl.runIteration()
 	assert.NotNil(t, rl.pendingGetRequest, "Queue should have a pending get request since the queue doesn't have the requested event count")
-	assert.Equal(t, 0, rl.consumedCount, "Queue should have a consumedCount of 0 since the Get request couldn't be completely filled")
+	assert.Equal(t, 0, rl.consumedEventCount, "Queue should have a consumedCount of 0 since the Get request couldn't be completely filled")
 
 	// Now confirm that adding one more event unblocks the request
 	go func() {
-		_, _ = producer.Publish("some event")
+		_ = producer.Publish("some event")
 	}()
 	rl.runIteration()
 	assert.Nil(t, rl.pendingGetRequest, "Queue should have no pending get request since adding an event should unblock the previous one")
-	assert.Equal(t, 101, rl.consumedCount, "Queue should have a consumedCount of 101 after adding an event unblocked the pending get request")
+	assert.Equal(t, 101, rl.consumedEventCount, "Queue should have a consumedCount of 101 after adding an event unblocked the pending get request")
 }
 
 func TestObserverAddEvent(t *testing.T) {
@@ -123,15 +125,15 @@ func TestObserverAddEvent(t *testing.T) {
 	reg := monitoring.NewRegistry()
 	rl := &runLoop{
 		observer: queue.NewQueueObserver(reg),
-		broker: &broker{
-			buf: make([]queueEntry, 100),
-		},
+		buf:      newCircularBuffer(100),
+		broker:   &broker{},
 	}
-	request := &pushRequest{
+	request := pushRequest{
 		event:     publisher.Event{},
 		eventSize: 123,
+		resp:      make(chan bool, 1),
 	}
-	rl.insert(request, 0)
+	rl.doInsert(request)
 	assertRegistryUint(t, reg, "queue.added.events", 1, "Queue insert should report added event")
 	assertRegistryUint(t, reg, "queue.added.bytes", 123, "Queue insert should report added bytes")
 }
 
 func TestObserverConsumeEvents(t *testing.T) {
 	// Confirm that event batches sent to the output are reported in
 	// queue.consumed.events and queue.consumed.bytes.
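+	// The runLoop is constructed directly (instead of through newQueue) so
+	// the test can call handleGetReply synchronously on a prefilled buffer.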
+	const bufSize = 100
 	reg := monitoring.NewRegistry()
 	rl := &runLoop{
-		observer: queue.NewQueueObserver(reg),
-		broker: &broker{
-			buf: make([]queueEntry, 100),
-		},
+		observer:   queue.NewQueueObserver(reg),
+		buf:        newCircularBuffer(bufSize),
 		eventCount: 50,
+		broker:     &broker{},
 	}
 	// Initialize the queue entries to a test byte size
-	for i := range rl.broker.buf {
-		rl.broker.buf[i].eventSize = 123
+	for i := 0; i < bufSize; i++ {
+		index := entryIndex(i)
+		rl.buf.entry(index).eventSize = 123
 	}
 	request := &getRequest{
-		entryCount:   len(rl.broker.buf),
-		responseChan: make(chan *batch, 1),
+		entryCount:   rl.buf.size(),
+		responseChan: make(chan batch, 1),
 	}
 	rl.handleGetReply(request)
 	// We should have gotten back 50 events, everything in the queue, so we expect the size
@@ -164,18 +167,20 @@ func TestObserverRemoveEvents(t *testing.T) {
 	reg := monitoring.NewRegistry()
+	const bufSize = 100
 	rl := &runLoop{
 		observer: queue.NewQueueObserver(reg),
+		buf:      newCircularBuffer(bufSize),
 		broker: &broker{
 			ctx:        context.Background(),
-			buf:        make([]queueEntry, 100),
 			deleteChan: make(chan int, 1),
 		},
 		eventCount: 50,
 	}
 	// Initialize the queue entries to a test byte size
-	for i := range rl.broker.buf {
-		rl.broker.buf[i].eventSize = 123
+	for i := 0; i < bufSize; i++ {
+		index := entryIndex(i)
+		rl.buf.entry(index).eventSize = 123
 	}
 	const deleteCount = 25
 	rl.broker.deleteChan <- deleteCount
diff --git a/libbeat/publisher/queue/monitoring.go b/libbeat/publisher/queue/monitoring.go
index 5061d3f5600..d0e911ec976 100644
--- a/libbeat/publisher/queue/monitoring.go
+++ b/libbeat/publisher/queue/monitoring.go
@@ -63,7 +63,7 @@ type queueObserver struct {
 
 type nilObserver struct{}
 
-// Creates queue metrics in the given registry under the path "pipeline.queue".
+// Creates queue metrics in the given registry under the path "queue".
 func NewQueueObserver(metrics *monitoring.Registry) Observer {
 	if metrics == nil {
 		return nilObserver{}
diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go
index 075d7ad66a4..d4cfe689317 100644
--- a/libbeat/publisher/queue/queue.go
+++ b/libbeat/publisher/queue/queue.go
@@ -50,9 +50,10 @@ type Queue interface {
 
 	Producer(cfg ProducerConfig) Producer
 
-	// Get retrieves a batch of up to eventCount events. If eventCount <= 0,
-	// there is no bound on the number of returned events.
-	Get(eventCount int) (Batch, error)
+	// Get retrieves an event batch with up to eventCount events or up to
+	// byteCount bytes, whichever limit matches the queue type. If the
+	// matching parameter is 0, there is no limit.
+	Get(eventCount int, byteCount int) (Batch, error)
 }
 
 // If encoderFactory is provided, then the resulting queue must use it to
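To make the two-argument contract concrete, here is a sketch of a consumer-side call. The drainOnce helper, the 32KiB budget, and the 100-event cap are illustrative assumptions, as is the use of the Batch interface's Count/Entry/Done accessors:

    func drainOnce(q queue.Queue, byteBased bool) error {
        var (
            batch queue.Batch
            err   error
        )
        if byteBased {
            // Byte-based queue: no event-count limit, request up to 32KiB.
            batch, err = q.Get(0, 32*1024)
        } else {
            // Event-based queue: request up to 100 events, no byte limit.
            batch, err = q.Get(100, 0)
        }
        if err != nil {
            return err // the queue is closed
        }
        for i := 0; i < batch.Count(); i++ {
            _ = batch.Entry(i) // hand each entry to the output here
        }
        batch.Done() // acknowledge so the queue can delete these entries
        return nil
    }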
@@ -83,21 +84,16 @@ type ProducerConfig struct {
 	ACK func(count int)
 }
 
-type EntryID uint64
-
 // Producer is an interface to be used by the pipelines client to forward
 // events to a queue.
 type Producer interface {
-	// Publish adds an entry to the queue, blocking if necessary, and returns
-	// the new entry's id and true on success.
-	Publish(entry Entry) (EntryID, bool)
-
-	// TryPublish adds an entry to the queue if doing so will not block the
-	// caller, otherwise it immediately returns. The reasons a publish attempt
-	// might block are defined by the specific queue implementation and its
-	// configuration. If the event was successfully added, returns true with
-	// the event's assigned ID, and false otherwise.
-	TryPublish(entry Entry) (EntryID, bool)
+	// Publish adds an entry to the queue, blocking until there is space
+	// if necessary, and returns true on success.
+	Publish(entry Entry) bool
+
+	// TryPublish adds an entry to the queue if the queue has space for it;
+	// otherwise it returns false immediately.
+	TryPublish(entry Entry) bool
 
 	// Close closes this Producer endpoint.
 	// Note: A queue may still send ACK signals even after Close is called on
diff --git a/libbeat/publisher/queue/queuetest/queuetest.go b/libbeat/publisher/queue/queuetest/queuetest.go
index 96b2310d222..ee60cb4b10c 100644
--- a/libbeat/publisher/queue/queuetest/queuetest.go
+++ b/libbeat/publisher/queue/queuetest/queuetest.go
@@ -297,7 +297,7 @@ func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory {
 
 		go func() {
 			for {
-				batch, err := b.Get(batchSize)
+				batch, err := b.Get(batchSize, 0)
 				if err != nil {
 					return
 				}