From 943dcc0c49c3223e78482ce5e6c29e72957b8af1 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 31 Jul 2018 15:08:04 -0700 Subject: [PATCH] Send all messages before waiting for results in kafka output (#4491) --- plugins/outputs/kafka/kafka.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index a45e2a4e9c3cf..a99c8e1c20819 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -246,32 +246,34 @@ func (k *Kafka) Description() string { } func (k *Kafka) Write(metrics []telegraf.Metric) error { - if len(metrics) == 0 { - return nil - } - + msgs := make([]*sarama.ProducerMessage, 0, len(metrics)) for _, metric := range metrics { buf, err := k.serializer.Serialize(metric) if err != nil { return err } - topicName := k.GetTopicName(metric) - m := &sarama.ProducerMessage{ - Topic: topicName, + Topic: k.GetTopicName(metric), Value: sarama.ByteEncoder(buf), } - if h, ok := metric.Tags()[k.RoutingTag]; ok { + if h, ok := metric.GetTag(k.RoutingTag); ok { m.Key = sarama.StringEncoder(h) } + msgs = append(msgs, m) + } - _, _, err = k.producer.SendMessage(m) - - if err != nil { - return fmt.Errorf("FAILED to send kafka message: %s\n", err) + err := k.producer.SendMessages(msgs) + if err != nil { + // We could have many errors, return only the first encountered. + if errs, ok := err.(sarama.ProducerErrors); ok { + for _, prodErr := range errs { + return prodErr + } } + return err } + return nil }