Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SASL/SCRAM support to the Kafka test container #28971

Merged
merged 5 commits into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
package kafka

import (
"errors"
"time"

"github.com/Shopify/sarama"

"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -32,20 +29,9 @@ import (
)

const (
defaultWaitRetry = 1 * time.Second

// NOTE: maxWaitRetry has no effect on mode, as logstash client currently does
// not return ErrTempBulkFailure
defaultMaxWaitRetry = 60 * time.Second

logSelector = "kafka"
)

var (
errNoTopicSet = errors.New("No topic configured")
errNoHosts = errors.New("No hosts configured")
)

func init() {
sarama.Logger = kafkaLogger{log: logp.NewLogger(logSelector)}

Expand Down
43 changes: 41 additions & 2 deletions libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ import (
)

const (
kafkaDefaultHost = "localhost"
kafkaDefaultPort = "9092"
kafkaDefaultHost = "kafka"
kafkaDefaultPort = "9092"
kafkaDefaultSASLPort = "9093"
)

type eventInfo struct {
Expand Down Expand Up @@ -183,6 +184,37 @@ func TestKafkaPublish(t *testing.T) {
"type": "log",
}),
},
{
"publish single event to test topic",
map[string]interface{}{},
testTopic,
single(common.MapStr{
"host": "test-host",
"message": id,
}),
},
{
// Initially I tried rerunning all tests over SASL/SCRAM, but
// that added a full 30sec to the test. Instead most tests run
// in plaintext, and individual tests can switch to SCRAM
// by inserting the config in this example:
"publish single event to test topic over SASL/SCRAM",
map[string]interface{}{
"hosts": []string{getTestSASLKafkaHost()},
"protocol": "https",
"sasl.mechanism": "SCRAM-SHA-512",
"ssl.certificate_authorities": []string{
"../../../testing/environments/docker/kafka/certs/ca-cert",
},
"username": "beats",
"password": "KafkaTest",
},
testTopic,
single(common.MapStr{
"host": "test-host",
"message": id,
}),
},
}

defaultConfig := map[string]interface{}{
Expand Down Expand Up @@ -322,6 +354,13 @@ func getTestKafkaHost() string {
)
}

func getTestSASLKafkaHost() string {
return fmt.Sprintf("%v:%v",
getenv("KAFKA_HOST", kafkaDefaultHost),
getenv("KAFKA_SASL_PORT", kafkaDefaultSASLPort),
)
}

