Skip to content

Commit

Permalink
Add SASL/SCRAM support to the Kafka test container (#28971)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored Nov 16, 2021
1 parent 08642d0 commit e69573a
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 20 deletions.
14 changes: 0 additions & 14 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
package kafka

import (
"errors"
"time"

"github.com/Shopify/sarama"

"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -32,20 +29,9 @@ import (
)

const (
defaultWaitRetry = 1 * time.Second

// NOTE: maxWaitRetry has no effect on mode, as logstash client currently does
// not return ErrTempBulkFailure
defaultMaxWaitRetry = 60 * time.Second

logSelector = "kafka"
)

var (
errNoTopicSet = errors.New("No topic configured")
errNoHosts = errors.New("No hosts configured")
)

func init() {
sarama.Logger = kafkaLogger{log: logp.NewLogger(logSelector)}

Expand Down
43 changes: 41 additions & 2 deletions libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ import (
)

const (
kafkaDefaultHost = "localhost"
kafkaDefaultPort = "9092"
kafkaDefaultHost = "kafka"
kafkaDefaultPort = "9092"
kafkaDefaultSASLPort = "9093"
)

type eventInfo struct {
Expand Down Expand Up @@ -183,6 +184,37 @@ func TestKafkaPublish(t *testing.T) {
"type": "log",
}),
},
{
"publish single event to test topic",
map[string]interface{}{},
testTopic,
single(common.MapStr{
"host": "test-host",
"message": id,
}),
},
{
// Initially I tried rerunning all tests over SASL/SCRAM, but
// that added a full 30sec to the test. Instead most tests run
// in plaintext, and individual tests can switch to SCRAM
// by inserting the config in this example:
"publish single event to test topic over SASL/SCRAM",
map[string]interface{}{
"hosts": []string{getTestSASLKafkaHost()},
"protocol": "https",
"sasl.mechanism": "SCRAM-SHA-512",
"ssl.certificate_authorities": []string{
"../../../testing/environments/docker/kafka/certs/ca-cert",
},
"username": "beats",
"password": "KafkaTest",
},
testTopic,
single(common.MapStr{
"host": "test-host",
"message": id,
}),
},
}

defaultConfig := map[string]interface{}{
Expand Down Expand Up @@ -322,6 +354,13 @@ func getTestKafkaHost() string {
)
}

func getTestSASLKafkaHost() string {
return fmt.Sprintf("%v:%v",
getenv("KAFKA_HOST", kafkaDefaultHost),
getenv("KAFKA_SASL_PORT", kafkaDefaultSASLPort),
)
}

