diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go
index aed288c9d6a..30920f66174 100644
--- a/pkg/kafka/producer/config.go
+++ b/pkg/kafka/producer/config.go
@@ -36,6 +36,7 @@ type Configuration struct {
 	ProtocolVersion  string        `mapstructure:"protocol_version"`
 	BatchLinger      time.Duration `mapstructure:"batch_linger"`
 	BatchSize        int           `mapstructure:"batch_size"`
+	BatchMinMessages int           `mapstructure:"batch_min_messages"`
 	BatchMaxMessages int           `mapstructure:"batch_max_messages"`
 	auth.AuthenticationConfig `mapstructure:"authentication"`
 }
@@ -49,6 +50,7 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
 	saramaConfig.Producer.Return.Successes = true
 	saramaConfig.Producer.Flush.Bytes = c.BatchSize
 	saramaConfig.Producer.Flush.Frequency = c.BatchLinger
+	saramaConfig.Producer.Flush.Messages = c.BatchMinMessages
 	saramaConfig.Producer.Flush.MaxMessages = c.BatchMaxMessages
 	if len(c.ProtocolVersion) > 0 {
 		ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion)
diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go
index 15839d1f18d..4f5cca2e0a3 100644
--- a/plugin/storage/kafka/options.go
+++ b/plugin/storage/kafka/options.go
@@ -45,6 +45,7 @@ const (
 	suffixProtocolVersion  = ".protocol-version"
 	suffixBatchLinger      = ".batch-linger"
 	suffixBatchSize        = ".batch-size"
+	suffixBatchMinMessages = ".batch-min-messages"
 	suffixBatchMaxMessages = ".batch-max-messages"
 
 	defaultBroker = "127.0.0.1:9092"
@@ -55,6 +56,7 @@ const (
 	defaultCompressionLevel = 0
 	defaultBatchLinger      = 0
 	defaultBatchSize        = 0
+	defaultBatchMinMessages = 0
 	defaultBatchMaxMessages = 0
 )
 
@@ -157,10 +159,15 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
 		defaultBatchSize,
 		"(experimental) Number of bytes to batch before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/",
 	)
+	flagSet.Int(
+		configPrefix+suffixBatchMinMessages,
+		defaultBatchMinMessages,
+		"(experimental) The best-effort minimum number of messages needed to send a batch of records to Kafka. Higher values reduce requests to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/",
+	)
 	flagSet.Int(
 		configPrefix+suffixBatchMaxMessages,
 		defaultBatchMaxMessages,
-		"(experimental) Number of message to batch before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/",
+		"(experimental) Maximum number of messages to batch before sending records to Kafka",
 	)
 	auth.AddFlags(configPrefix, flagSet)
 }
@@ -195,6 +202,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
 		AuthenticationConfig: authenticationOptions,
 		BatchLinger:          v.GetDuration(configPrefix + suffixBatchLinger),
 		BatchSize:            v.GetInt(configPrefix + suffixBatchSize),
+		BatchMinMessages:     v.GetInt(configPrefix + suffixBatchMinMessages),
 		BatchMaxMessages:     v.GetInt(configPrefix + suffixBatchMaxMessages),
 	}
 	opt.Topic = v.GetString(configPrefix + suffixTopic)
diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go
index b77700c9732..5fa6f249301 100644
--- a/plugin/storage/kafka/options_test.go
+++ b/plugin/storage/kafka/options_test.go
@@ -40,6 +40,7 @@ func TestOptionsWithFlags(t *testing.T) {
 		"--kafka.producer.compression-level=7",
 		"--kafka.producer.batch-linger=1s",
 		"--kafka.producer.batch-size=128000",
+		"--kafka.producer.batch-min-messages=50",
 		"--kafka.producer.batch-max-messages=100",
 	})
 	opts.InitFromViper(v)
@@ -52,6 +53,7 @@ func TestOptionsWithFlags(t *testing.T) {
 	assert.Equal(t, 7, opts.Config.CompressionLevel)
 	assert.Equal(t, 128000, opts.Config.BatchSize)
 	assert.Equal(t, time.Duration(1*time.Second), opts.Config.BatchLinger)
+	assert.Equal(t, 50, opts.Config.BatchMinMessages)
 	assert.Equal(t, 100, opts.Config.BatchMaxMessages)
 }
 
@@ -69,6 +71,7 @@ func TestFlagDefaults(t *testing.T) {
 	assert.Equal(t, 0, opts.Config.CompressionLevel)
 	assert.Equal(t, 0, opts.Config.BatchSize)
 	assert.Equal(t, time.Duration(0*time.Second), opts.Config.BatchLinger)
+	assert.Equal(t, 0, opts.Config.BatchMinMessages)
 	assert.Equal(t, 0, opts.Config.BatchMaxMessages)
 }
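
For context, a minimal sketch of how the new `batch-min-messages` option is expected to reach the producer: the flag value is copied into Sarama's `Producer.Flush.Messages`, the best-effort lower bound on messages per broker request, next to the existing linger, size, and max-messages settings. The broker address and numeric values below are illustrative assumptions, not Jaeger defaults.

```go
// Illustrative sketch only: shows where --kafka.producer.batch-min-messages
// lands in the Sarama producer configuration, mirroring NewProducer above.
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true
	cfg.Producer.Flush.Bytes = 128000              // --kafka.producer.batch-size
	cfg.Producer.Flush.Frequency = 1 * time.Second // --kafka.producer.batch-linger
	cfg.Producer.Flush.Messages = 50               // --kafka.producer.batch-min-messages (this change)
	cfg.Producer.Flush.MaxMessages = 100           // --kafka.producer.batch-max-messages

	producer, err := sarama.NewAsyncProducer([]string{"127.0.0.1:9092"}, cfg)
	if err != nil {
		log.Fatalf("failed to create producer: %v", err)
	}
	defer producer.Close()
}
```

On the command line this corresponds to, for example, `--kafka.producer.batch-min-messages=50`, as exercised in the updated options_test.go above.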