Merge pull request #296 from raintank/kafkaMdmNoConsumerGroup
refactor kafkaMdm to manage its own offsets.
Dieterbe authored Sep 6, 2016
2 parents 4418b3c + 10a95db commit 3b5f787
Showing 148 changed files with 21,505 additions and 1,113 deletions.
18 changes: 14 additions & 4 deletions docs/config.md
@@ -159,8 +159,13 @@ enabled = false
brokers = kafka:9092
# kafka topic (may be given multiple times as a comma-separated list)
topics = mdm
# consumer group name
group = group1
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
offset = last
# save interval for offsets
offset-commit-interval = 5s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if it doesn't exist.
data-dir =
# The minimum number of message bytes to fetch in a request
consumer-fetch-min = 1024000
# The default number of message bytes to fetch in a request
@@ -196,8 +201,13 @@ enabled = false
brokers = kafka:9092
# kafka topic (only one)
topic = metricpersist
# consumer group name
group = group1
# offset to start consuming from. Can be one of newest, oldest, last or a time duration
offset = last
# save interval for offsets
offset-commit-interval = 5s
# directory to store partition offsets index. supports relative or absolute paths. empty means working dir.
# it will be created (incl parent dirs) if it doesn't exist.
data-dir =
```
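
To make the new `offset` setting concrete: `oldest` and `newest` map to Kafka's special start/end offsets, `last` resumes from the offset previously saved under `data-dir`, and a duration such as `6h` rewinds to the first message at or after that point in time. The helper below is purely illustrative (it is not part of this commit); it assumes an already-connected `sarama.Client`, and the `resolveOffset` name is hypothetical — it only shows how the documented values translate into Kafka offsets.

```go
package example

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

// resolveOffset is a hypothetical helper showing how the documented "offset"
// values can map to concrete Kafka offsets. "oldest" and "newest" use sarama's
// sentinel offsets; a duration such as "6h" is converted to a
// milliseconds-since-epoch timestamp, which is what Kafka's offset-by-time
// lookup expects.
func resolveOffset(client sarama.Client, topic string, partition int32, offset string) (int64, error) {
	switch offset {
	case "oldest":
		return sarama.OffsetOldest, nil
	case "newest":
		return sarama.OffsetNewest, nil
	case "last":
		// resolved from the local offset store under data-dir; not shown in this sketch
		return 0, fmt.Errorf("'last' requires the local offset store")
	default:
		d, err := time.ParseDuration(offset)
		if err != nil {
			return 0, err
		}
		ts := time.Now().Add(-d).UnixNano() / int64(time.Millisecond)
		return client.GetOffset(topic, partition, ts)
	}
}
```

For example, `offset = 6h` starts each partition at the first message no older than roughly six hours, while `offset = last` resumes from the offset committed under `data-dir` at the last `offset-commit-interval` tick.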

### nsq as transport for clustering messages
1 change: 1 addition & 0 deletions docs/inputs.md
@@ -34,6 +34,7 @@ This is the recommended input option if you want a queue.
This is a kafka input that uses application-level batches stored within single kafka messages.
It is discouraged because this does not allow proper routing/partitioning of messages and will be removed.
It only exists to compare performance numbers against kafka-mdm, and make mdm as fast as mdam.
It also doesn't have the performance tuning options and offset control that kafka-mdm has (kafka-mdam always uses the latest offset).


## NSQ (deprecated)
3 changes: 0 additions & 3 deletions docs/installation-deb.md
@@ -223,9 +223,6 @@ The moment your metrictank instance(s) come(s) back up, they can replay everything
so that you can serve queries for it out of RAM).
Also, in case you want to make any change to your aggregations, Cassandra cluster, or whatever, it can be useful to re-process older data.

** Note: the above actually doesn't work yet, as we don't have the seek-back-in-time implemented yet to fetch old data from Kafka.
So for now using Kafka is more about preparing for the future than getting immediate benefit. **

### Zookeeper

Kafka requires Zookeeper, so set that up first.
3 changes: 0 additions & 3 deletions docs/installation-rpm.md
@@ -219,9 +219,6 @@ The moment your metrictank instance(s) come(s) back up, they can replay everything
so that you can serve queries for it out of RAM).
Also, in case you want to make any change to your aggregations, Cassandra cluster, or whatever, it can be useful to re-process older data.

** Note: the above actually doesn't work yet, as we don't have the seek-back-in-time implemented yet to fetch old data from Kafka.
So for now using Kafka is more about preparing for the future than getting immediate benefit. **

### Zookeeper

Kafka requires Zookeeper, so set that up first.
173 changes: 110 additions & 63 deletions in/kafkamdm/kafkamdm.go
@@ -10,22 +10,26 @@ import (
"github.com/raintank/worldping-api/pkg/log"
"github.com/rakyll/globalconf"

"github.com/bsm/sarama-cluster"
"github.com/raintank/met"
"github.com/raintank/metrictank/idx"
"github.com/raintank/metrictank/in"
"github.com/raintank/metrictank/kafka"
"github.com/raintank/metrictank/mdata"
"github.com/raintank/metrictank/usage"
)

type KafkaMdm struct {
in.In
consumer *cluster.Consumer
consumer sarama.Consumer
client sarama.Client
stats met.Backend

wg sync.WaitGroup
// read from this channel to block until consumer is cleanly stopped
StopChan chan int

// signal to PartitionConsumers to shutdown
stopConsuming chan struct{}
}

var LogLevel int
@@ -34,26 +38,32 @@ var brokerStr string
var brokers []string
var topicStr string
var topics []string
var group string
var config *cluster.Config
var offsetStr string
var dataDir string
var config *sarama.Config
var channelBufferSize int
var consumerFetchMin int
var consumerFetchDefault int
var consumerMaxWaitTime string
var consumerMaxProcessingTime string
var consumerMaxWaitTime time.Duration
var consumerMaxProcessingTime time.Duration
var netMaxOpenRequests int
var offsetMgr *kafka.OffsetMgr
var offsetDuration time.Duration
var offsetCommitInterval time.Duration

func ConfigSetup() {
inKafkaMdm := flag.NewFlagSet("kafka-mdm-in", flag.ExitOnError)
inKafkaMdm.BoolVar(&Enabled, "enabled", false, "")
inKafkaMdm.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be given multiple times as a comma-separated list)")
inKafkaMdm.StringVar(&topicStr, "topics", "mdm", "kafka topic (may be given multiple times as a comma-separated list)")
inKafkaMdm.StringVar(&group, "group", "group1", "kafka consumer group")
inKafkaMdm.StringVar(&offsetStr, "offset", "last", "Set the offset to start consuming from. Can be one of newest, oldest, last or a time duration")
inKafkaMdm.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
inKafkaMdm.StringVar(&dataDir, "data-dir", "", "Directory to store partition offsets index")
inKafkaMdm.IntVar(&channelBufferSize, "channel-buffer-size", 1000000, "The number of metrics to buffer in internal and external channels")
inKafkaMdm.IntVar(&consumerFetchMin, "consumer-fetch-min", 1024000, "The minimum number of message bytes to fetch in a request")
inKafkaMdm.IntVar(&consumerFetchDefault, "consumer-fetch-default", 4096000, "The default number of message bytes to fetch in a request")
inKafkaMdm.StringVar(&consumerMaxWaitTime, "consumer-max-wait-time", "1s", "The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it returns fewer than that anyway")
inKafkaMdm.StringVar(&consumerMaxProcessingTime, "consumer-max-processing-time", "1s", "The maximum amount of time the consumer expects a message takes to process")
inKafkaMdm.DurationVar(&consumerMaxWaitTime, "consumer-max-wait-time", time.Second, "The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it returns fewer than that anyway")
inKafkaMdm.DurationVar(&consumerMaxProcessingTime, "consumer-max-processing-time", time.Second, "The maximum amount of time the consumer expects a message takes to process")
inKafkaMdm.IntVar(&netMaxOpenRequests, "net-max-open-requests", 100, "How many outstanding requests a connection is allowed to have before sending on it blocks")
globalconf.Register("kafka-mdm-in", inKafkaMdm)
}
Expand All @@ -63,107 +73,144 @@ func ConfigProcess(instance string) {
return
}

waitTime, err := time.ParseDuration(consumerMaxWaitTime)
if err != nil {
log.Fatal(4, "kafka-mdm invalid config, could not parse consumer-max-wait-time: %s", err)
if offsetCommitInterval == 0 {
log.Fatal(4, "kafkamdm: offset-commit-interval must be greater then 0")
}
processingTime, err := time.ParseDuration(consumerMaxProcessingTime)
if err != nil {
log.Fatal(4, "kafka-mdm invalid config, could not parse consumer-max-processing-time: %s", err)
if consumerMaxWaitTime == 0 {
log.Fatal(4, "kafkamdm: consumer-max-wait-time must be greater then 0")
}
if consumerMaxProcessingTime == 0 {
log.Fatal(4, "kafkamdm: consumer-max-processing-time must be greater then 0")
}
var err error
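// "last", "oldest" and "newest" are resolved per partition once consuming starts; any other value must parse as a time.Duration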
switch offsetStr {
case "last":
case "oldest":
case "newest":
default:
offsetDuration, err = time.ParseDuration(offsetStr)
if err != nil {
log.Fatal(4, "kafkamdm: invalid offest format. %s", err)
}
}

offsetMgr, err = kafka.NewOffsetMgr(dataDir)
if err != nil {
log.Fatal(4, "kafka-mdm couldnt create offsetMgr. %s", err)
}
brokers = strings.Split(brokerStr, ",")
topics = strings.Split(topicStr, ",")

config = cluster.NewConfig()
// see https://github.com/raintank/metrictank/issues/236
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config = sarama.NewConfig()

config.ClientID = instance + "-mdm"
config.Group.Return.Notifications = true
config.ChannelBufferSize = channelBufferSize
config.Consumer.Fetch.Min = int32(consumerFetchMin)
config.Consumer.Fetch.Default = int32(consumerFetchDefault)
config.Consumer.MaxWaitTime = waitTime
config.Consumer.MaxProcessingTime = processingTime
config.Consumer.MaxWaitTime = consumerMaxWaitTime
config.Consumer.MaxProcessingTime = consumerMaxProcessingTime
config.Net.MaxOpenRequests = netMaxOpenRequests
config.Config.Version = sarama.V0_10_0_0
config.Version = sarama.V0_10_0_0
err = config.Validate()
if err != nil {
log.Fatal(2, "kafka-mdm invalid config: %s", err)
}
}

func New(stats met.Backend) *KafkaMdm {
consumer, err := cluster.NewConsumer(brokers, group, topics, config)
client, err := sarama.NewClient(brokers, config)
if err != nil {
log.Fatal(2, "kafka-mdm failed to start consumer: %s", err)
log.Fatal(4, "kafka-mdm failed to create client. %s", err)
}
log.Info("kafka-mdm consumer started without error")
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
log.Fatal(2, "kafka-mdm failed to create consumer: %s", err)
}
log.Info("kafka-mdm consumer created without error")
k := KafkaMdm{
consumer: consumer,
stats: stats,
StopChan: make(chan int),
consumer: consumer,
client: client,
stats: stats,
StopChan: make(chan int),
stopConsuming: make(chan struct{}),
}

return &k
}

func (k *KafkaMdm) Start(metrics mdata.Metrics, metricIndex idx.MetricIndex, usg *usage.Usage) {
k.In = in.New(metrics, metricIndex, usg, "kafka-mdm", k.stats)
go k.notifications()
go k.consume()
}

func (k *KafkaMdm) consume() {
k.wg.Add(1)
messageChan := k.consumer.Messages()
for msg := range messageChan {
if LogLevel < 2 {
log.Debug("kafka-mdm received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key)
for _, topic := range topics {
// get partitions.
partitions, err := k.consumer.Partitions(topic)
if err != nil {
log.Fatal(4, "kafka-mdm: Faild to get partitions for topic %s. %s", topic, err)
}
for _, partition := range partitions {
var offset int64
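// -2 and -1 are Kafka's sentinel offsets, i.e. sarama.OffsetOldest and sarama.OffsetNewest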
switch offsetStr {
case "oldest":
offset = -2
case "newest":
offset = -1
case "last":
offset, err = offsetMgr.Last(topic, partition)
default:
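// duration-based offset: look up the first offset at or after (now - offsetDuration); Kafka's offset-by-time lookup expects milliseconds since the epoch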
offset, err = k.client.GetOffset(topic, partition, time.Now().Add(-1*offsetDuration).UnixNano()/int64(time.Millisecond))
}
if err != nil {
log.Fatal(4, "kafka-mdm: Failed to get %q duration offset for %s:%d. %q", offsetStr, topic, partition, err)
}
go k.consumePartition(topic, partition, offset)
}
k.In.Handle(msg.Value)
k.consumer.MarkOffset(msg, "")
}
log.Info("kafka-mdm consumer ended.")
k.wg.Done()
}

func (k *KafkaMdm) notifications() {
// this will continually consume from the topic until k.stopConsuming is triggered.
func (k *KafkaMdm) consumePartition(topic string, partition int32, partitionOffset int64) {
k.wg.Add(1)
for msg := range k.consumer.Notifications() {
if len(msg.Claimed) > 0 {
for topic, partitions := range msg.Claimed {
log.Info("kafka-mdm consumer claimed %d partitions on topic: %s", len(partitions), topic)
defer k.wg.Done()

pc, err := k.consumer.ConsumePartition(topic, partition, partitionOffset)
if err != nil {
log.Fatal(4, "kafka-mdm: failed to start partitionConsumer for %s:%d. %s", topic, partition, err)
}
log.Info("kafka-mdm: consuming from %s:%d from offset %d", topic, partition, partitionOffset)
currentOffset := partitionOffset
messages := pc.Messages()
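// the ticker drives periodic commits of the most recently processed offset to the local offset store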
ticker := time.NewTicker(offsetCommitInterval)
for {
select {
case msg := <-messages:
if LogLevel < 2 {
log.Debug("kafka-mdm received message: Topic %s, Partition: %d, Offset: %d, Key: %x", msg.Topic, msg.Partition, msg.Offset, msg.Key)
}
}
if len(msg.Released) > 0 {
for topic, partitions := range msg.Released {
log.Info("kafka-mdm consumer released %d partitions on topic: %s", len(partitions), topic)
k.In.Handle(msg.Value)
currentOffset = msg.Offset
case <-ticker.C:
if err := offsetMgr.Commit(topic, partition, currentOffset); err != nil {
log.Error(3, "kafka-mdm failed to commit offset for %s:%d, %s", topic, partition, err)
}
}

if len(msg.Current) == 0 {
log.Info("kafka-mdm consumer is no longer consuming from any partitions.")
} else {
log.Info("kafka-mdm Current partitions:")
for topic, partitions := range msg.Current {
log.Info("kafka-mdm Current partitions: %s: %v", topic, partitions)
case <-k.stopConsuming:
pc.Close()
if err := offsetMgr.Commit(topic, partition, currentOffset); err != nil {
log.Error(3, "kafka-mdm failed to commit offset for %s:%d, %s", topic, partition, err)
}
log.Info("kafka-mdm consumer for %s:%d ended.", topic, partition)
return
}
}
log.Info("kafka-mdm notification processing stopped")
k.wg.Done()
}

// Stop will initiate a graceful stop of the Consumer (permanent)
//
// NOTE: receive on StopChan to block until this process completes
func (k *KafkaMdm) Stop() {
// closes notifications and messages channels, amongst others
k.consumer.Close()

close(k.stopConsuming)
go func() {
k.wg.Wait()
offsetMgr.Close()
close(k.StopChan)
}()
}
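
The local offset store used above (`kafka.NewOffsetMgr`, `Last`, `Commit`, `Close`) lives in the new `metrictank/kafka` package added elsewhere in this commit and is not shown in this excerpt. The sketch below is only an illustration of an offset manager that satisfies the same calls, assuming a plain JSON index file per `data-dir` (a hypothetical layout; the real package may store the partition offsets index differently).

```go
package example

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// OffsetMgr is an illustrative sketch only: it persists the last processed
// offset per topic/partition in a JSON file under dataDir. The real offset
// manager added by this commit may use a different storage format.
type OffsetMgr struct {
	sync.Mutex
	path    string
	offsets map[string]int64
}

func NewOffsetMgr(dataDir string) (*OffsetMgr, error) {
	if dataDir == "" {
		dataDir = "." // empty means working dir, as documented in config.md
	}
	if err := os.MkdirAll(dataDir, 0755); err != nil {
		return nil, err
	}
	m := &OffsetMgr{
		path:    filepath.Join(dataDir, "offsets.json"),
		offsets: make(map[string]int64),
	}
	// load any previously saved offsets; a missing file just means a fresh start
	if data, err := os.ReadFile(m.path); err == nil {
		if err := json.Unmarshal(data, &m.offsets); err != nil {
			return nil, err
		}
	}
	return m, nil
}

// Last returns the offset to resume from for topic:partition, falling back to
// Kafka's "oldest" sentinel (-2) when nothing has been saved yet.
func (m *OffsetMgr) Last(topic string, partition int32) (int64, error) {
	m.Lock()
	defer m.Unlock()
	if off, ok := m.offsets[fmt.Sprintf("%s:%d", topic, partition)]; ok {
		return off, nil
	}
	return -2, nil
}

// Commit persists the latest processed offset for topic:partition.
func (m *OffsetMgr) Commit(topic string, partition int32, offset int64) error {
	m.Lock()
	defer m.Unlock()
	m.offsets[fmt.Sprintf("%s:%d", topic, partition)] = offset
	data, err := json.Marshal(m.offsets)
	if err != nil {
		return err
	}
	return os.WriteFile(m.path, data, 0644)
}

// Close is a no-op in this sketch; a real implementation would flush and close its store.
func (m *OffsetMgr) Close() {}
```

For shutdown, the pattern documented on `Stop()` applies: call `k.Stop()` and then receive on `k.StopChan`, which is closed only after every partition consumer has committed its final offset and exited, and the offset manager has been closed.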