diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index ad665fd0979..2a3ab075769 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -34,7 +34,7 @@ type Configuration struct { Compression sarama.CompressionCodec CompressionLevel int ProtocolVersion string - LingerMS int + BatchLinger time.Duration BatchSize int auth.AuthenticationConfig } @@ -47,7 +47,7 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { saramaConfig.Producer.CompressionLevel = c.CompressionLevel saramaConfig.Producer.Return.Successes = true saramaConfig.Producer.Flush.Bytes = c.BatchSize - saramaConfig.Producer.Flush.Frequency = time.Duration(c.LingerMS) * time.Millisecond + saramaConfig.Producer.Flush.Frequency = c.BatchLinger if len(c.ProtocolVersion) > 0 { ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion) if err != nil { diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index f3763c6c23b..06fd2a26630 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -19,6 +19,7 @@ import ( "fmt" "log" "strings" + "time" "github.com/Shopify/sarama" "github.com/spf13/viper" @@ -43,7 +44,7 @@ const ( suffixCompression = ".compression" suffixCompressionLevel = ".compression-level" suffixProtocolVersion = ".protocol-version" - suffixLingerMS = ".linger-ms" + suffixLinger = ".linger" suffixBatchSize = ".batch-size" defaultBroker = "127.0.0.1:9092" @@ -52,8 +53,8 @@ const ( defaultRequiredAcks = "local" defaultCompression = "none" defaultCompressionLevel = 0 - defaultLingerMS = 10 - defaultBatchSize = 1024 + defaultLinger = time.Duration(0 * time.Millisecond) + defaultBatchSize = 16384 ) var ( @@ -145,15 +146,15 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { defaultCompressionLevel, "(experimental) compression level to use on messages. gzip = 1-9 (default = 6), snappy = none, lz4 = 1-17 (default = 9), zstd = -131072 - 22 (default = 3)", ) - flagSet.Int( - configPrefix+suffixLingerMS, - defaultLingerMS, - "(experimental) Number of milliseconds to delay before sending records to Kafka. Higher value reduce request volume to Kafka but increase latency and the possibility of data loss in case of process restart.", + flagSet.Duration( + configPrefix+suffixLinger, + defaultLinger, + "(experimental) Number of milliseconds to delay before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/", ) flagSet.Int( configPrefix+suffixBatchSize, defaultBatchSize, - "(experimental) Number of bytes to batch before sending records to Kafka. Higher value reduce request volume to Kafka but increase latency and the possibility of data loss in case of process restart.", + "(experimental) Number of bytes to batch before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/", ) auth.AddFlags(configPrefix, flagSet) } @@ -186,7 +187,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) { CompressionLevel: compressionLevel, ProtocolVersion: v.GetString(configPrefix + suffixProtocolVersion), AuthenticationConfig: authenticationOptions, - LingerMS: v.GetInt(configPrefix + suffixLingerMS), + BatchLinger: v.GetDuration(configPrefix + suffixLinger), BatchSize: v.GetInt(configPrefix + suffixBatchSize), } opt.topic = v.GetString(configPrefix + suffixTopic) diff --git a/plugin/storage/kafka/options_test.go b/plugin/storage/kafka/options_test.go index 54c6bb0030d..1c02e2ae37d 100644 --- a/plugin/storage/kafka/options_test.go +++ b/plugin/storage/kafka/options_test.go @@ -16,6 +16,7 @@ package kafka import ( "testing" + "time" "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" @@ -34,7 +35,7 @@ func TestOptionsWithFlags(t *testing.T) { "--kafka.producer.required-acks=local", "--kafka.producer.compression=gzip", "--kafka.producer.compression-level=7", - "--kafka.producer.linger-ms=1000", + "--kafka.producer.linger=1s", "--kafka.producer.batch-size=128000", }) opts.InitFromViper(v) @@ -46,7 +47,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, sarama.CompressionGZIP, opts.config.Compression) assert.Equal(t, 7, opts.config.CompressionLevel) assert.Equal(t, 128000, opts.config.BatchSize) - assert.Equal(t, 1000, opts.config.LingerMS) + assert.Equal(t, time.Duration(1*time.Second), opts.config.BatchLinger) } func TestFlagDefaults(t *testing.T) { @@ -61,8 +62,8 @@ func TestFlagDefaults(t *testing.T) { assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks) assert.Equal(t, sarama.CompressionNone, opts.config.Compression) assert.Equal(t, 0, opts.config.CompressionLevel) - assert.Equal(t, 1024, opts.config.BatchSize) - assert.Equal(t, 10, opts.config.LingerMS) + assert.Equal(t, 16384, opts.config.BatchSize) + assert.Equal(t, time.Duration(0*time.Second), opts.config.BatchLinger) } func TestCompressionLevelDefaults(t *testing.T) {