func makeConfig(t *testing.T, in map[string]interface{}) *common.Config {
cfg, err := common.NewConfigFrom(in)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions testing/environments/docker/kafka/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ RUN mkdir -p ${KAFKA_LOGS_DIR} && mkdir -p ${KAFKA_HOME} && \

ADD run.sh /run.sh
ADD healthcheck.sh /healthcheck.sh
ADD certs/broker.keystore.jks /broker.keystore.jks
ADD certs/client.truststore.jks /broker.truststore.jks

EXPOSE 9092
EXPOSE 9093
EXPOSE 2181

# healthcheck.sh tries to create and delete an empty kafka topic (the topic
Expand Down
35 changes: 35 additions & 0 deletions testing/environments/docker/kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Kafka test container

This Docker container provides an environment for testing with Kafka. It exposes two ports to the host system, `9092` for `PLAINTEXT` and `9093` for `SASL/SSL` with username `beats` and password `KafkaTest`.

## Certificates

The test environment uses a self-signed SSL certificate in the broker. To connect, clients will need to set `certs/client.truststore.jks` as their trust store.

The files in the `certs` directory were generated with these commands:

```sh
# create the broker's key
keytool -keystore broker.keystore.jks -storepass KafkaTest -alias broker -validity 5000 -keyalg RSA -genkey

What is your first and last name?
[Unknown]: kafka
...

# create a new certificate authority
openssl req -new -x509 -keyout ca-key -out ca-cert -days 5000

# add the CA to the kafka client's trust store
keytool -keystore client.truststore.jks -storepass KafkaTest -alias CARoot -keyalg RSA -import -file ca-cert

# export the server certificate
keytool -keystore broker.keystore.jks -storepass KafkaTest -alias broker -certreq -file broker-cert

# sign it with the CA
openssl x509 -req -CA ca-cert -CAkey ca-key -in broker-cert -out broker-cert-signed -days 5000 -CAcreateserial -passin pass:KafkaTest

# import CA and signed cert back into server keystore
keytool -keystore broker.keystore.jks -storepass KafkaTest -alias CARoot -import -file ca-cert
keytool -keystore broker.keystore.jks -storepass KafkaTest -alias broker -import -file broker-cert-signed

```
18 changes: 18 additions & 0 deletions testing/environments/docker/kafka/certs/broker-cert
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-----BEGIN NEW CERTIFICATE REQUEST-----
MIIC3zCCAccCAQAwajEQMA4GA1UEBhMHVW5rbm93bjEQMA4GA1UECBMHVW5rbm93
bjEQMA4GA1UEBxMHVW5rbm93bjEQMA4GA1UEChMHVW5rbm93bjEQMA4GA1UECxMH
VW5rbm93bjEOMAwGA1UEAxMFa2Fma2EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw
ggEKAoIBAQCH8VYN9FMHXjnLUwT0AJDKM0u/jXE0ng1UfWPVQaVI+Eny+vmf1zDm
d/AoqXaYKzVNvyRXCy1BZGaLVA3go1U7+tVjtniuLTmveE07PuX4w9/ukZPKlUxf
KCjYCmh38BeYiJA2inaxScDO2hxHfB2pulsM+l9+q0NMXFe6RSUAKS0pAeY8KLz9
yWg9hfq6JPuPT14HZmyxLn+1SwRbZZ+TQjlAHfZFpu/igg6cif/ez30z5Gqci+2i
VPlwl9peEsaXn5wbuP6J2Uo6dMoGiFyxFdGCWVWP9WDncvfYKJwQs09QdbFLxAst
BYSmOTszUP+h0SohaxpdC4AOcJxs+MwhAgMBAAGgMDAuBgkqhkiG9w0BCQ4xITAf
MB0GA1UdDgQWBBRFzbnwQXp+h4xE233eH3D+KfozxTANBgkqhkiG9w0BAQsFAAOC
AQEAQti4SPU8KfSoeLbLUic7UciVmwO0TZtiG+Y6fCTdRm7SYovg2zXH576ERClf
JQCzUuMH1Fi6k5adhMUxopJrVirZWOANoffe3yY/PUuFPMv5rvjmG7JqRNloNFYC
4Jah/XeITkw3BcwYxvY3lOZeXgBoRI+PwaD4JNHYf9ruc8cxY59lbWGCQOdbWYuk
ex/Y/rdmiv1cZpVAYY3VkdUnISXf4eePz4+hUdyuNGYt8Rh/dCj0D/1Xdo9jguUw
IWihuXNfH5hBzBp2hX49tCa7j8stOQW6+AS+ysUBRseFNnsu9j95PD+ue9GU5ZLR
mQzlkeZcfimH796e6XF81oCDkA==
-----END NEW CERTIFICATE REQUEST-----
18 changes: 18 additions & 0 deletions testing/environments/docker/kafka/certs/broker-cert-signed
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-----BEGIN CERTIFICATE-----
MIIC8zCCAdsCCQC1GCJdAf28SzANBgkqhkiG9w0BAQUFADANMQswCQYDVQQGEwJV
UzAeFw0yMTEwMjEyMDM0MTBaFw0zNTA2MzAyMDM0MTBaMGoxEDAOBgNVBAYTB1Vu
a25vd24xEDAOBgNVBAgTB1Vua25vd24xEDAOBgNVBAcTB1Vua25vd24xEDAOBgNV
BAoTB1Vua25vd24xEDAOBgNVBAsTB1Vua25vd24xDjAMBgNVBAMTBWthZmthMIIB
IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAh/FWDfRTB145y1ME9ACQyjNL
v41xNJ4NVH1j1UGlSPhJ8vr5n9cw5nfwKKl2mCs1Tb8kVwstQWRmi1QN4KNVO/rV
Y7Z4ri05r3hNOz7l+MPf7pGTypVMXygo2Apod/AXmIiQNop2sUnAztocR3wdqbpb
DPpffqtDTFxXukUlACktKQHmPCi8/cloPYX6uiT7j09eB2ZssS5/tUsEW2Wfk0I5
QB32Rabv4oIOnIn/3s99M+RqnIvtolT5cJfaXhLGl5+cG7j+idlKOnTKBohcsRXR
gllVj/Vg53L32CicELNPUHWxS8QLLQWEpjk7M1D/odEqIWsaXQuADnCcbPjMIQID
AQABMA0GCSqGSIb3DQEBBQUAA4IBAQCMGbXC2YdC9+jJjUvuEJIQGwpapJ5Dejng
cnvE//+x8A4W9vC7OJUHcML2GGQIrgvYWlmsCEWX1lJtcVIbqkTqq9Sq99htdMfM
ay4fJB/ey005bhcbEP+19342HkmoOUkEg7qGWZhhL05y0m1vxKvKSUX3p+4TyW1Y
AheRbb9j41Ld3E8+COGwqIWpMNfsGjLqWjUIajemFH91Eo2FFvshM/5ly12GZEil
ivmUqSzV7o6ri0V7DZ5NPOSXEbiMQj5FfmImqXbo7JtBqM/H9S2yAPXZBfAloVNv
XvjG0dY8cnYwGL5MSRiZEuJdimptWnMzFXbD8zyRxSIUMpbDcHNf
-----END CERTIFICATE-----
Binary file not shown.
16 changes: 16 additions & 0 deletions testing/environments/docker/kafka/certs/ca-cert
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-----BEGIN CERTIFICATE-----
MIICljCCAX4CCQD+dvzut8IfyTANBgkqhkiG9w0BAQsFADANMQswCQYDVQQGEwJV
UzAeFw0yMTEwMjEyMDMyMDJaFw0zNTA2MzAyMDMyMDJaMA0xCzAJBgNVBAYTAlVT
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAlR6sKchCTM0qdrjAdWqR
BmSLfHHe+LB43B0T/+3Y1fporzg2eZC1dPCf7TXLzL92NOlJ+JQCsfb160gKTGfb
7+z2jm+vumbYlKVffsD7MjNdW8SDu9hfMa2DyTY742n3R/X8pc4VK0fdlTQx22Zp
aIA+XwD6hHxZQS9PHVNwTFUoPkP4jevcFANwjLUBgy3dPK0iWdVILnaAwEBg82z3
zWRJ7I4Eg6KS+GtwZPovhiHqcJpz7QPrmggCglL8q0YZQrVrYNucRV1sjPAhEfTA
Sh7Z0UVYdx5+jJq7MyslBqzEM0OrmKrldrTHOAo9+cTc1GiKGRBhVei2R2fP2XAC
HQIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQA37KiWMR6SZRmlLKV7hP9/9H1cL1FJ
OPa5MKcwh8Q38IRALCF5SlxxOByP8O01ZInkWjR3jJbMc/k4RwxQXfzYDvB4jleU
MyX63qekIsxFdUn+fzt+wA0xb7tOPGVUbM6QI++YH28p8yzSdY/bXrjRweQuVRC0
B+0zMijI1uU6GRME9+e1OLsN5rDzCFEJUra/+UDc23BTOjC6Az00UKpOGv6oAqg8
iuCOeVCRVPtd7mGJK1dGW3WXV3pbsu4EvfXve9qFFV/7d811JNBjnhF5lFN2JGVs
Ka9JebJ8EKWff6Ns14FJ2cOG3tx7KuWcnfTdma/mH4PeGoU1Og5Ln/ea
-----END CERTIFICATE-----
1 change: 1 addition & 0 deletions testing/environments/docker/kafka/certs/ca-cert.srl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
B518225D01FDBC4B
30 changes: 30 additions & 0 deletions testing/environments/docker/kafka/certs/ca-key
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFHzBJBgkqhkiG9w0BBQ0wPDAbBgkqhkiG9w0BBQwwDgQIPiUp46K/yl8CAggA
MB0GCWCGSAFlAwQBKgQQ3vI4jI41do9rQtAyc+JwfwSCBNDDDv3uSHZ/12ACMLyu
vhrz3en62CT4ooeZKjpep4H/s+2pfqfg5bDeQUguNUo2zsy0EKeK2rIBVUG6KWIa
DWvnl0JaM/TrtGg1pMZMIFPowaf1mNcVcvN5IWEgLgoT+SY8lLtmVqIdU1d2F6nV
t+7JEpktXA5ThS0FbBpW0XI+kG9W7Ln9YPjGLbjcPayJQX1yzWJYEUfmm8lJl7cT
h5V94sB2KP9pwAP/SqElt2QK1BhStUVv3ezp6TT82PETi3No0Uh+oWaxltjPCr4z
5MN/tTDJQGc3llDIrZT+umZcgB6DBsc+nXjrlAHWPeuhIcjNJGs3V0xazQs60M6n
ldhcJH453Muwtp40VDkT21plVPwUrwQX/6gIWIHnyvK44sRG4NvmG+4NBA8V4TdK
AKhkhYTYS+sAUDsAFo408OXvpdGy7G2/cZn+r2frLHLxUU2peqRFP7YqLVs2sdez
sFyt6ZMSAh8UZDYK9kpyQMoeYj7Az14kMKIlE0JADsd3Mn8S/QJrrKWQzQhQVz6O
0rpaGnIM3cICgTK7gTlK+lDIbqAmCYnFLQsU9rHIpzVMkx2iYEId+YNbxodHpFPa
MCz6HU8qI9Tv9JIOfJKdE7tvlSnR89usOU/z+NSGqKm1dhYjG1BNI7wk8/mgMxOg
9BAujodmGvFpMPba84+QT/AtTy9YMMi4Z0H7BKHGD7HwSOTx7kP9hMz/sVVnKxfO
8C9gE91D4enrpQXu7J5JU07LCWSNLiZEegbdKvjBz5Cvfj5LPhazTLYuuU0KNIP9
MjrgodrSp1LgESAA7z8qKUyhX2Z6uO0q1Q5OUFgGNEWXSYLplhWrvftPqdV0YAFI
4y794sojVBBnHYo+Lm5gugm4cg6bLk/YY3ScQqPYEUwO1LZSMUoB9ixLHUYY48ND
xbevM9V8vLgb6Q46zTCYPxwYfxNlcWxeQjwbVEaha5n2Sgu0dmrG/+LjrEwYtHY7
zPdTbl28OyvXDqvilXcDQS8ZQBwqkZ00pg9fokElztgVIMp4cbtBTCiqipfNBGJg
ALEu/lFNlGjvv4iwOdx/yhVjFt2Ri3ViTEoTJ3wAh3o4wh/o8wluNb3bMgfKzw61
/WptUvLnqKIGQ0xZtunxG9WHIpc8oTRZMMUgLnoVzJvdU9cONT5GER9WuQbwXmEE
ytIx9tVq4cb3CoJhynrL9cjGp716nBkx534gyu5N21elb8npk1XAHd6AHUViun0J
TnVHPwSSLN7naaiMKS+8KaknAdjvKCIUytLSRpRb4rkoqD/7MlYlMTAPF5IX6/Xj
fVfR8HKWtkvqhAM4lQ57zwGlpXifGM5Vi/Dq8JYcTOIHIzggbhfi+WVwWJ+SJVp3
FQBSvyJ0XFV8piuP6J1PB6zXLioRiUMDbrl0Hmwo4spLswRsZ6D/6QuNUeNN2Lh1
ZqtkAHWnIll1nviSEWPxiu0lA9ZwfPP1t+H0UkVi8JBUCrTh0gyr2e/CGZAd1GoP
/LnvaRntmqytavI65NlPPlvF9S7enjeEkxvtqhAIuU9nTMORnmpXX+xStfm/AtQp
2UNklwWW6bwPhMF9w+FnuJoK7mrQ5DphsZNcTly1RQ0uQkT6yrzWK5MNmLRiNOez
OmM968GQKexUL9r0BmFi7T00rQ==
-----END ENCRYPTED PRIVATE KEY-----
Binary file not shown.
26 changes: 22 additions & 4 deletions testing/environments/docker/kafka/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,31 @@ echo "Starting ZooKeeper"
${KAFKA_HOME}/bin/zookeeper-server-start.sh ${KAFKA_HOME}/config/zookeeper.properties &
wait_for_port 2181

# create a user beats with password KafkaTest, for use in client SASL authentication
/kafka/bin/kafka-configs.sh \
--zookeeper localhost:2181 \
--alter --add-config 'SCRAM-SHA-512=[password=KafkaTest]' \
--entity-type users \
--entity-name beats

echo "Starting Kafka broker"
mkdir -p ${KAFKA_LOGS_DIR}
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties \
--override delete.topic.enable=true --override advertised.host.name=${KAFKA_ADVERTISED_HOST} \
--override listeners=PLAINTEXT://0.0.0.0:9092 \
--override logs.dir=${KAFKA_LOGS_DIR} --override log.flush.interval.ms=200 \
--override num.partitions=3 &
--override delete.topic.enable=true \
--override advertised.host.name=${KAFKA_ADVERTISED_HOST} \
--override listeners=PLAINTEXT://0.0.0.0:9092,SASL_SSL://0.0.0.0:9093 \
--override advertised.listeners=PLAINTEXT://${KAFKA_ADVERTISED_HOST}:9092,SASL_SSL://${KAFKA_ADVERTISED_HOST}:9093 \
--override inter.broker.listener.name=PLAINTEXT \
--override sasl.enabled.mechanisms=SCRAM-SHA-512 \
--override listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required;" \
--override logs.dir=${KAFKA_LOGS_DIR} \
--override log4j.logger.kafka=DEBUG,kafkaAppender \
--override log.flush.interval.ms=200 \
--override num.partitions=3 \
--override ssl.keystore.location=/broker.keystore.jks \
--override ssl.keystore.password=KafkaTest \
--override ssl.truststore.location=/broker.truststore.jks \
--override ssl.truststore.password=KafkaTest &

wait_for_port 9092

Expand Down