Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
set sarama client KafkaVersion via config
Browse files Browse the repository at this point in the history
see issue: #1053

from  https://github.com/Shopify/sarama/blob/v1.19.0/config.go#L324-L330

```
The version of Kafka that Sarama will assume it is running against.
Defaults to the oldest supported stable version. Since Kafka provides
backwards-compatibility, setting it to a version older than you have
will not break anything, although it may prevent you from using the
latest features. Setting it to a version greater than you are actually
running may lead to random breakage.
```
  • Loading branch information
woodsaj committed Oct 23, 2018
1 parent cf70352 commit 0feecad
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 4 deletions.
4 changes: 4 additions & 0 deletions docker/docker-chaos/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ enabled = true
org-id = 0
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version. All brokers must be this version or newer.
kafka-version = V0_10_0_0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
Expand Down Expand Up @@ -322,6 +324,8 @@ dns-config-path = /etc/resolv.conf
enabled = true
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version. All brokers must be this version or newer.
kafka-version = V0_10_0_0
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
Expand Down
4 changes: 4 additions & 0 deletions docker/docker-cluster/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ enabled = true
org-id = 0
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version. All brokers must be this version or newer.
kafka-version = V0_10_0_0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
Expand Down Expand Up @@ -322,6 +324,8 @@ dns-config-path = /etc/resolv.conf
enabled = true
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version. All brokers must be this version or newer.
kafka-version = V0_10_0_0
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
Expand Down
4 changes: 4 additions & 0 deletions docker/docker-dev-custom-cfg-kafka/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ enabled = true
org-id = 1
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version. All brokers must be this version or newer.
kafka-version = V0_10_0_0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
Expand Down Expand Up @@ -322,6 +324,8 @@ dns-config-path = /etc/resolv.conf
enabled = true
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version. All brokers must be this version or newer.
kafka-version = V0_10_0_0
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
Expand Down
4 changes: 4 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ enabled = false
org-id = 0
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version. All brokers must be this version or newer.
kafka-version = V0_10_0_0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
Expand Down Expand Up @@ -383,6 +385,8 @@ dns-config-path = /etc/resolv.conf
enabled = false
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version. All brokers must be this version or newer.
kafka-version = V0_10_0_0
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
Expand Down
11 changes: 9 additions & 2 deletions input/kafkamdm/kafkamdm.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (k *KafkaMdm) Name() string {

var Enabled bool
var orgId uint
var kafkaVersionStr string
var brokerStr string
var brokers []string
var topicStr string
Expand Down Expand Up @@ -73,6 +74,7 @@ func ConfigSetup() {
inKafkaMdm.BoolVar(&Enabled, "enabled", false, "")
inKafkaMdm.UintVar(&orgId, "org-id", 0, "For incoming MetricPoint messages without org-id, assume this org id")
inKafkaMdm.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be be given multiple times as a comma-separated list)")
inKafkaMdm.StringVar(&kafkaVersionStr, "kafka-version", "V0_10_0_0", "Kafka version. All brokers must be this version or newer.")
inKafkaMdm.StringVar(&topicStr, "topics", "mdm", "kafka topic (may be given multiple times as a comma-separated list)")
inKafkaMdm.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
inKafkaMdm.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of id's")
Expand All @@ -92,6 +94,11 @@ func ConfigProcess(instance string) {
return
}

kafkaVersion, err := sarama.ParseKafkaVersion(kafkaVersionStr)
if err != nil {
log.Fatalf("kafkamdm: invalid kafka-version. %s", err)
}

if offsetCommitInterval == 0 {
log.Fatal("kafkamdm: offset-commit-interval must be greater then 0")
}
Expand All @@ -101,7 +108,7 @@ func ConfigProcess(instance string) {
if consumerMaxProcessingTime == 0 {
log.Fatal("kafkamdm: consumer-max-processing-time must be greater then 0")
}
var err error

switch offsetStr {
case "last":
case "oldest":
Expand Down Expand Up @@ -129,7 +136,7 @@ func ConfigProcess(instance string) {
config.Consumer.MaxWaitTime = consumerMaxWaitTime
config.Consumer.MaxProcessingTime = consumerMaxProcessingTime
config.Net.MaxOpenRequests = netMaxOpenRequests
config.Version = sarama.V0_10_0_0
config.Version = kafkaVersion
err = config.Validate()
if err != nil {
log.Fatalf("kafkamdm: invalid config: %s", err)
Expand Down
11 changes: 9 additions & 2 deletions mdata/notifierKafka/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
)

var Enabled bool
var kafkaVersionStr string
var brokerStr string
var brokers []string
var topic string
Expand Down Expand Up @@ -42,6 +43,7 @@ func init() {
fs := flag.NewFlagSet("kafka-cluster", flag.ExitOnError)
fs.BoolVar(&Enabled, "enabled", false, "")
fs.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be given multiple times as comma separated list)")
fs.StringVar(&kafkaVersionStr, "kafka-version", "V0_10_0_0", "Kafka version. All brokers must be this version or newer.")
fs.StringVar(&topic, "topic", "metricpersist", "kafka topic")
fs.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of id's. This should match the partitions used for kafka-mdm-in")
fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
Expand All @@ -55,7 +57,12 @@ func ConfigProcess(instance string) {
if !Enabled {
return
}
var err error

kafkaVersion, err := sarama.ParseKafkaVersion(kafkaVersionStr)
if err != nil {
log.Fatalf("kafka-cluster: invalid kafka-version. %s", err)
}

switch offsetStr {
case "last":
case "oldest":
Expand All @@ -70,7 +77,7 @@ func ConfigProcess(instance string) {

config = sarama.NewConfig()
config.ClientID = instance + "-cluster"
config.Version = sarama.V0_10_0_0
config.Version = kafkaVersion
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
config.Producer.Compression = sarama.CompressionSnappy
Expand Down
4 changes: 4 additions & 0 deletions metrictank-sample.ini
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ enabled = false
org-id = 0
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version. All brokers must be this version or newer.
kafka-version = V0_10_0_0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
Expand Down Expand Up @@ -325,6 +327,8 @@ dns-config-path = /etc/resolv.conf
enabled = false
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version. All brokers must be this version or newer.
kafka-version = V0_10_0_0
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
Expand Down
4 changes: 4 additions & 0 deletions scripts/config/metrictank-docker.ini
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ enabled = false
org-id = 0
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version. All brokers must be this version or newer.
kafka-version = V0_10_0_0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
Expand Down Expand Up @@ -322,6 +324,8 @@ dns-config-path = /etc/resolv.conf
enabled = false
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version. All brokers must be this version or newer.
kafka-version = V0_10_0_0
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
Expand Down
4 changes: 4 additions & 0 deletions scripts/config/metrictank-package.ini
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ enabled = false
org-id = 0
# tcp address (may be given multiple times as a comma-separated list)
brokers = localhost:9092
# Kafka version. All brokers must be this version or newer.
kafka-version = V0_10_0_0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
Expand Down Expand Up @@ -322,6 +324,8 @@ dns-config-path = /etc/resolv.conf
enabled = false
# tcp address (may be given multiple times as a comma-separated list)
brokers = localhost:9092
# Kafka version. All brokers must be this version or newer.
kafka-version = V0_10_0_0
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
Expand Down

0 comments on commit 0feecad

Please sign in to comment.