From f7db4b851a3579283479e5191702a9a05f119923 Mon Sep 17 00:00:00 2001 From: flexitrev Date: Thu, 30 Jan 2025 17:11:46 -0500 Subject: [PATCH 1/6] Added MaxOpenRequests to kafka output config --- libbeat/outputs/kafka/config.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 98c7ba06533..95a13c176c2 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -68,6 +68,7 @@ type kafkaConfig struct { BulkFlushFrequency time.Duration `config:"bulk_flush_frequency"` MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` Headers []header `config:"headers"` + MaxOpenRequests int `config:"max_open_requests" validate:"min=1"` Backoff backoffConfig `config:"backoff"` ClientID string `config:"client_id"` ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"` @@ -137,6 +138,7 @@ func defaultConfig() kafkaConfig { CompressionLevel: 4, Version: kafka.Version("2.1.0"), MaxRetries: 3, + MaxOpenRequests: 5, Headers: nil, Backoff: backoffConfig{ Init: 1 * time.Second, @@ -211,6 +213,7 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err k.Net.ReadTimeout = timeout k.Net.WriteTimeout = timeout k.Net.KeepAlive = config.KeepAlive + k.Net.MaxOpenRequests = config.MaxOpenRequests k.Producer.Timeout = config.BrokerTimeout k.Producer.CompressionLevel = config.CompressionLevel From 4b046c1182f9dc80dcc531cc9365c12aca66f505 Mon Sep 17 00:00:00 2001 From: flexitrev Date: Fri, 31 Jan 2025 14:40:58 -0500 Subject: [PATCH 2/6] Added test, changelog entry --- CHANGELOG.next.asciidoc | 2 +- libbeat/outputs/kafka/config_test.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 91e75266659..75de158238d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -25,7 +25,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Filebeat* - +- Added MaxOpenRequests to kafka output config {pull}42515[42515] - Convert netflow input to API v2 and disable event normalisation {pull}37901[37901] - Removed deprecated Squid from Beats. See <> for migration options. {pull}38037[38037] - Removed deprecated Sonicwall from Beats. Use the https://docs.elastic.co/integrations/sonicwall[SonicWall Firewall] Elastic integration instead. {pull}38037[38037] diff --git a/libbeat/outputs/kafka/config_test.go b/libbeat/outputs/kafka/config_test.go index 6d87a5ddc1a..63b9c1f3170 100644 --- a/libbeat/outputs/kafka/config_test.go +++ b/libbeat/outputs/kafka/config_test.go @@ -65,6 +65,10 @@ func TestConfigAcceptValid(t *testing.T) { "realm": "ELASTIC", }, }, + "set max requests": mapstr.M{ + "topic": "tomato", + "max_open_requests": 50, + }, } for name, test := range tests { From a5cad08477892ad2b3237b3e5b04e3bec9e7d3da Mon Sep 17 00:00:00 2001 From: flexitrev Date: Fri, 31 Jan 2025 14:49:37 -0500 Subject: [PATCH 3/6] updated readme --- libbeat/outputs/kafka/docs/kafka.asciidoc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/libbeat/outputs/kafka/docs/kafka.asciidoc b/libbeat/outputs/kafka/docs/kafka.asciidoc index a487eb5e698..944c7e9af26 100644 --- a/libbeat/outputs/kafka/docs/kafka.asciidoc +++ b/libbeat/outputs/kafka/docs/kafka.asciidoc @@ -260,6 +260,15 @@ Set `max_retries` to a value less than 0 to retry until all events are published The default is 3. endif::[] +===== `max_open_requests` + +The maximum number of unacknowledged requests the client will send on a single connection before blocking. +Throughput can improve but message ordering is not guaranteed if Idempotent is disabled, see: +https://kafka.apache.org/protocol#protocol_network +https://kafka.apache.org/28/documentation.html#producerconfigs_max.in.flight.requests.per.connection + +The default is 5. + ===== `backoff.init` The number of seconds to wait before trying to republish to Kafka From f62f3f644b665c2a752d76925cc5caee1d98f05f Mon Sep 17 00:00:00 2001 From: Trevor Blackford <120474420+flexitrev@users.noreply.github.com> Date: Sun, 2 Feb 2025 09:57:12 -0500 Subject: [PATCH 4/6] Update CHANGELOG.next.asciidoc Co-authored-by: William Easton --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1a6284a851c..acabc6f683f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -25,7 +25,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Filebeat* -- Added MaxOpenRequests to kafka output config {pull}42515[42515] +- Added max_open_requests to kafka output config {pull}42515[42515] - Convert netflow input to API v2 and disable event normalisation {pull}37901[37901] - Removed deprecated Squid from Beats. See <> for migration options. {pull}38037[38037] - Removed deprecated Sonicwall from Beats. Use the https://docs.elastic.co/integrations/sonicwall[SonicWall Firewall] Elastic integration instead. {pull}38037[38037] From 1cecd05d37f967211a04e45468e2165e469a40ae Mon Sep 17 00:00:00 2001 From: flexitrev Date: Sun, 2 Feb 2025 09:58:44 -0500 Subject: [PATCH 5/6] removed idempotent ref in readme --- libbeat/outputs/kafka/docs/kafka.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/outputs/kafka/docs/kafka.asciidoc b/libbeat/outputs/kafka/docs/kafka.asciidoc index 944c7e9af26..803c2f668f2 100644 --- a/libbeat/outputs/kafka/docs/kafka.asciidoc +++ b/libbeat/outputs/kafka/docs/kafka.asciidoc @@ -263,7 +263,7 @@ endif::[] ===== `max_open_requests` The maximum number of unacknowledged requests the client will send on a single connection before blocking. -Throughput can improve but message ordering is not guaranteed if Idempotent is disabled, see: +This can improve throughput, but message ordering is not guaranteed. https://kafka.apache.org/protocol#protocol_network https://kafka.apache.org/28/documentation.html#producerconfigs_max.in.flight.requests.per.connection From 0dbecba891810c8402934b6176294fcc222f3ee3 Mon Sep 17 00:00:00 2001 From: Trevor Blackford <120474420+flexitrev@users.noreply.github.com> Date: Mon, 3 Feb 2025 12:55:57 -0500 Subject: [PATCH 6/6] Update libbeat/outputs/kafka/docs/kafka.asciidoc better documentation Co-authored-by: William Easton --- libbeat/outputs/kafka/docs/kafka.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/outputs/kafka/docs/kafka.asciidoc b/libbeat/outputs/kafka/docs/kafka.asciidoc index 803c2f668f2..2062fc58b31 100644 --- a/libbeat/outputs/kafka/docs/kafka.asciidoc +++ b/libbeat/outputs/kafka/docs/kafka.asciidoc @@ -263,7 +263,7 @@ endif::[] ===== `max_open_requests` The maximum number of unacknowledged requests the client will send on a single connection before blocking. -This can improve throughput, but message ordering is not guaranteed. +Increasing this can improve throughput. If max_open_requests is not 1, there is a risk of message re-ordering due to retries. https://kafka.apache.org/protocol#protocol_network https://kafka.apache.org/28/documentation.html#producerconfigs_max.in.flight.requests.per.connection