diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5e3a2efd0c8..adbe33d0e6f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -214,6 +214,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add `network` condition to processors for matching IP addresses against CIDRs. {pull}10743[10743] - Add if/then/else support to processors. {pull}10744[10744] - Add `community_id` processor for computing network flow hashes. {pull}10745[10745] +- Add output test to kafka output {pull}10834[10834] *Auditbeat* diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index a1f1955ff9c..3a1ba6d326b 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -31,7 +31,9 @@ import ( "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/codec" "github.com/elastic/beats/libbeat/outputs/outil" + "github.com/elastic/beats/libbeat/outputs/transport" "github.com/elastic/beats/libbeat/publisher" + "github.com/elastic/beats/libbeat/testing" ) type client struct { @@ -268,3 +270,18 @@ func (r *msgRef) dec() { stats.Acked(r.total) } } + +func (c *client) Test(d testing.Driver) { + if c.config.Net.TLS.Enable == true { + d.Warn("TLS", "Kafka output doesn't support TLS testing") + } + + for _, host := range c.hosts { + d.Run("Kafka: "+host, func(d testing.Driver) { + netDialer := transport.TestNetDialer(d, c.config.Net.DialTimeout) + _, err := netDialer.Dial("tcp", host) + d.Error("dial up", err) + }) + } + +}