Skip to content

Commit

Permalink
add sasl integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
faec committed Nov 15, 2021
1 parent 0d2ae67 commit 0f70586
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 20 deletions.
17 changes: 3 additions & 14 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.

//go:build integration
// +build integration

package kafka

import (
"errors"
"time"

"github.com/Shopify/sarama"

"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -32,20 +32,9 @@ import (
)

const (
defaultWaitRetry = 1 * time.Second

// NOTE: maxWaitRetry has no effect on mode, as logstash client currently does
// not return ErrTempBulkFailure
defaultMaxWaitRetry = 60 * time.Second

logSelector = "kafka"
)

var (
errNoTopicSet = errors.New("No topic configured")
errNoHosts = errors.New("No hosts configured")
)

func init() {
sarama.Logger = kafkaLogger{log: logp.NewLogger(logSelector)}

Expand Down
46 changes: 41 additions & 5 deletions libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
// specific language governing permissions and limitations
// under the License.

//go:build integration
// +build integration

package kafka

import (
Expand Down Expand Up @@ -45,8 +42,9 @@ import (
)

const (
kafkaDefaultHost = "localhost"
kafkaDefaultPort = "9092"
kafkaDefaultHost = "kafka"
kafkaDefaultPort = "9092"
kafkaDefaultSASLPort = "9093"
)

type eventInfo struct {
Expand Down Expand Up @@ -183,6 +181,37 @@ func TestKafkaPublish(t *testing.T) {
"type": "log",
}),
},
{
"publish single event to test topic",
map[string]interface{}{},
testTopic,
single(common.MapStr{
"host": "test-host",
"message": id,
}),
},
{
// Initially I tried rerunning all tests over SASL/SCRAM, but
// that added a full 30sec to the test. Instead most tests run
// in plaintext, and individual tests can switch to SCRAM
// by inserting the config in this example:
"publish single event to test topic over SASL/SCRAM",
map[string]interface{}{
"hosts": []string{getTestSASLKafkaHost()},
"protocol": "https",
"sasl.mechanism": "SCRAM-SHA-512",
"ssl.certificate_authorities": []string{
"../../../testing/environments/docker/kafka/certs/ca-cert",
},
"username": "beats",
"password": "KafkaTest",
},
testTopic,
single(common.MapStr{
"host": "test-host",
"message": id,
}),
},
}

defaultConfig := map[string]interface{}{
Expand Down Expand Up @@ -322,6 +351,13 @@ func getTestKafkaHost() string {
)
}

func getTestSASLKafkaHost() string {
return fmt.Sprintf("%v:%v",
getenv("KAFKA_HOST", kafkaDefaultHost),
getenv("KAFKA_SASL_PORT", kafkaDefaultSASLPort),
)
}

func makeConfig(t *testing.T, in map[string]interface{}) *common.Config {
cfg, err := common.NewConfigFrom(in)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion testing/environments/docker/kafka/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ wait_for_port 2181
--entity-type users \
--entity-name beats


echo "Starting Kafka broker"
mkdir -p ${KAFKA_LOGS_DIR}
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties \
Expand Down

0 comments on commit 0f70586

Please sign in to comment.