Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

set sarama client KafkaVersion via config #1103

Merged
merged 2 commits into from
Oct 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docker/docker-chaos/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ enabled = true
org-id = 0
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version in semver format. All brokers must be this version or newer.
kafka-version = 0.10.0.0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
Expand Down Expand Up @@ -322,6 +324,8 @@ dns-config-path = /etc/resolv.conf
enabled = true
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version in semver format. All brokers must be this version or newer.
kafka-version = 0.10.0.0
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
Expand Down
4 changes: 4 additions & 0 deletions docker/docker-cluster/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ enabled = true
org-id = 0
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version in semver format. All brokers must be this version or newer.
kafka-version = 0.10.0.0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
Expand Down Expand Up @@ -322,6 +324,8 @@ dns-config-path = /etc/resolv.conf
enabled = true
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version in semver format. All brokers must be this version or newer.
kafka-version = 0.10.0.0
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
Expand Down
4 changes: 4 additions & 0 deletions docker/docker-dev-custom-cfg-kafka/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ enabled = true
org-id = 1
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version in semver format. All brokers must be this version or newer.
kafka-version = 0.10.0.0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
Expand Down Expand Up @@ -322,6 +324,8 @@ dns-config-path = /etc/resolv.conf
enabled = true
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version in semver format. All brokers must be this version or newer.
kafka-version = 0.10.0.0
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
Expand Down
4 changes: 4 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ enabled = false
org-id = 0
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version in semver format. All brokers must be this version or newer.
kafka-version = 0.10.0.0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
Expand Down Expand Up @@ -383,6 +385,8 @@ dns-config-path = /etc/resolv.conf
enabled = false
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version in semver format. All brokers must be this version or newer.
kafka-version = 0.10.0.0
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
Expand Down
11 changes: 9 additions & 2 deletions input/kafkamdm/kafkamdm.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (k *KafkaMdm) Name() string {

var Enabled bool
var orgId uint
var kafkaVersionStr string
var brokerStr string
var brokers []string
var topicStr string
Expand Down Expand Up @@ -73,6 +74,7 @@ func ConfigSetup() {
inKafkaMdm.BoolVar(&Enabled, "enabled", false, "")
inKafkaMdm.UintVar(&orgId, "org-id", 0, "For incoming MetricPoint messages without org-id, assume this org id")
inKafkaMdm.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be be given multiple times as a comma-separated list)")
inKafkaMdm.StringVar(&kafkaVersionStr, "kafka-version", "0.10.0.0", "Kafka version in semver format. All brokers must be this version or newer.")
inKafkaMdm.StringVar(&topicStr, "topics", "mdm", "kafka topic (may be given multiple times as a comma-separated list)")
inKafkaMdm.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
inKafkaMdm.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of id's")
Expand All @@ -92,6 +94,11 @@ func ConfigProcess(instance string) {
return
}

kafkaVersion, err := sarama.ParseKafkaVersion(kafkaVersionStr)
if err != nil {
log.Fatalf("kafkamdm: invalid kafka-version. %s", err)
}

if offsetCommitInterval == 0 {
log.Fatal("kafkamdm: offset-commit-interval must be greater then 0")
}
Expand All @@ -101,7 +108,7 @@ func ConfigProcess(instance string) {
if consumerMaxProcessingTime == 0 {
log.Fatal("kafkamdm: consumer-max-processing-time must be greater then 0")
}
var err error

switch offsetStr {
case "last":
case "oldest":
Expand Down Expand Up @@ -129,7 +136,7 @@ func ConfigProcess(instance string) {
config.Consumer.MaxWaitTime = consumerMaxWaitTime
config.Consumer.MaxProcessingTime = consumerMaxProcessingTime
config.Net.MaxOpenRequests = netMaxOpenRequests
config.Version = sarama.V0_10_0_0
config.Version = kafkaVersion
err = config.Validate()
if err != nil {
log.Fatalf("kafkamdm: invalid config: %s", err)
Expand Down
11 changes: 9 additions & 2 deletions mdata/notifierKafka/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
)

var Enabled bool
var kafkaVersionStr string
var brokerStr string
var brokers []string
var topic string
Expand Down Expand Up @@ -42,6 +43,7 @@ func init() {
fs := flag.NewFlagSet("kafka-cluster", flag.ExitOnError)
fs.BoolVar(&Enabled, "enabled", false, "")
fs.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be given multiple times as comma separated list)")
fs.StringVar(&kafkaVersionStr, "kafka-version", "0.10.0.0", "Kafka version in semver format. All brokers must be this version or newer.")
fs.StringVar(&topic, "topic", "metricpersist", "kafka topic")
fs.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of id's. This should match the partitions used for kafka-mdm-in")
fs.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest,last or a time duration")
Expand All @@ -55,7 +57,12 @@ func ConfigProcess(instance string) {
if !Enabled {
return
}
var err error

kafkaVersion, err := sarama.ParseKafkaVersion(kafkaVersionStr)
if err != nil {
log.Fatalf("kafka-cluster: invalid kafka-version. %s", err)
}

switch offsetStr {
case "last":
case "oldest":
Expand All @@ -70,7 +77,7 @@ func ConfigProcess(instance string) {

config = sarama.NewConfig()
config.ClientID = instance + "-cluster"
config.Version = sarama.V0_10_0_0
config.Version = kafkaVersion
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
config.Producer.Compression = sarama.CompressionSnappy
Expand Down
4 changes: 4 additions & 0 deletions metrictank-sample.ini
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ enabled = false
org-id = 0
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version in semver format. All brokers must be this version or newer.
kafka-version = 0.10.0.0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
Expand Down Expand Up @@ -325,6 +327,8 @@ dns-config-path = /etc/resolv.conf
enabled = false
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version in semver format. All brokers must be this version or newer.
kafka-version = 0.10.0.0
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
Expand Down
4 changes: 4 additions & 0 deletions scripts/config/metrictank-docker.ini
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ enabled = false
org-id = 0
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version in semver format. All brokers must be this version or newer.
kafka-version = 0.10.0.0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
Expand Down Expand Up @@ -322,6 +324,8 @@ dns-config-path = /etc/resolv.conf
enabled = false
# tcp address (may be given multiple times as a comma-separated list)
brokers = kafka:9092
# Kafka version in semver format. All brokers must be this version or newer.
kafka-version = 0.10.0.0
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
Expand Down
4 changes: 4 additions & 0 deletions scripts/config/metrictank-package.ini
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ enabled = false
org-id = 0
# tcp address (may be given multiple times as a comma-separated list)
brokers = localhost:9092
# Kafka version in semver format. All brokers must be this version or newer.
kafka-version = 0.10.0.0
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# offset to start consuming from. Can be one of newest, oldest,last or a time duration
Expand Down Expand Up @@ -322,6 +324,8 @@ dns-config-path = /etc/resolv.conf
enabled = false
# tcp address (may be given multiple times as a comma-separated list)
brokers = localhost:9092
# Kafka version in semver format. All brokers must be this version or newer.
kafka-version = 0.10.0.0
# kafka topic (only one)
topic = metricpersist
# kafka partitions to consume. use '*' or a comma separated list of id's. Should match kafka-mdm-in's partitions.
Expand Down