From 1217774c2cbbe4139eb0027930efb25d9470112e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Mierzwa?= Date: Tue, 4 Aug 2020 17:19:35 +0100 Subject: [PATCH] Add --kafka.producer.batch-min-messages collector flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Currently to control batch size when collector writes to kafka user can set --kafka.producer.batch-max-messages flag. This flag is used to configure kafka client library and set upper limit on the number of messages in a single batch, so the meaning of this flag is 'write up to N messages in a single batch'. Current wording on --kafka.producer.batch-max-messages help text is 'Number of message to batch before sending records to Kafka.' which suggests that it's used to set minimal number of messages needed to send a batch to kafka. For a user who wants to enforce a minimal batch size this means that there's currently no way of doing that, but at the same time there's a flag that is described as a way to do it, while actually doing something else. This patch adds a --kafka.producer.batch-min-messages flag that allows configuring minimal batch size in the kafka client library and updates --kafka.producer.batch-max-messages flag description to be more clear on the effect it causes. 
Signed-off-by: Ɓukasz Mierzwa --- pkg/kafka/producer/config.go | 2 ++ plugin/storage/kafka/options.go | 10 +++++++++- plugin/storage/kafka/options_test.go | 3 +++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index aed288c9d6a..30920f66174 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -36,6 +36,7 @@ type Configuration struct { ProtocolVersion string `mapstructure:"protocol_version"` BatchLinger time.Duration `mapstructure:"batch_linger"` BatchSize int `mapstructure:"batch_size"` + BatchMinMessages int `mapstructure:"batch_min_messages"` BatchMaxMessages int `mapstructure:"batch_max_messages"` auth.AuthenticationConfig `mapstructure:"authentication"` } @@ -49,6 +50,7 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { saramaConfig.Producer.Return.Successes = true saramaConfig.Producer.Flush.Bytes = c.BatchSize saramaConfig.Producer.Flush.Frequency = c.BatchLinger + saramaConfig.Producer.Flush.Messages = c.BatchMinMessages saramaConfig.Producer.Flush.MaxMessages = c.BatchMaxMessages if len(c.ProtocolVersion) > 0 { ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion) diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 15839d1f18d..4f5cca2e0a3 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -45,6 +45,7 @@ const ( suffixProtocolVersion = ".protocol-version" suffixBatchLinger = ".batch-linger" suffixBatchSize = ".batch-size" + suffixBatchMinMessages = ".batch-min-messages" suffixBatchMaxMessages = ".batch-max-messages" defaultBroker = "127.0.0.1:9092" @@ -55,6 +56,7 @@ const ( defaultCompressionLevel = 0 defaultBatchLinger = 0 defaultBatchSize = 0 + defaultBatchMinMessages = 0 defaultBatchMaxMessages = 0 ) @@ -157,10 +159,15 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { defaultBatchSize, "(experimental) Number of bytes to batch before sending records to 
Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/", ) + flagSet.Int( + configPrefix+suffixBatchMinMessages, + defaultBatchMinMessages, + "(experimental) The best-effort minimum number of messages needed to send a batch of records to Kafka. Higher values reduce requests to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/", + ) flagSet.Int( configPrefix+suffixBatchMaxMessages, defaultBatchMaxMessages, - "(experimental) Number of message to batch before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/", + "(experimental) Maximum number of messages to batch before sending records to Kafka", ) auth.AddFlags(configPrefix, flagSet) } @@ -195,6 +202,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) { AuthenticationConfig: authenticationOptions, BatchLinger: v.GetDuration(configPrefix + suffixBatchLinger), BatchSize: v.GetInt(configPrefix + suffixBatchSize), + BatchMinMessages: v.GetInt(configPrefix + suffixBatchMinMessages), BatchMaxMessages: v.GetInt(configPrefix + suffixBatchMaxMessages), } opt.Topic = v.GetString(configPrefix + suffixTopic) diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index b77700c9732..5fa6f249301 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -40,6 +40,7 @@ func TestOptionsWithFlags(t *testing.T) { "--kafka.producer.compression-level=7", "--kafka.producer.batch-linger=1s", "--kafka.producer.batch-size=128000", + "--kafka.producer.batch-min-messages=50", "--kafka.producer.batch-max-messages=100", }) opts.InitFromViper(v) @@ -52,6 +53,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, 7,
opts.Config.CompressionLevel) assert.Equal(t, 128000, opts.Config.BatchSize) assert.Equal(t, time.Duration(1*time.Second), opts.Config.BatchLinger) + assert.Equal(t, 50, opts.Config.BatchMinMessages) assert.Equal(t, 100, opts.Config.BatchMaxMessages) } @@ -69,6 +71,7 @@ func TestFlagDefaults(t *testing.T) { assert.Equal(t, 0, opts.Config.CompressionLevel) assert.Equal(t, 0, opts.Config.BatchSize) assert.Equal(t, time.Duration(0*time.Second), opts.Config.BatchLinger) + assert.Equal(t, 0, opts.Config.BatchMinMessages) assert.Equal(t, 0, opts.Config.BatchMaxMessages) }