Use Confluent and move the kafka-consumers into one consumer struct #879
Conversation
Just some feedback as somebody who tried tsdb-gw via rpm after this similar update. I think FPM should be set to declare a dependency on librdkafka. There is no librdkafka in any of the el6/amzn1 repos, however there is a nice spec file out there already for users to build it themselves. How metrictank will probably behave after merge:
Users can build the dependent rpm here: Suggested file to update: https://github.com/grafana/metrictank/blob/master/scripts/build_packages.sh I'm not sure how to handle it, but this is a breaking change for anybody upgrading the rpm on el6/amzn1. el6 is EOL in 2020.
kafka/consumer.go (Outdated)
defer c.wg.Done()

var ok bool
var offsetPtr *int64
offsetPtr is being updated on every msg, but then nothing is done with it.
this value is read by the monitorLag() method: https://github.com/grafana/metrictank/pull/879/files#diff-57960cc1ee87fab707306aab51440a91R251
kafka/consumer.go (Outdated)
}

c.conf.MessageHandler(e.Value, tp.Partition)
atomic.StoreInt64(offsetPtr, int64(tp.Offset))
looks like this should be atomic.StoreInt64(c.currentOffsets[tp.Partition], int64(tp.Offset))
i was worried about a message arriving from a partition that we did not expect. in such a case we would then call atomic.StoreInt64(nil, int64(tp.Offset)) if we didn't verify first that this partition id exists in c.currentOffsets
oh, i missed that you are setting offsetPtr to c.currentOffsets[tp.Partition] on L#221
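For illustration, a minimal sketch of the pattern being discussed, assuming a per-partition map of offset pointers. offsetTracker and record are made-up names, and monitorLag here only mirrors the role of the method linked above, it is not the PR's actual implementation:

package kafka

import (
	"sync/atomic"
	"time"
)

// offsetTracker sketches the shared state: one *int64 per assigned partition.
// The map itself is only written at assignment time; afterwards the values
// are updated and read with atomic operations.
type offsetTracker struct {
	currentOffsets map[int32]*int64 // partition id -> last consumed offset
}

// record is what the consume loop would do per message: look the partition up
// first so we never pass a nil pointer to atomic.StoreInt64.
func (t *offsetTracker) record(partition int32, offset int64) {
	offsetPtr, ok := t.currentOffsets[partition]
	if !ok {
		return // message from a partition we did not expect
	}
	atomic.StoreInt64(offsetPtr, offset)
}

// monitorLag mirrors the role of the linked method: periodically read the
// offsets that the consume loop keeps updating via the same pointers.
func (t *offsetTracker) monitorLag(interval time.Duration, report func(partition int32, offset int64)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		for partition, offsetPtr := range t.currentOffsets {
			report(partition, atomic.LoadInt64(offsetPtr))
		}
	}
}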
kafka/consumer.go (Outdated)
c.consumer.Unassign()
log.Info("kafka-consumer: Revoked partitions: %+v", e)
case confluent.PartitionEOF:
	fmt.Printf("%% Reached %v\n", e)
this should either be removed, or use log.Debug. But i think we should just set enable.partition.eof to false in the confluent.ConfigMap to prevent these events from being emitted.
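A minimal sketch of where that setting could live; newConsumer, brokers and group are illustrative, not the PR's actual constructor:

package kafka

import (
	confluent "github.com/confluentinc/confluent-kafka-go/kafka"
)

// newConsumer is an illustrative constructor showing enable.partition.eof
// being disabled so librdkafka never emits PartitionEOF events.
func newConsumer(brokers, group string) (*confluent.Consumer, error) {
	return confluent.NewConsumer(&confluent.ConfigMap{
		"bootstrap.servers":    brokers,
		"group.id":             group,
		"enable.partition.eof": false, // no PartitionEOF events on the Events() channel
	})
}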
kafka/consumer.go (Outdated)
	c.partitionLogSize[partition].Set(int(newest))
}

c.partitionOffset[partition].Set(int(offset))
this is already set on L#252
kafka/consumer.go (Outdated)
currentOffset = time.Now().Add(-1*offsetDuration).UnixNano() / int64(time.Millisecond)
currentOffset, _, err = c.tryGetOffset(topic, partition, currentOffset, 3, time.Second)
if err != nil {
	return err
If the offset is outside of the range that kafka has, it will return an error. If that happens we just want to use oldest, not return an error.
actually this comment sounds like in this case it would just use oldest:
https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/consumer.go#L488-L490
but i'll fall back to oldest in case of any error now
kafka/consumer.go (Outdated)
	return 0, 0, err
}

var val1, val2 int64
let's rename these to something meaningful.
kafka/consumer.go (Outdated)
times, err = c.consumer.OffsetsForTimes(times, c.conf.MetadataTimeout)
if err == nil {
	if len(times) == 0 {
		err = fmt.Errorf("Got 0 topics returned from broker")
we need to fall back to offsetEnd
why should this fall back to offsetEnd, while in the above case it should fall back to offsetBeginning? shouldn't they both fall back to offsetBeginning?
how is that?
Lines 360 to 383 in 985f69a
for {
	if offset == confluent.OffsetBeginning || offset == confluent.OffsetEnd {
		beginning, end, err = c.consumer.QueryWatermarkOffsets(topic, partition, c.conf.MetadataTimeout)
		if err == nil {
			if offset == confluent.OffsetBeginning {
				return beginning, nil
			} else {
				return end, nil
			}
		}
	} else {
		times := []confluent.TopicPartition{{Topic: &topic, Partition: partition, Offset: offset}}
		times, err = c.consumer.OffsetsForTimes(times, c.conf.MetadataTimeout)
		if err != nil || len(times) == 0 {
			if err == nil {
				err = fmt.Errorf("Failed to get offset %d from kafka, falling back to \"oldest\"", offset)
			} else {
				err = fmt.Errorf("Failed to get offset %d from kafka, falling back to \"oldest\": %s", offset, err)
			}
			offset = confluent.OffsetBeginning
		} else {
			return int64(times[0].Offset), nil
		}
	}
yep, i meant offsetBeginning.
kafka/consumer.go (Outdated)
	return c.consumer.Assign(topicPartitions)
}

func (c *Consumer) tryGetOffset(topic string, partition int32, offsetI int64, attempts int, sleep time.Duration) (int64, int64, error) {
why does this return 2 values?
changing that. it made sense in an older version of the code, but not anymore
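For illustration, a sketch of what a single-offset variant might look like, written as a free function over *confluent.Consumer so the example is self-contained; the PR's real method takes the consumer and timeout from the Consumer struct, and the names here are not the final code:

package kafka

import (
	"fmt"
	"time"

	confluent "github.com/confluentinc/confluent-kafka-go/kafka"
)

// tryGetOffset resolves OffsetBeginning/OffsetEnd via the watermark offsets
// and timestamp-based offsets via OffsetsForTimes, retrying up to `attempts`
// times and falling back to the oldest offset when a timestamp lookup fails.
func tryGetOffset(consumer *confluent.Consumer, topic string, partition int32, offset confluent.Offset, timeoutMs, attempts int, sleep time.Duration) (int64, error) {
	var err error
	for i := 0; i < attempts; i++ {
		if offset == confluent.OffsetBeginning || offset == confluent.OffsetEnd {
			oldest, newest, qerr := consumer.QueryWatermarkOffsets(topic, partition, timeoutMs)
			if qerr == nil {
				if offset == confluent.OffsetBeginning {
					return oldest, nil
				}
				return newest, nil
			}
			err = qerr
		} else {
			times := []confluent.TopicPartition{{Topic: &topic, Partition: partition, Offset: offset}}
			times, err = consumer.OffsetsForTimes(times, timeoutMs)
			if err == nil && len(times) > 0 {
				return int64(times[0].Offset), nil
			}
			if err == nil {
				err = fmt.Errorf("got 0 offsets returned for %s:%d", topic, partition)
			}
			offset = confluent.OffsetBeginning // fall back to oldest on the next attempt
		}
		time.Sleep(sleep)
	}
	return 0, err
}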
kafka/partitions.go (Outdated)
	continue
}

if tm, ok = metadata.Topics[topic]; !ok || len(tm.Partitions) == 0 {
no need to check if topic is in metadata.Topics, it has already been checked.
Maybe change these 2 checks into
tm, ok := metadata.Topics[topic]
if !ok || tm.Error.Code() == confluent.ErrUnknownTopic {
	log.Warn("kafka: unknown topic %s, %d retries", topic, retry)
	time.Sleep(time.Duration(backoff) * time.Millisecond)
	continue
}
if len(tm.Partitions) == 0 {
	log.Warn("kafka: 0 partitions returned for %s, %d retries left, %d backoffMs", topic, retry, backoff)
	time.Sleep(time.Duration(backoff) * time.Millisecond)
	continue
}
mdata/notifierKafka/cfg.go (Outdated)
fs.DurationVar(&offsetCommitInterval, "offset-commit-interval", time.Second*5, "Interval at which offsets should be saved.")
fs.IntVar(&batchNumMessages, "batch-num-messages", 10000, "Maximum number of messages batched in one MessageSet")
as all of these flag vars are just going to be put into a kafka.ConsumerConf{}, why not just use a package-global consumerConfig, e.g.
var consumerConfig kafka.ConsumerConf
func init() {
	consumerConfig = kafka.NewConfig()
	fs := flag.NewFlagSet("kafka-cluster", flag.ExitOnError)
	fs.IntVar(&consumerConfig.MetadataTimeout, "consumer-metadata-timeout-ms", 10000, "Maximum time to wait for the broker to send its metadata in ms")
	...
good idea
mdata/notifierKafka/notifierKafka.go (Outdated)
	break EVENTS
}
default:
	fmt.Printf("Ignored unexpected event: %s\n", ev)
this should be a log message
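Something along these lines, assuming the project's log package exposes a Printf-style Debug, as the log.Info/log.Warn calls elsewhere in this diff suggest (the message text is illustrative):

default:
	log.Debug("kafka-notifier: ignored unexpected event: %s", ev)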
where is enable.partition.eof being disabled? i don't see that in our code, and according to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md it defaults to true
I ran two types of benchmarks now, comparing the current master branch with this confluent branch.
When I fed them with a steady stream of data that was below their maximum, the cpu & memory usage metrics look pretty similar. When they have to replay the backlog, which maxes them out, the current confluent branch seems to be quite a lot faster, but it also uses much more CPU/memory. I think I should probably check if there is a way to optimize this memory usage.
replaying backlog:
master: [graph]
confluent: [graph]
steady consuming:
master: [graph]
confluent: [graph]
we decided to go forward with sarama instead. see #906
Replaces the sarama consumers with confluent ones.
Also gets rid of the duplication between the kafka notifier and kafka input by moving all kafka consumer related stuff into a new struct that's used by both of them.