func makeConfig(t *testing.T, in map[string]interface{}) *common.Config {
cfg, err := common.NewConfigFrom(in)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions testing/environments/docker/kafka/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ RUN mkdir -p ${KAFKA_LOGS_DIR} && mkdir -p ${KAFKA_HOME} && \

ADD run.sh /run.sh
ADD healthcheck.sh /healthcheck.sh
ADD certs/broker.keystore.jks /broker.keystore.jks
ADD certs/client.truststore.jks /broker.truststore.jks

EXPOSE 9092
EXPOSE 9093
EXPOSE 2181

# healthcheck.sh tries to create and delete an empty kafka topic (the topic
Expand Down
35 changes: 35 additions & 0 deletions testing/environments/docker/kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Kafka test container

This Docker container provides an environment for testing with Kafka. It exposes two ports to the host system, `9092` for `PLAINTEXT` and `9093` for `SASL/SSL` with username `beats` and password `KafkaTest`.

## Certificates

The test environment uses a self-signed SSL certificate in the broker. To connect, clients will need to set `certs/client.truststore.jks` as their trust store.

The files in the `certs` directory were generated with these commands:

```sh
# create the broker's key
keytool -keystore broker.keystore.jks -storepass KafkaTest -alias broker -validity 5000 -keyalg RSA -genkey

What is your first and last name?
[Unknown]: kafka
...

# create a new certificate authority
openssl req -new -x509 -keyout ca-key -out ca-cert -days 5000

# add the CA to the kafka client's trust store
keytool -keystore client.truststore.jks -storepass KafkaTest -alias CARoot -keyalg RSA -import -file ca-cert

# export the server certificate
keytool -keystore broker.keystore.jks -storepass KafkaTest -alias broker -certreq -file broker-cert

# sign it with the CA
openssl x509 -req -CA ca-cert -CAkey ca-key -in broker-cert -out broker-cert-signed -days 5000 -CAcreateserial -passin pass:KafkaTest

# import CA and signed cert back into server keystore
keytool -keystore broker.keystore.jks -storepass KafkaTest -alias CARoot -import -file ca-cert
keytool -keystore broker.keystore.jks -storepass KafkaTest -alias broker -import -file broker-cert-signed

```
18 changes: 18 additions & 0 deletions testing/environments/docker/kafka/certs/broker-cert
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-----BEGIN NEW CERTIFICATE REQUEST-----
MIIC3zCCAccCAQAwajEQMA4GA1UEBhMHVW5rbm93bjEQMA4GA1UECBMHVW5rbm93
bjEQMA4GA1UEBxMHVW5rbm93bjEQMA4GA1UEChMHVW5rbm93bjEQMA4GA1UECxMH
VW5rbm93bjEOMAwGA1UEAxMFa2Fma2EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw
ggEKAoIBAQCH8VYN9FMHXjnLUwT0AJDKM0u/jXE0ng1UfWPVQaVI+Eny+vmf1zDm
d/AoqXaYKzVNvyRXCy1BZGaLVA3go1U7+tVjtniuLTmveE07PuX4w9/ukZPKlUxf
KCjYCmh38BeYiJA2inaxScDO2hxHfB2pulsM+l9+q0NMXFe6RSUAKS0pAeY8KLz9
yWg9hfq6JPuPT14HZmyxLn+1SwRbZZ+TQjlAHfZFpu/igg6cif/ez30z5Gqci+2i
VPlwl9peEsaXn5wbuP6J2Uo6dMoGiFyxFdGCWVWP9WDncvfYKJwQs09QdbFLxAst
BYSmOTszUP+h0SohaxpdC4AOcJxs+MwhAgMBAAGgMDAuBgkqhkiG9w0BCQ4xITAf
MB0GA1UdDgQWBBRFzbnwQXp+h4xE233eH3D+KfozxTANBgkqhkiG9w0BAQsFAAOC
AQEAQti4SPU8KfSoeLbLUic7UciVmwO0TZtiG+Y6fCTdRm7SYovg2zXH576ERClf
JQCzUuMH1Fi6k5adhMUxopJrVirZWOANoffe3yY/PUuFPMv5rvjmG7JqRNloNFYC
4Jah/XeITkw3BcwYxvY3lOZeXgBoRI+PwaD4JNHYf9ruc8cxY59lbWGCQOdbWYuk
ex/Y/rdmiv1cZpVAYY3VkdUnISXf4eePz4+hUdyuNGYt8Rh/dCj0D/1Xdo9jguUw
IWihuXNfH5hBzBp2hX49tCa7j8stOQW6+AS+ysUBRseFNnsu9j95PD+ue9GU5ZLR
mQzlkeZcfimH796e6XF81oCDkA==
-----END NEW CERTIFICATE REQUEST-----
18 changes: 18 additions & 0 deletions testing/environments/docker/kafka/certs/broker-cert-signed
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-----BEGIN CERTIFICATE-----
MIIC8zCCAdsCCQC1GCJdAf28SzANBgkqhkiG9w0BAQUFADANMQswCQYDVQQGEwJV
UzAeFw0yMTEwMjEyMDM0MTBaFw0zNTA2MzAyMDM0MTBaMGoxEDAOBgNVBAYTB1Vu
a25vd24xEDAOBgNVBAgTB1Vua25vd24xEDAOBgNVBAcTB1Vua25vd24xEDAOBgNV
BAoTB1Vua25vd24xEDAOBgNVBAsTB1Vua25vd24xDjAMBgNVBAMTBWthZmthMIIB
IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAh/FWDfRTB145y1ME9ACQyjNL
v41xNJ4NVH1j1UGlSPhJ8vr5n9cw5nfwKKl2mCs1Tb8kVwstQWRmi1QN4KNVO/rV
Y7Z4ri05r3hNOz7l+MPf7pGTypVMXygo2Apod/AXmIiQNop2sUnAztocR3wdqbpb
DPpffqtDTFxXukUlACktKQHmPCi8/cloPYX6uiT7j09eB2ZssS5/tUsEW2Wfk0I5
QB32Rabv4oIOnIn/3s99M+RqnIvtolT5cJfaXhLGl5+cG7j+idlKOnTKBohcsRXR
gllVj/Vg53L32CicELNPUHWxS8QLLQWEpjk7M1D/odEqIWsaXQuADnCcbPjMIQID
AQABMA0GCSqGSIb3DQEBBQUAA4IBAQCMGbXC2YdC9+jJjUvuEJIQGwpapJ5Dejng
cnvE//+x8A4W9vC7OJUHcML2GGQIrgvYWlmsCEWX1lJtcVIbqkTqq9Sq99htdMfM
ay4fJB/ey005bhcbEP+19342HkmoOUkEg7qGWZhhL05y0m1vxKvKSUX3p+4TyW1Y
AheRbb9j41Ld3E8+COGwqIWpMNfsGjLqWjUIajemFH91Eo2FFvshM/5ly12GZEil
ivmUqSzV7o6ri0V7DZ5NPOSXEbiMQj5FfmImqXbo7JtBqM/H9S2yAPXZBfAloVNv
XvjG0dY8cnYwGL5MSRiZEuJdimptWnMzFXbD8zyRxSIUMpbDcHNf
-----END CERTIFICATE-----
Binary file not shown.
16 changes: 16 additions & 0 deletions testing/environments/docker/kafka/certs/ca-cert
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-----BEGIN CERTIFICATE-----
MIICljCCAX4CCQD+dvzut8IfyTANBgkqhkiG9w0BAQsFADANMQswCQYDVQQGEwJV
UzAeFw0yMTEwMjEyMDMyMDJaFw0zNTA2MzAyMDMyMDJaMA0xCzAJBgNVBAYTAlVT
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAlR6sKchCTM0qdrjAdWqR
BmSLfHHe+LB43B0T/+3Y1fporzg2eZC1dPCf7TXLzL92NOlJ+JQCsfb160gKTGfb
7+z2jm+vumbYlKVffsD7MjNdW8SDu9hfMa2DyTY742n3R/X8pc4VK0fdlTQx22Zp
aIA+XwD6hHxZQS9PHVNwTFUoPkP4jevcFANwjLUBgy3dPK0iWdVILnaAwEBg82z3
zWRJ7I4Eg6KS+GtwZPovhiHqcJpz7QPrmggCglL8q0YZQrVrYNucRV1sjPAhEfTA
Sh7Z0UVYdx5+jJq7MyslBqzEM0OrmKrldrTHOAo9+cTc1GiKGRBhVei2R2fP2XAC
HQIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQA37KiWMR6SZRmlLKV7hP9/9H1cL1FJ
OPa5MKcwh8Q38IRALCF5SlxxOByP8O01ZInkWjR3jJbMc/k4RwxQXfzYDvB4jleU
MyX63qekIsxFdUn+fzt+wA0xb7tOPGVUbM6QI++YH28p8yzSdY/bXrjRweQuVRC0
B+0zMijI1uU6GRME9+e1OLsN5rDzCFEJUra/+UDc23BTOjC6Az00UKpOGv6oAqg8
iuCOeVCRVPtd7mGJK1dGW3WXV3pbsu4EvfXve9qFFV/7d811JNBjnhF5lFN2JGVs
Ka9JebJ8EKWff6Ns14FJ2cOG3tx7KuWcnfTdma/mH4PeGoU1Og5Ln/ea
-----END CERTIFICATE-----
1 change: 1 addition & 0 deletions testing/environments/docker/kafka/certs/ca-cert.srl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
B518225D01FDBC4B
30 changes: 30 additions & 0 deletions testing/environments/docker/kafka/certs/ca-key
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFHzBJBgkqhkiG9w0BBQ0wPDAbBgkqhkiG9w0BBQwwDgQIPiUp46K/yl8CAggA
MB0GCWCGSAFlAwQBKgQQ3vI4jI41do9rQtAyc+JwfwSCBNDDDv3uSHZ/12ACMLyu
vhrz3en62CT4ooeZKjpep4H/s+2pfqfg5bDeQUguNUo2zsy0EKeK2rIBVUG6KWIa
DWvnl0JaM/TrtGg1pMZMIFPowaf1mNcVcvN5IWEgLgoT+SY8lLtmVqIdU1d2F6nV
t+7JEpktXA5ThS0FbBpW0XI+kG9W7Ln9YPjGLbjcPayJQX1yzWJYEUfmm8lJl7cT
h5V94sB2KP9pwAP/SqElt2QK1BhStUVv3ezp6TT82PETi3No0Uh+oWaxltjPCr4z
5MN/tTDJQGc3llDIrZT+umZcgB6DBsc+nXjrlAHWPeuhIcjNJGs3V0xazQs60M6n
ldhcJH453Muwtp40VDkT21plVPwUrwQX/6gIWIHnyvK44sRG4NvmG+4NBA8V4TdK
AKhkhYTYS+sAUDsAFo408OXvpdGy7G2/cZn+r2frLHLxUU2peqRFP7YqLVs2sdez
sFyt6ZMSAh8UZDYK9kpyQMoeYj7Az14kMKIlE0JADsd3Mn8S/QJrrKWQzQhQVz6O
0rpaGnIM3cICgTK7gTlK+lDIbqAmCYnFLQsU9rHIpzVMkx2iYEId+YNbxodHpFPa
MCz6HU8qI9Tv9JIOfJKdE7tvlSnR89usOU/z+NSGqKm1dhYjG1BNI7wk8/mgMxOg
9BAujodmGvFpMPba84+QT/AtTy9YMMi4Z0H7BKHGD7HwSOTx7kP9hMz/sVVnKxfO
8C9gE91D4enrpQXu7J5JU07LCWSNLiZEegbdKvjBz5Cvfj5LPhazTLYuuU0KNIP9
MjrgodrSp1LgESAA7z8qKUyhX2Z6uO0q1Q5OUFgGNEWXSYLplhWrvftPqdV0YAFI
4y794sojVBBnHYo+Lm5gugm4cg6bLk/YY3ScQqPYEUwO1LZSMUoB9ixLHUYY48ND
xbevM9V8vLgb6Q46zTCYPxwYfxNlcWxeQjwbVEaha5n2Sgu0dmrG/+LjrEwYtHY7
zPdTbl28OyvXDqvilXcDQS8ZQBwqkZ00pg9fokElztgVIMp4cbtBTCiqipfNBGJg
ALEu/lFNlGjvv4iwOdx/yhVjFt2Ri3ViTEoTJ3wAh3o4wh/o8wluNb3bMgfKzw61
/WptUvLnqKIGQ0xZtunxG9WHIpc8oTRZMMUgLnoVzJvdU9cONT5GER9WuQbwXmEE
ytIx9tVq4cb3CoJhynrL9cjGp716nBkx534gyu5N21elb8npk1XAHd6AHUViun0J
TnVHPwSSLN7naaiMKS+8KaknAdjvKCIUytLSRpRb4rkoqD/7MlYlMTAPF5IX6/Xj
fVfR8HKWtkvqhAM4lQ57zwGlpXifGM5Vi/Dq8JYcTOIHIzggbhfi+WVwWJ+SJVp3
FQBSvyJ0XFV8piuP6J1PB6zXLioRiUMDbrl0Hmwo4spLswRsZ6D/6QuNUeNN2Lh1
ZqtkAHWnIll1nviSEWPxiu0lA9ZwfPP1t+H0UkVi8JBUCrTh0gyr2e/CGZAd1GoP
/LnvaRntmqytavI65NlPPlvF9S7enjeEkxvtqhAIuU9nTMORnmpXX+xStfm/AtQp
2UNklwWW6bwPhMF9w+FnuJoK7mrQ5DphsZNcTly1RQ0uQkT6yrzWK5MNmLRiNOez
OmM968GQKexUL9r0BmFi7T00rQ==
-----END ENCRYPTED PRIVATE KEY-----
Binary file not shown.
26 changes: 22 additions & 4 deletions testing/environments/docker/kafka/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,31 @@ echo "Starting ZooKeeper"
${KAFKA_HOME}/bin/zookeeper-server-start.sh ${KAFKA_HOME}/config/zookeeper.properties &
wait_for_port 2181

# create a user beats with password KafkaTest, for use in client SASL authentication
/kafka/bin/kafka-configs.sh \
--zookeeper localhost:2181 \
--alter --add-config 'SCRAM-SHA-512=[password=KafkaTest]' \
--entity-type users \
--entity-name beats

echo "Starting Kafka broker"
mkdir -p ${KAFKA_LOGS_DIR}
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties \
--override delete.topic.enable=true --override advertised.host.name=${KAFKA_ADVERTISED_HOST} \
--override listeners=PLAINTEXT://0.0.0.0:9092 \
--override logs.dir=${KAFKA_LOGS_DIR} --override log.flush.interval.ms=200 \
--override num.partitions=3 &
--override delete.topic.enable=true \
--override advertised.host.name=${KAFKA_ADVERTISED_HOST} \
--override listeners=PLAINTEXT://0.0.0.0:9092,SASL_SSL://0.0.0.0:9093 \
--override advertised.listeners=PLAINTEXT://${KAFKA_ADVERTISED_HOST}:9092,SASL_SSL://${KAFKA_ADVERTISED_HOST}:9093 \
--override inter.broker.listener.name=PLAINTEXT \
--override sasl.enabled.mechanisms=SCRAM-SHA-512 \
--override listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required;" \
--override logs.dir=${KAFKA_LOGS_DIR} \
--override log4j.logger.kafka=DEBUG,kafkaAppender \
--override log.flush.interval.ms=200 \
--override num.partitions=3 \
--override ssl.keystore.location=/broker.keystore.jks \
--override ssl.keystore.password=KafkaTest \
--override ssl.truststore.location=/broker.truststore.jks \
--override ssl.truststore.password=KafkaTest &

wait_for_port 9092

Expand Down

0 comments on commit e69573a

Please sign in to comment.