Skip to content

Commit

Permalink
ADD NEW KAFKA PARAM TO COMMANDLINE
Browse files Browse the repository at this point in the history
Resolves #2037
The default Samara Kafka client writes to Kafka broker for every single span received.
This is not efficient and caused high CPU utilization on the brokers as well as high network traffic.
Samara Kafka client has options to enable batching.  These options needed to be exposed
to the command for performance tuning. The option names matches official Kafka documentation and
not specific to Samara.

Signed-off-by: albert chung <[email protected]>
  • Loading branch information
apm-opentt committed Jan 28, 2020
1 parent 0e974ce commit da58201
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 1 deletion.
6 changes: 6 additions & 0 deletions pkg/kafka/producer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package producer

import (
"time"

"github.com/Shopify/sarama"

"github.com/jaegertracing/jaeger/pkg/kafka/auth"
Expand All @@ -32,6 +34,8 @@ type Configuration struct {
Compression sarama.CompressionCodec
CompressionLevel int
ProtocolVersion string
LingerMS int
BatchSize int
auth.AuthenticationConfig
}

Expand All @@ -42,6 +46,8 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig.Producer.Compression = c.Compression
saramaConfig.Producer.CompressionLevel = c.CompressionLevel
saramaConfig.Producer.Return.Successes = true
saramaConfig.Producer.Flush.Bytes = c.BatchSize
saramaConfig.Producer.Flush.Frequency = time.Duration(c.LingerMS) * time.Millisecond
if len(c.ProtocolVersion) > 0 {
ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion)
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ const (
suffixCompression = ".compression"
suffixCompressionLevel = ".compression-level"
suffixProtocolVersion = ".protocol-version"
suffixLingerMS = ".linger-ms"
suffixBatchSize = ".batch-size"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
defaultEncoding = EncodingProto
defaultRequiredAcks = "local"
defaultCompression = "none"
defaultCompressionLevel = 0
defaultLingerMS = 10
defaultBatchSize = 1024
)

var (
Expand Down Expand Up @@ -141,6 +145,16 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
defaultCompressionLevel,
"(experimental) compression level to use on messages. gzip = 1-9 (default = 6), snappy = none, lz4 = 1-17 (default = 9), zstd = -131072 - 22 (default = 3)",
)
flagSet.Int(
configPrefix+suffixLingerMS,
defaultLingerMS,
"(experimental) Number of milliseconds to delay before sending records to Kafka. Higher value reduce request to Kafka but increase latency.",
)
flagSet.Int(
configPrefix+suffixBatchSize,
defaultBatchSize,
"(experimental) Number of bytes to batch before sending records to Kafka. Higher value reduce request to Kafka but increase latency.",
)
auth.AddFlags(configPrefix, flagSet)
}

Expand Down Expand Up @@ -172,6 +186,8 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
CompressionLevel: compressionLevel,
ProtocolVersion: v.GetString(configPrefix + suffixProtocolVersion),
AuthenticationConfig: authenticationOptions,
LingerMS: v.GetInt(configPrefix + suffixLingerMS),
BatchSize: v.GetInt(configPrefix + suffixBatchSize),
}
opt.topic = v.GetString(configPrefix + suffixTopic)
opt.encoding = v.GetString(configPrefix + suffixEncoding)
Expand Down
9 changes: 8 additions & 1 deletion plugin/storage/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ func TestOptionsWithFlags(t *testing.T) {
"--kafka.producer.encoding=protobuf",
"--kafka.producer.required-acks=local",
"--kafka.producer.compression=gzip",
"--kafka.producer.compression-level=7"})
"--kafka.producer.compression-level=7",
"--kafka.producer.linger-ms=1000",
"--kafka.producer.batch-size=128000",
})
opts.InitFromViper(v)

assert.Equal(t, "topic1", opts.topic)
Expand All @@ -42,6 +45,8 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks)
assert.Equal(t, sarama.CompressionGZIP, opts.config.Compression)
assert.Equal(t, 7, opts.config.CompressionLevel)
assert.Equal(t, 128000, opts.config.BatchSize)
assert.Equal(t, 1000, opts.config.LingerMS)
}

func TestFlagDefaults(t *testing.T) {
Expand All @@ -56,6 +61,8 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks)
assert.Equal(t, sarama.CompressionNone, opts.config.Compression)
assert.Equal(t, 0, opts.config.CompressionLevel)
assert.Equal(t, 1024, opts.config.BatchSize)
assert.Equal(t, 10, opts.config.LingerMS)
}

func TestCompressionLevelDefaults(t *testing.T) {
Expand Down

0 comments on commit da58201

Please sign in to comment.