Skip to content

Commit

Permalink
Implement basic testing for the kafka output module (elastic#10834)
Browse files Browse the repository at this point in the history
  • Loading branch information
Popsiclestick authored and jsoriano committed Feb 28, 2019
1 parent f771497 commit 2011b10
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add `network` condition to processors for matching IP addresses against CIDRs. {pull}10743[10743]
- Add if/then/else support to processors. {pull}10744[10744]
- Add `community_id` processor for computing network flow hashes. {pull}10745[10745]
- Add output test to kafka output {pull}10834[10834]


*Auditbeat*
Expand Down
17 changes: 17 additions & 0 deletions libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/codec"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/testing"
)

type client struct {
Expand Down Expand Up @@ -268,3 +270,18 @@ func (r *msgRef) dec() {
stats.Acked(r.total)
}
}

func (c *client) Test(d testing.Driver) {
if c.config.Net.TLS.Enable == true {
d.Warn("TLS", "Kafka output doesn't support TLS testing")
}

for _, host := range c.hosts {
d.Run("Kafka: "+host, func(d testing.Driver) {
netDialer := transport.TestNetDialer(d, c.config.Net.DialTimeout)
_, err := netDialer.Dial("tcp", host)
d.Error("dial up", err)
})
}

}

0 comments on commit 2011b10

Please sign in to comment.