Add byte-based ingestion limits to the queue and output #39776

Draft · faec wants to merge 121 commits into base: main

Commits (121, all authored by faec):
a4019ec  cleanup (Apr 10, 2024)
a3d3757  cleanups (Apr 11, 2024)
597e0a5  break input sources up into separate helper functions (Apr 11, 2024)
0df748a  finish helper function split (Apr 11, 2024)
4b70900  rewrite the sqsReader main loop (Apr 11, 2024)
90d9e24  simplify sqsReader loop (Apr 12, 2024)
5f94e9b  adjust variable names (Apr 12, 2024)
b797261  remove unused parameter (Apr 12, 2024)
88f3980  createS3Lister -> createS3Poller (Apr 12, 2024)
9f32df6  remove unused error checks (Apr 12, 2024)
48ec82a  cleanup (Apr 12, 2024)
58e084a  make a wrapper for v2.Canceler that doesn't use an extra goroutine (Apr 12, 2024)
1974f8f  remove unused parameter (Apr 12, 2024)
646374c  cleanup (Apr 12, 2024)
a43cae6  remove redundant helper (Apr 12, 2024)
f46ef06  adjust variable names (Apr 12, 2024)
d9be04b  remove extra index indirection in state lookup (Apr 13, 2024)
5e1fbcc  remove redundant sync.Map (Apr 13, 2024)
c16a22f  merge redundant state maps (Apr 13, 2024)
f07915a  remove redundant state map (Apr 13, 2024)
0f483a3  simplify s3Poller worker handling (Apr 13, 2024)
8916d91  Merge branch 'main' of github.com:elastic/beats into awss3-cleanup (Apr 13, 2024)
a8cb6bd  simplify waitgroup handling / unused errors (Apr 13, 2024)
78a7db4  clean up context handling (Apr 13, 2024)
edc1bd3  adjust delay timer (Apr 13, 2024)
1497be4  remove unused struct fields (Apr 13, 2024)
a3e0dc8  cleanup (Apr 13, 2024)
219e857  Refactor cloudwatch worker task allocation (Apr 15, 2024)
977a0d3  add unit tests for cloudwatchPoller.receive (Apr 15, 2024)
71134f9  Merge branch 'main' of github.com:elastic/beats into cloudwatch-fix (Apr 15, 2024)
6cf5506  update changelog (Apr 15, 2024)
ff24571  make check (Apr 15, 2024)
5ec8a86  Merge branch 'cloudwatch-fix' into awss3-cleanup (Apr 15, 2024)
2a6abb8  Remove unused custom semaphore helper (Apr 15, 2024)
12a2a3c  cleanups in input.go (Apr 15, 2024)
dd29fa0  revert unintentional return value change (Apr 15, 2024)
4956db9  Concurrency / error handling fixes in awss3 (Apr 22, 2024)
fc641e1  give the registry accessor its own mutex (Apr 22, 2024)
4a9cb60  update tests (Apr 23, 2024)
959d557  Merge branch 'main' of github.com:elastic/beats into s3-concurrency-fix (Apr 23, 2024)
3d93d22  make check (Apr 23, 2024)
7d6369f  lint (Apr 23, 2024)
b4b5b28  lint (Apr 23, 2024)
45619e3  Merge branch 'main' of github.com:elastic/beats into s3-concurrency-fix (Apr 24, 2024)
e88be00  Merge branch 's3-concurrency-fix' of github.com:faec/beats into awss3… (Apr 24, 2024)
1308a2d  Merge branch 'main' into s3-concurrency-fix (Apr 24, 2024)
0abf663  Merge branch 's3-concurrency-fix' into awss3-cleanup (Apr 24, 2024)
942ae03  cleaning up context use (Apr 26, 2024)
e84471b  Merge branch 'main' into s3input-cleanup (Apr 26, 2024)
0289604  Merge branch 's3input-cleanup' into awss3-cleanup (Apr 26, 2024)
2c084bb  splitting S3 and SQS into distinct inputs internally (Apr 26, 2024)
dbe4691  splitting awss3 into two input objects (Apr 29, 2024)
73d1465  Merge branch 'main' of github.com:elastic/beats into s3input-cleanup (Apr 30, 2024)
ad7d342  Merge branch 's3input-cleanup' into awss3-cleanup (Apr 30, 2024)
e05c45d  reorganize {s3,sqs}.go by adding {s3,sqs}_input.go for the code speci… (Apr 30, 2024)
54f0a87  clean up sqs helpers (Apr 30, 2024)
122ee8c  Merge branch 'main' into awss3-cleanup (Apr 30, 2024)
d396457  fix merge (Apr 30, 2024)
be54ac7  update tests (Apr 30, 2024)
568d2b0  merge sqsReaderInput and sqsReader (Apr 30, 2024)
383c111  get tests building again (May 1, 2024)
4b2ea11  remove redundant fields (May 1, 2024)
de36816  more reorganization (May 1, 2024)
3117334  organizing (May 1, 2024)
6b43ac5  reordering code (May 1, 2024)
d38af54  Merge branch 'main' of github.com:elastic/beats into awss3-cleanup (May 1, 2024)
3712af4  clean up states initialization (May 1, 2024)
894ba4c  remove unused helper (May 1, 2024)
31f3b95  working on test updates (May 2, 2024)
7d12f0a  fix benchmark tests (May 2, 2024)
f1b7761  updating unit tests (May 3, 2024)
fa22239  fix remaining tests (May 3, 2024)
e253681  remove unused debug parameter (May 3, 2024)
5b922df  Merge branch 'main' of github.com:elastic/beats into awss3-cleanup (May 3, 2024)
1fef199  remove commented code (May 3, 2024)
800fc73  move helper function (May 3, 2024)
1694f0d  clean up aws client config modifiers (May 3, 2024)
63be523  reorder helper functions (May 3, 2024)
1bae757  reorder helper functions (May 3, 2024)
939c38f  update comments (May 3, 2024)
b032106  move log creation earlier (May 3, 2024)
0995921  update comments (May 3, 2024)
5d9f731  make check (May 3, 2024)
a8323a4  Working on queue byte limits (May 3, 2024)
8ab68ed  Fill out more of the byte limits API, remove EntryID (May 3, 2024)
60e1c2c  convert circular buffer indices to a helper type storing the absolute… (May 4, 2024)
812f87d  Finish most byte bounds logic, add bulk_max_bytes to ES output (May 4, 2024)
3fcb875  remove drop on cancel option (May 29, 2024)
47c07a6  producer.Cancel -> producer.Close (May 29, 2024)
f9d4c39  remove OnDrop callbacks (May 29, 2024)
d398b46  remove internal cancellation helpers (May 29, 2024)
bad2498  remove the queue's shipper metrics hook (May 29, 2024)
eca723b  remove unused fields and producer cancel tests (May 29, 2024)
c79cc3e  Merge branch 'remove-producer-cancel' into queue-metrics (May 29, 2024)
bffd70d  fix merge (May 29, 2024)
7409be1  moving metric ownership around (May 29, 2024)
75ac0f4  plumbing for queue metrics (May 29, 2024)
ca949b8  flesh out queue observer internals (May 29, 2024)
a279862  update queue filled percent (May 29, 2024)
d0f1939  Merge branch 'main' into queue-metrics (May 30, 2024)
ee2fa2b  Merge branch 'main' into memqueue-byte-limits (May 30, 2024)
f512643  Merge branch 'main' into remove-producer-cancel (May 30, 2024)
6d585bf  Merge branch 'remove-producer-cancel' into memqueue-byte-limits (May 30, 2024)
517ffe1  clean up shipper metric hooks (May 30, 2024)
4f6d02c  use the metrics observer from the memqueue (May 30, 2024)
fd18f4e  configure gauges (May 30, 2024)
2f6ba9b  report queue metrics from the disk queue (May 30, 2024)
ce2c287  fix disk queue initialization (May 30, 2024)
e70c13b  outputObserver -> retryObserver (May 30, 2024)
e6dbb2d  move queue draining logic into the queue (May 30, 2024)
afa3793  shadow acked var the simple way (May 30, 2024)
f8157ac  Merge branch 'queue-metrics' into memqueue-byte-limits (May 30, 2024)
5a158ec  memqueue uses event or byte limits, not both (May 30, 2024)
24c5564  fix byte vs event logic (May 30, 2024)
3da6d32  clean up FIFO handling (May 30, 2024)
ac94a2b  replace batchList implementation with FIFO helper (May 30, 2024)
d1ba6a1  remove unrelated test change (May 30, 2024)
91e4534  remove unused error (May 30, 2024)
5aa42eb  Merge branch 'main' of github.com:elastic/beats into queue-byte-limits (Jun 18, 2024)
b585727  fix merge + tests (Jun 18, 2024)
3643a8f  Add docs / parameter checks (Jun 18, 2024)
84 changes: 84 additions & 0 deletions libbeat/common/fifo/fifo.go
@@ -0,0 +1,84 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package fifo

type FIFO[T any] struct {
	first *node[T]
	last  *node[T]
}

type node[T any] struct {
	next  *node[T]
	value T
}

// Add appends a value to the end of the queue.
func (f *FIFO[T]) Add(value T) {
	newNode := &node[T]{value: value}
	if f.first == nil {
		f.first = newNode
	} else {
		f.last.next = newNode
	}
	f.last = newNode
}

// Empty reports whether the queue contains no entries.
func (f *FIFO[T]) Empty() bool {
	return f.first == nil
}

// First returns the first value (if present) without removing it from the
// queue. It returns a default value if the queue is empty; to recognize
// this case, check (*FIFO).Empty().
func (f *FIFO[T]) First() T {
	if f.first == nil {
		var none T
		return none
	}
	return f.first.value
}

// ConsumeFirst removes the first entry in this FIFO and returns it.
func (f *FIFO[T]) ConsumeFirst() T {
	result := f.First()
	f.Remove()
	return result
}

// Concat appends another FIFO queue to this one. It takes ownership of
// the given FIFO's contents.
func (f *FIFO[T]) Concat(list FIFO[T]) {
	if list.Empty() {
		return
	}
	if f.Empty() {
		*f = list
		return
	}
	f.last.next = list.first
	f.last = list.last
}

// Remove removes the first entry in the queue. It does nothing if the
// FIFO is empty.
func (f *FIFO[T]) Remove() {
	if f.first != nil {
		f.first = f.first.next
		if f.first == nil {
			f.last = nil
		}
	}
}
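
For reference, a minimal usage sketch of this helper (the string values and the main wrapper are invented for illustration):

package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/common/fifo"
)

func main() {
	var q fifo.FIFO[string]
	q.Add("first")
	q.Add("second")

	fmt.Println(q.First()) // peek: "first" stays in the queue

	// Drain in insertion order: prints "first", then "second".
	for !q.Empty() {
		fmt.Println(q.ConsumeFirst())
	}
}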
23 changes: 23 additions & 0 deletions libbeat/docs/queueconfig.asciidoc
@@ -67,6 +67,17 @@ queue.mem:
flush.timeout: 5s
------------------------------------------------------------------------------

Here is an alternate configuration that measures queue size in bytes rather
than event count. In this case, the output must set `bulk_max_bytes`
instead of `bulk_max_size` to control the batch size:

[source,yaml]
------------------------------------------------------------------------------
queue.mem:
bytes: 32MB
flush.timeout: 10s
------------------------------------------------------------------------------
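
A matching Elasticsearch output section might then look like the following
sketch (the `4MB` value is purely illustrative):

[source,yaml]
------------------------------------------------------------------------------
output.elasticsearch:
  hosts: ["localhost:9200"]
  bulk_max_bytes: 4MB
------------------------------------------------------------------------------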

[float]
=== Configuration options

@@ -80,6 +91,16 @@ Number of events the queue can store.

The default value is 3200 events.

[float]
[[queue-mem-bytes-option]]
===== `bytes`

Number of bytes the queue can store. This option is only available for outputs
that support byte-based event buffers (currently just the Elasticsearch output).
The queue should set either `events` or `bytes` but not both.

The default is 0, indicating the queue should use the `events` limit instead.

[float]
[[queue-mem-flush-min-events-option]]
===== `flush.min_events`
@@ -92,6 +113,8 @@ publishing.
If 0 or 1, sets the maximum number of events per batch to half the queue size, and sets
the queue to synchronous mode (equivalent to `flush.timeout` of 0).

This value is ignored when `bytes` is set.

The default value is 1600.

[float]
6 changes: 3 additions & 3 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
@@ -125,7 +125,7 @@
return nil, errors.New("empty hosts list")
}

var clients []outputs.NetworkClient

[CI annotation: GitHub Actions / lint (windows), check failure on line 128: Consider pre-allocating `clients` (prealloc)]
for _, host := range hosts {
client, err := makeClient(host, params, &config, beat.Beat)
if err != nil {
@@ -165,9 +165,9 @@
},
queueConfig,
outputs.Group{
Clients: []outputs.Client{outClient},
BatchSize: windowSize,
Retry: 0, // no retry. Drop event on error.
Clients: []outputs.Client{outClient},
BatchEvents: windowSize,
Retry: 0, // no retry. Drop event on error.
},
pipeline.Settings{
WaitClose: 0,
2 changes: 1 addition & 1 deletion libbeat/outputs/console/console.go
@@ -85,7 +85,7 @@ func makeConsole(
}
}

return outputs.Success(config.Queue, config.BatchSize, 0, nil, c)
return outputs.Success(config.Queue, config.BatchSize, 0, 0, nil, c)
}

func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) {
2 changes: 1 addition & 1 deletion libbeat/outputs/discard/discard.go
@@ -56,7 +56,7 @@ func makeDiscard(
// disable bulk support in publisher pipeline
_ = cfg.SetInt("bulk_max_size", -1, -1)
out.log.Infof("Initialized discard output")
return outputs.Success(doConfig.Queue, -1, 0, nil, out)
return outputs.Success(doConfig.Queue, -1, 0, 0, nil, out)
}

// Implement Outputer
2 changes: 2 additions & 0 deletions libbeat/outputs/elasticsearch/config.go
@@ -21,6 +21,7 @@ import (
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/common/cfgtype"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
@@ -39,6 +40,7 @@ type elasticsearchConfig struct {
EscapeHTML bool `config:"escape_html"`
Kerberos *kerberos.Config `config:"kerberos"`
BulkMaxSize int `config:"bulk_max_size"`
BulkMaxBytes cfgtype.ByteSize `config:"bulk_max_bytes"`
MaxRetries int `config:"max_retries"`
Backoff Backoff `config:"backoff"`
NonIndexablePolicy *config.Namespace `config:"non_indexable_policy"`
9 changes: 8 additions & 1 deletion libbeat/outputs/elasticsearch/elasticsearch.go
@@ -135,7 +135,14 @@ func makeES(
clients[i] = client
}

return outputs.SuccessNet(esConfig.Queue, esConfig.LoadBalance, esConfig.BulkMaxSize, esConfig.MaxRetries, encoderFactory, clients)
return outputs.SuccessNet(
esConfig.Queue,
esConfig.LoadBalance,
esConfig.BulkMaxSize,
int(esConfig.BulkMaxBytes),
esConfig.MaxRetries,
encoderFactory,
clients)
}

func buildSelectors(
2 changes: 1 addition & 1 deletion libbeat/outputs/fileout/file.go
@@ -67,7 +67,7 @@ func makeFileout(
return outputs.Fail(err)
}

return outputs.Success(foConfig.Queue, -1, 0, nil, fo)
return outputs.Success(foConfig.Queue, -1, 0, 0, nil, fo)
}

func (out *fileOutput) init(beat beat.Info, c fileOutConfig) error {
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/kafka.go
@@ -84,7 +84,7 @@ func makeKafka(
if kConfig.MaxRetries < 0 {
retry = -1
}
return outputs.Success(kConfig.Queue, kConfig.BulkMaxSize, retry, nil, client)
return outputs.Success(kConfig.Queue, kConfig.BulkMaxSize, 0, retry, nil, client)
}

// buildTopicSelector builds the topic selector for standalone Beat and when
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/logstash.go
@@ -85,5 +85,5 @@ func makeLogstash(
clients[i] = client
}

return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, lsConfig.MaxRetries, nil, clients)
return outputs.SuccessNet(lsConfig.Queue, lsConfig.LoadBalance, lsConfig.BulkMaxSize, 0, lsConfig.MaxRetries, nil, clients)
}
3 changes: 2 additions & 1 deletion libbeat/outputs/output_reg.go
@@ -56,7 +56,8 @@ type IndexSelector interface {
// configuration into the outputs.
type Group struct {
Clients []Client
BatchSize int
BatchEvents int
BatchBytes int
Retry int
QueueFactory queue.QueueFactory

2 changes: 1 addition & 1 deletion libbeat/outputs/redis/redis.go
@@ -165,7 +165,7 @@ func makeRedis(
clients[i] = newBackoffClient(client, rConfig.Backoff.Init, rConfig.Backoff.Max)
}

return outputs.SuccessNet(rConfig.Queue, rConfig.LoadBalance, rConfig.BulkMaxSize, rConfig.MaxRetries, nil, clients)
return outputs.SuccessNet(rConfig.Queue, rConfig.LoadBalance, rConfig.BulkMaxSize, 0, rConfig.MaxRetries, nil, clients)
}

func buildKeySelector(cfg *config.C) (outil.Selector, error) {
11 changes: 6 additions & 5 deletions libbeat/outputs/util.go
@@ -35,7 +35,7 @@ func Fail(err error) (Group, error) { return Group{}, err }
// instances. The first argument is expected to contain a queue
// config.Namespace. The queue config is passed to assign the queue
// factory when elastic-agent reloads the output.
func Success(cfg config.Namespace, batchSize, retry int, encoderFactory queue.EncoderFactory, clients ...Client) (Group, error) {
func Success(cfg config.Namespace, batchEvents, batchBytes, retry int, encoderFactory queue.EncoderFactory, clients ...Client) (Group, error) {
var q queue.QueueFactory
if cfg.IsSet() && cfg.Config().Enabled() {
switch cfg.Name() {
@@ -60,7 +60,8 @@ }
}
return Group{
Clients: clients,
BatchSize: batchSize,
BatchEvents: batchEvents,
BatchBytes: batchBytes,
Retry: retry,
QueueFactory: q,
EncoderFactory: encoderFactory,
@@ -80,12 +81,12 @@ func NetworkClients(netclients []NetworkClient) []Client {
// The first argument is expected to contain a queue config.Namespace.
// The queue config is passed to assign the queue factory when
// elastic-agent reloads the output.
func SuccessNet(cfg config.Namespace, loadbalance bool, batchSize, retry int, encoderFactory queue.EncoderFactory, netclients []NetworkClient) (Group, error) {
func SuccessNet(cfg config.Namespace, loadbalance bool, batchEvents, batchBytes, retry int, encoderFactory queue.EncoderFactory, netclients []NetworkClient) (Group, error) {
Review comment from @cmacknz (Member), Jun 19, 2024:

> Is it worth defining a wrapper struct for batchEvents and batchBytes and passing through a copy of that so that nobody can ever accidentally reverse them in the argument list anywhere they are passed together like this?


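A minimal sketch of what such a wrapper could look like (hypothetical; no such type exists in this PR):

// BatchLimits would bundle the two batch bounds so call sites can never
// transpose them. Hypothetical illustration only.
type BatchLimits struct {
	Events int // max events per batch (0 = no event limit)
	Bytes  int // max bytes per batch (0 = no byte limit)
}

// Success would then take the pair as a single value, e.g.:
// func Success(cfg config.Namespace, limits BatchLimits, retry int,
//     encoderFactory queue.EncoderFactory, clients ...Client) (Group, error)
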
if !loadbalance {
return Success(cfg, batchSize, retry, encoderFactory, NewFailoverClient(netclients))
return Success(cfg, batchEvents, batchBytes, retry, encoderFactory, NewFailoverClient(netclients))
}

clients := NetworkClients(netclients)
return Success(cfg, batchSize, retry, encoderFactory, clients...)
return Success(cfg, batchEvents, batchBytes, retry, encoderFactory, clients...)
}
4 changes: 2 additions & 2 deletions libbeat/publisher/pipeline/client.go
@@ -42,7 +42,7 @@

// Open state, signaling, and sync primitives for coordinating client Close.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensures that the client shutdown sequence is executed only once

[CI annotation: GitHub Actions / lint (windows), check failure on line 45: field `closeOnce` is unused (unused)]

observer observer
eventListener beat.EventListener
@@ -118,9 +118,9 @@

var published bool
if c.canDrop {
_, published = c.producer.TryPublish(pubEvent)
published = c.producer.TryPublish(pubEvent)
} else {
_, published = c.producer.Publish(pubEvent)
published = c.producer.Publish(pubEvent)
}

if published {
12 changes: 7 additions & 5 deletions libbeat/publisher/pipeline/client_test.go
@@ -88,11 +88,12 @@ func TestClient(t *testing.T) {
l := logp.L()

// a small in-memory queue with a very short flush interval
q := memqueue.NewQueue(l, nil, memqueue.Settings{
q, err := memqueue.NewQueue(l, nil, memqueue.Settings{
Events: 5,
MaxGetRequest: 1,
FlushTimeout: time.Millisecond,
}, 5, nil)
require.NoError(t, err, "Queue creation must succeed")

// model a processor that we're going to make produce errors after
p := &testProcessor{}
@@ -114,7 +115,7 @@
done := make(chan struct{})
go func() {
for {
batch, err := q.Get(2)
batch, err := q.Get(2, 0)
if errors.Is(err, io.EOF) {
break
}
@@ -201,7 +202,8 @@
}
logp.TestingSetup()

q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}, 0, nil)
q, err := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}, 0, nil)
require.NoError(t, err, "Queue creation must succeed")
pipeline := makePipeline(Settings{}, q)
defer pipeline.Close()

@@ -306,8 +308,8 @@ func TestMonitoring(t *testing.T) {
})
}
return "output_name", outputs.Group{
BatchSize: batchSize,
Clients: clients,
BatchEvents: batchSize,
Clients: clients,
}, nil
},
)
12 changes: 7 additions & 5 deletions libbeat/publisher/pipeline/consumer.go
@@ -58,10 +58,11 @@ type eventConsumer struct {
// consumerTarget specifies the queue to read from, the parameters needed
// to generate a batch, and the output channel to send batches to.
type consumerTarget struct {
queue queue.Queue
ch chan publisher.Batch
timeToLive int
batchSize int
queue queue.Queue
ch chan publisher.Batch
timeToLive int
batchEvents int
batchBytes int
}

// retryRequest is used by ttlBatch to add itself back to the eventConsumer
@@ -134,7 +135,8 @@
c.queueReader.req <- queueReaderRequest{
queue: target.queue,
retryer: c,
batchSize: target.batchSize,
eventCount: target.batchEvents,
byteCount: target.batchBytes,
timeToLive: target.timeToLive,
}
}
21 changes: 12 additions & 9 deletions libbeat/publisher/pipeline/controller.go
@@ -159,10 +159,11 @@
// Resume consumer targeting the new work queue
c.consumer.setTarget(
consumerTarget{
queue: c.queue,
ch: targetChan,
batchSize: outGrp.BatchSize,
timeToLive: outGrp.Retry + 1,
queue: c.queue,
ch: targetChan,
batchEvents: outGrp.BatchEvents,
batchBytes: outGrp.BatchBytes,
timeToLive: outGrp.Retry + 1,
})
}

@@ -276,7 +277,7 @@
if c.monitors.Metrics != nil {
pipelineMetrics := c.monitors.Metrics.GetRegistry("pipeline")
if pipelineMetrics == nil {
pipelineMetrics = c.monitors.Metrics.NewRegistry("pipeline")

[CI annotation: GitHub Actions / lint (windows), check failure on line 280: ineffectual assignment to pipelineMetrics (ineffassign)]
}
}
queueObserver := queue.NewQueueObserver(pipelineMetrics)
@@ -285,7 +286,9 @@
if err != nil {
logger.Errorf("queue creation failed, falling back to default memory queue, check your queue configuration")
s, _ := memqueue.SettingsForUserConfig(nil)
queue = memqueue.NewQueue(logger, queueObserver, s, c.inputQueueSize, outGrp.EncoderFactory)
// Memqueue creation can only fail when it's configured for byte-based limits,
// so we don't need to handle the fallback error.
queue, _ = memqueue.NewQueue(logger, queueObserver, s, c.inputQueueSize, outGrp.EncoderFactory)
}
c.queue = queue

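Since memqueue.NewQueue can now fail, callers that actually configure byte-based limits would check the error. A rough sketch follows; the byte-limit Settings field name (`Bytes` below) is an assumption, not confirmed by this diff:

// Sketch only: creating a byte-limited memqueue and handling the
// possible configuration error.
q, err := memqueue.NewQueue(logger, queueObserver, memqueue.Settings{
	Bytes:        32 * 1024 * 1024, // assumed field name for a 32MB cap
	FlushTimeout: 10 * time.Second,
}, c.inputQueueSize, outGrp.EncoderFactory)
if err != nil {
	// e.g. byte limits requested in an unsupported configuration
	return err
}
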
@@ -307,12 +310,12 @@
// a producer for a nonexistent queue.
type emptyProducer struct{}

func (emptyProducer) Publish(_ queue.Entry) (queue.EntryID, bool) {
return 0, false
func (emptyProducer) Publish(_ queue.Entry) bool {
return false
}

func (emptyProducer) TryPublish(_ queue.Entry) (queue.EntryID, bool) {
return 0, false
func (emptyProducer) TryPublish(_ queue.Entry) bool {
return false
}

func (emptyProducer) Close() {
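
Taken together with the client.go changes above, callers of the revised producer API follow this rough pattern (a sketch, not code from this PR):

// Publish and TryPublish now return a plain bool instead of
// (queue.EntryID, bool), and Close replaces the removed Cancel.
if published := producer.Publish(event); !published {
	// the queue did not accept the event (e.g. it is shutting down)
}
producer.Close()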