diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 9be3970b1c41..f2b0fb27cb45 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. +//go:build integration +// +build integration + package kafka import ( - "errors" - "time" - "github.com/Shopify/sarama" "github.com/elastic/beats/v7/libbeat/beat" @@ -32,20 +32,9 @@ import ( ) const ( - defaultWaitRetry = 1 * time.Second - - // NOTE: maxWaitRetry has no effect on mode, as logstash client currently does - // not return ErrTempBulkFailure - defaultMaxWaitRetry = 60 * time.Second - logSelector = "kafka" ) -var ( - errNoTopicSet = errors.New("No topic configured") - errNoHosts = errors.New("No hosts configured") -) - func init() { sarama.Logger = kafkaLogger{log: logp.NewLogger(logSelector)} diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 0cc751d99b9e..a47ae2b8b1ef 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -15,9 +15,6 @@ // specific language governing permissions and limitations // under the License. -//go:build integration -// +build integration - package kafka import ( @@ -45,8 +42,9 @@ import ( ) const ( - kafkaDefaultHost = "localhost" - kafkaDefaultPort = "9092" + kafkaDefaultHost = "kafka" + kafkaDefaultPort = "9092" + kafkaDefaultSASLPort = "9093" ) type eventInfo struct { @@ -183,6 +181,37 @@ func TestKafkaPublish(t *testing.T) { "type": "log", }), }, + { + "publish single event to test topic", + map[string]interface{}{}, + testTopic, + single(common.MapStr{ + "host": "test-host", + "message": id, + }), + }, + { + // Initially I tried rerunning all tests over SASL/SCRAM, but + // that added a full 30sec to the test. Instead most tests run + // in plaintext, and individual tests can switch to SCRAM + // by inserting the config in this example: + "publish single event to test topic over SASL/SCRAM", + map[string]interface{}{ + "hosts": []string{getTestSASLKafkaHost()}, + "protocol": "https", + "sasl.mechanism": "SCRAM-SHA-512", + "ssl.certificate_authorities": []string{ + "../../../testing/environments/docker/kafka/certs/ca-cert", + }, + "username": "beats", + "password": "KafkaTest", + }, + testTopic, + single(common.MapStr{ + "host": "test-host", + "message": id, + }), + }, } defaultConfig := map[string]interface{}{ @@ -322,6 +351,13 @@ func getTestKafkaHost() string { ) } +func getTestSASLKafkaHost() string { + return fmt.Sprintf("%v:%v", + getenv("KAFKA_HOST", kafkaDefaultHost), + getenv("KAFKA_SASL_PORT", kafkaDefaultSASLPort), + ) +} + func makeConfig(t *testing.T, in map[string]interface{}) *common.Config { cfg, err := common.NewConfigFrom(in) if err != nil { diff --git a/testing/environments/docker/kafka/run.sh b/testing/environments/docker/kafka/run.sh index f18e49bdbbe6..bfacf2a7242e 100755 --- a/testing/environments/docker/kafka/run.sh +++ b/testing/environments/docker/kafka/run.sh @@ -23,7 +23,6 @@ wait_for_port 2181 --entity-type users \ --entity-name beats - echo "Starting Kafka broker" mkdir -p ${KAFKA_LOGS_DIR} ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties \