From e69573a2d542faf0ae4dcb45b0e9bc5c8f362422 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 16 Nov 2021 18:56:23 -0500 Subject: [PATCH] Add SASL/SCRAM support to the Kafka test container (#28971) --- libbeat/outputs/kafka/kafka.go | 14 ------ .../outputs/kafka/kafka_integration_test.go | 43 +++++++++++++++++- testing/environments/docker/kafka/Dockerfile | 3 ++ testing/environments/docker/kafka/README.md | 35 ++++++++++++++ .../docker/kafka/certs/broker-cert | 18 ++++++++ .../docker/kafka/certs/broker-cert-signed | 18 ++++++++ .../docker/kafka/certs/broker.keystore.jks | Bin 0 -> 3959 bytes .../environments/docker/kafka/certs/ca-cert | 16 +++++++ .../docker/kafka/certs/ca-cert.srl | 1 + .../environments/docker/kafka/certs/ca-key | 30 ++++++++++++ .../docker/kafka/certs/client.truststore.jks | Bin 0 -> 962 bytes testing/environments/docker/kafka/run.sh | 26 +++++++++-- 12 files changed, 184 insertions(+), 20 deletions(-) create mode 100644 testing/environments/docker/kafka/README.md create mode 100644 testing/environments/docker/kafka/certs/broker-cert create mode 100644 testing/environments/docker/kafka/certs/broker-cert-signed create mode 100644 testing/environments/docker/kafka/certs/broker.keystore.jks create mode 100644 testing/environments/docker/kafka/certs/ca-cert create mode 100644 testing/environments/docker/kafka/certs/ca-cert.srl create mode 100644 testing/environments/docker/kafka/certs/ca-key create mode 100644 testing/environments/docker/kafka/certs/client.truststore.jks diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 9be3970b1c4..8f06398eb0c 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -18,9 +18,6 @@ package kafka import ( - "errors" - "time" - "github.com/Shopify/sarama" "github.com/elastic/beats/v7/libbeat/beat" @@ -32,20 +29,9 @@ import ( ) const ( - defaultWaitRetry = 1 * time.Second - - // NOTE: maxWaitRetry has no effect on mode, as logstash client currently does - // not return ErrTempBulkFailure - defaultMaxWaitRetry = 60 * time.Second - logSelector = "kafka" ) -var ( - errNoTopicSet = errors.New("No topic configured") - errNoHosts = errors.New("No hosts configured") -) - func init() { sarama.Logger = kafkaLogger{log: logp.NewLogger(logSelector)} diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 0cc751d99b9..2be42f639e7 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -45,8 +45,9 @@ import ( ) const ( - kafkaDefaultHost = "localhost" - kafkaDefaultPort = "9092" + kafkaDefaultHost = "kafka" + kafkaDefaultPort = "9092" + kafkaDefaultSASLPort = "9093" ) type eventInfo struct { @@ -183,6 +184,37 @@ func TestKafkaPublish(t *testing.T) { "type": "log", }), }, + { + "publish single event to test topic", + map[string]interface{}{}, + testTopic, + single(common.MapStr{ + "host": "test-host", + "message": id, + }), + }, + { + // Initially I tried rerunning all tests over SASL/SCRAM, but + // that added a full 30sec to the test. Instead most tests run + // in plaintext, and individual tests can switch to SCRAM + // by inserting the config in this example: + "publish single event to test topic over SASL/SCRAM", + map[string]interface{}{ + "hosts": []string{getTestSASLKafkaHost()}, + "protocol": "https", + "sasl.mechanism": "SCRAM-SHA-512", + "ssl.certificate_authorities": []string{ + "../../../testing/environments/docker/kafka/certs/ca-cert", + }, + "username": "beats", + "password": "KafkaTest", + }, + testTopic, + single(common.MapStr{ + "host": "test-host", + "message": id, + }), + }, } defaultConfig := map[string]interface{}{ @@ -322,6 +354,13 @@ func getTestKafkaHost() string { ) } +func getTestSASLKafkaHost() string { + return fmt.Sprintf("%v:%v", + getenv("KAFKA_HOST", kafkaDefaultHost), + getenv("KAFKA_SASL_PORT", kafkaDefaultSASLPort), + ) +} + func makeConfig(t *testing.T, in map[string]interface{}) *common.Config { cfg, err := common.NewConfigFrom(in) if err != nil { diff --git a/testing/environments/docker/kafka/Dockerfile b/testing/environments/docker/kafka/Dockerfile index ff38db49e39..1a5e58836bc 100644 --- a/testing/environments/docker/kafka/Dockerfile +++ b/testing/environments/docker/kafka/Dockerfile @@ -18,8 +18,11 @@ RUN mkdir -p ${KAFKA_LOGS_DIR} && mkdir -p ${KAFKA_HOME} && \ ADD run.sh /run.sh ADD healthcheck.sh /healthcheck.sh +ADD certs/broker.keystore.jks /broker.keystore.jks +ADD certs/client.truststore.jks /broker.truststore.jks EXPOSE 9092 +EXPOSE 9093 EXPOSE 2181 # healthcheck.sh tries to create and delete an empty kafka topic (the topic diff --git a/testing/environments/docker/kafka/README.md b/testing/environments/docker/kafka/README.md new file mode 100644 index 00000000000..6a7306e2423 --- /dev/null +++ b/testing/environments/docker/kafka/README.md @@ -0,0 +1,35 @@ +# Kafka test container + +This Docker container provides an environment for testing with Kafka. It exposes two ports to the host system, `9092` for `PLAINTEXT` and `9093` for `SASL/SSL` with username `beats` and password `KafkaTest`. + +## Certificates + +The test environment uses a self-signed SSL certificate in the broker. To connect, clients will need to set `certs/client.truststore.jks` as their trust store. + +The files in the `certs` directory were generated with these commands: + +```sh +# create the broker's key +keytool -keystore broker.keystore.jks -storepass KafkaTest -alias broker -validity 5000 -keyalg RSA -genkey + +What is your first and last name? + [Unknown]: kafka + ... + +# create a new certificate authority +openssl req -new -x509 -keyout ca-key -out ca-cert -days 5000 + +# add the CA to the kafka client's trust store +keytool -keystore client.truststore.jks -storepass KafkaTest -alias CARoot -keyalg RSA -import -file ca-cert + +# export the server certificate +keytool -keystore broker.keystore.jks -storepass KafkaTest -alias broker -certreq -file broker-cert + +# sign it with the CA +openssl x509 -req -CA ca-cert -CAkey ca-key -in broker-cert -out broker-cert-signed -days 5000 -CAcreateserial -passin pass:KafkaTest + +# import CA and signed cert back into server keystore +keytool -keystore broker.keystore.jks -storepass KafkaTest -alias CARoot -import -file ca-cert +keytool -keystore broker.keystore.jks -storepass KafkaTest -alias broker -import -file broker-cert-signed + +``` diff --git a/testing/environments/docker/kafka/certs/broker-cert b/testing/environments/docker/kafka/certs/broker-cert new file mode 100644 index 00000000000..3a7d9e2498a --- /dev/null +++ b/testing/environments/docker/kafka/certs/broker-cert @@ -0,0 +1,18 @@ +-----BEGIN NEW CERTIFICATE REQUEST----- +MIIC3zCCAccCAQAwajEQMA4GA1UEBhMHVW5rbm93bjEQMA4GA1UECBMHVW5rbm93 +bjEQMA4GA1UEBxMHVW5rbm93bjEQMA4GA1UEChMHVW5rbm93bjEQMA4GA1UECxMH +VW5rbm93bjEOMAwGA1UEAxMFa2Fma2EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw +ggEKAoIBAQCH8VYN9FMHXjnLUwT0AJDKM0u/jXE0ng1UfWPVQaVI+Eny+vmf1zDm +d/AoqXaYKzVNvyRXCy1BZGaLVA3go1U7+tVjtniuLTmveE07PuX4w9/ukZPKlUxf +KCjYCmh38BeYiJA2inaxScDO2hxHfB2pulsM+l9+q0NMXFe6RSUAKS0pAeY8KLz9 +yWg9hfq6JPuPT14HZmyxLn+1SwRbZZ+TQjlAHfZFpu/igg6cif/ez30z5Gqci+2i +VPlwl9peEsaXn5wbuP6J2Uo6dMoGiFyxFdGCWVWP9WDncvfYKJwQs09QdbFLxAst +BYSmOTszUP+h0SohaxpdC4AOcJxs+MwhAgMBAAGgMDAuBgkqhkiG9w0BCQ4xITAf +MB0GA1UdDgQWBBRFzbnwQXp+h4xE233eH3D+KfozxTANBgkqhkiG9w0BAQsFAAOC +AQEAQti4SPU8KfSoeLbLUic7UciVmwO0TZtiG+Y6fCTdRm7SYovg2zXH576ERClf +JQCzUuMH1Fi6k5adhMUxopJrVirZWOANoffe3yY/PUuFPMv5rvjmG7JqRNloNFYC +4Jah/XeITkw3BcwYxvY3lOZeXgBoRI+PwaD4JNHYf9ruc8cxY59lbWGCQOdbWYuk +ex/Y/rdmiv1cZpVAYY3VkdUnISXf4eePz4+hUdyuNGYt8Rh/dCj0D/1Xdo9jguUw +IWihuXNfH5hBzBp2hX49tCa7j8stOQW6+AS+ysUBRseFNnsu9j95PD+ue9GU5ZLR +mQzlkeZcfimH796e6XF81oCDkA== +-----END NEW CERTIFICATE REQUEST----- diff --git a/testing/environments/docker/kafka/certs/broker-cert-signed b/testing/environments/docker/kafka/certs/broker-cert-signed new file mode 100644 index 00000000000..b023f3c146d --- /dev/null +++ b/testing/environments/docker/kafka/certs/broker-cert-signed @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC8zCCAdsCCQC1GCJdAf28SzANBgkqhkiG9w0BAQUFADANMQswCQYDVQQGEwJV +UzAeFw0yMTEwMjEyMDM0MTBaFw0zNTA2MzAyMDM0MTBaMGoxEDAOBgNVBAYTB1Vu +a25vd24xEDAOBgNVBAgTB1Vua25vd24xEDAOBgNVBAcTB1Vua25vd24xEDAOBgNV +BAoTB1Vua25vd24xEDAOBgNVBAsTB1Vua25vd24xDjAMBgNVBAMTBWthZmthMIIB +IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAh/FWDfRTB145y1ME9ACQyjNL +v41xNJ4NVH1j1UGlSPhJ8vr5n9cw5nfwKKl2mCs1Tb8kVwstQWRmi1QN4KNVO/rV +Y7Z4ri05r3hNOz7l+MPf7pGTypVMXygo2Apod/AXmIiQNop2sUnAztocR3wdqbpb +DPpffqtDTFxXukUlACktKQHmPCi8/cloPYX6uiT7j09eB2ZssS5/tUsEW2Wfk0I5 +QB32Rabv4oIOnIn/3s99M+RqnIvtolT5cJfaXhLGl5+cG7j+idlKOnTKBohcsRXR +gllVj/Vg53L32CicELNPUHWxS8QLLQWEpjk7M1D/odEqIWsaXQuADnCcbPjMIQID +AQABMA0GCSqGSIb3DQEBBQUAA4IBAQCMGbXC2YdC9+jJjUvuEJIQGwpapJ5Dejng +cnvE//+x8A4W9vC7OJUHcML2GGQIrgvYWlmsCEWX1lJtcVIbqkTqq9Sq99htdMfM +ay4fJB/ey005bhcbEP+19342HkmoOUkEg7qGWZhhL05y0m1vxKvKSUX3p+4TyW1Y +AheRbb9j41Ld3E8+COGwqIWpMNfsGjLqWjUIajemFH91Eo2FFvshM/5ly12GZEil +ivmUqSzV7o6ri0V7DZ5NPOSXEbiMQj5FfmImqXbo7JtBqM/H9S2yAPXZBfAloVNv +XvjG0dY8cnYwGL5MSRiZEuJdimptWnMzFXbD8zyRxSIUMpbDcHNf +-----END CERTIFICATE----- diff --git a/testing/environments/docker/kafka/certs/broker.keystore.jks b/testing/environments/docker/kafka/certs/broker.keystore.jks new file mode 100644 index 0000000000000000000000000000000000000000..aa03364e3ac6ee205cc523ec676f66f0b953dee5 GIT binary patch literal 3959 zcmY+EcQ71|w#TjA)vfZ=yF{<6EFx-@T_rlv%PPTQ^&Vx39tqI}(M2b^Xh9YcEjmH8 z5G6!!*_-#~y?f`*nKNhR^F8PM`GI06eDDE4D276afLI^~5pzKfAOyU{P_Tk96pVi{ zEffQG`L76U4#I#<{$hi_twTWie^+EA0Q|QY&^T(8?fdxmT z`tAhTbd4et>h(Pw!3W~?>wz$!q1~GI;_FO;^f={LaJq|*nW$D8{^c;vg(RxWIZ%}& zq)ldNwNLoYY@->jD2=UV=wpjv-we%r*k|gr2vBIW;=avzz<^pGuvxFaX z+I{FdYqxY*66bE?G9cbpK#J!2NtXjR^CYkS)_-E>sJgPF4(86lI3lwHOPgN-!qH$BbL464k5K7YC~ZR*hc1 zN)*S|r+qS|-0#k+X@BIdqD(-!>2BlG4rgm@&#qq;_{JSveV}zvDUY7~=l!A#y8>gi z(T|kA{F`o=ZnBt6?wjiF)Ixyq-n&Egd=mV=Sk|;(j@0(HRopQlCcDpSz|K2|?l6)z ze3Kc9&mDZm?$WL0_lBheRKg^vI(dhrrSu-rawo-)%+^BM5r}8~XE$DA(ufAHN1oee zr4_=a%f@T>hq-qN1z9W*&C@7KadJs%H1MiPUTq~d%57J!ZK6G!tL>L=655rk%<;V2ez71MiwJ77-fVjB8e;fr zK%27QR_F&BH}?p~ve3N;LNS8dR02EWn--@%j|Y|Mc*XpiIpG_7M|@ky_HutH84o{) z*SnK$Q8j$>A7172U>RmX550N3?ZIW~#4NMGR;8&ei{?p<=9MsA2h1u)Ooo2X%O#w; z^P5u4+LO%SZ>3kJ9+jMS;FvOItsOdAF;TteF84>24)Y}OM#f?c@$gSG_R4aniOrB z7!&T%wlaou;mAK@4--^dSulDT%rXtxxn^GJx6)ubGWA~bWS`@;IjH*lsj<21Lkx`f&3n%Q* zj)2X{5(Z=a(G=8fzBMS^U7duW(iJQbZxt!&g9z#nwscz`mz^@ZYx|(b2&O$w+7rNB zTb3Pq2m{`s&>>B=uMQq0)uyL3Jpb92URPX#lKY!?K$n3g??O$+ zkLj=n9Aj~M)8K?ZQLhfABmJ+m9u#w`pE~HewUjY-+_^A`q9bFofhD~RdtcJ*cOnN`DoP=E927d;^pU8n5c!oH%4`2!BBSlo(=k$hvW=YJ((qca zC@9yfXW@ky3KqgqxBltuf0(B!nMzG6yH z=6|b_m_`i3MuvyP^TtEtq3|5>yrG=`32DV>K-5Mq?v5N{QWBCfe@jYQN?HPnAujp% z5+T7`3~~Bjl!6by`#Tc;lK}q9umt~OSf>+iwVFLzfg#p| zdivIFD>{DSR1IeJiUhJKn^ld@ZGy-enW7vQ=lj{)H?7>Trxsaj#;J;eL;Nnqw(x41`~cVRxK2u$6snp z!vmF^0D_cgO2|!dw6rA3NL*Xy^<4w6LD^n@ua5+0n$f&^ zNtEAEHM0XT7`0|EOJUc=*%CJWsBM0^wOcybb+7KeiJkf^y+JyHjPd) zm;tZ1lwDS}b1L`GbEz3dm)R(uwJrfxy#UoP%oGvlw`V7>!tKeMX7=cElz5ZwXL(67 z28#Vt7FWAs^iXr24ycVSs*KXgWN-})mUrNIHZ$ATL(Cn5#ceu|u#l0k;^yzKt2{O@ z*i4$k15AFSB;!9j4z1ZRuZ3)!%fB5C&Xr(^mphP(RQqb@CJ(LJSg>V2@vVftDKU3c zdDxP@LsqSTK1>)sVF6nG|A5=@uOoHberOu6o7; zxgRRM*2ubynYH){e5}e6RJP?=|}0UcW# zxvuDxI=8blWN{M}C9bdjcB-Q0TfU2ZO|Hj=I% z=rVN&XxhB5X!{~YM$_aj3YJ?8v|grRXF@aUaDY zLknZ1o@y{*9wzBkhWFd@bsUn(V@`4@-yVoV1I$ z+i}?E%RhqIXzE%B5$@R;fttUA3I;j}W7gpl?n~DTir|d7O^sur3{Ey8 z;zlQQ#rkX}M{8J{N-!@T?R8B1Ej1>aW2Mm|i}o`W-c!?=OH1G%eQu5q6lL;?4<4G5 zP!x3bKii_E0Ux~9H3&PR@NCH}6*GJT7gn2Z3(Rs0ks}pTDt{5w{X9Il9u0)EnIj8G z$st_~ig}Hca=(zXo&gr>QhW_@g@z9_6(*!eB)T%SE_;F1wx!ldd9;<0IZ7wy%Y}JT7NO?O%+&R%|dCZZ1?it!*kX=3kwXTx*l;_mx`e z{1!Com1s$~tRyi($xi3yN3a|#=&2(8q|h1Q6#GS568-9TmP_84JXhmHTrb7ehqT=4 zrdc=fK(eS5zLTjEQZwB^RAV9QKg5rgBUlwwvczy2DzzU3UxQAH2!-K1W`_etuEtL5|J)4tS*;cgnfYlAlPj0(?uYvh=`kuTW zx)8mu98q0AE}Dh8&t%ZSM&r(`qixZtf5**{D2>QbRcONoxuF&yfqJ)ad~UwE0*SFF zPxI97qXfhIbbAFo9UVPL8wk9vpEH-b>5@lr2~}EwrMhTq6~r zuC3Rst2Mlp#kJ@!@DlFg=qtd;@tTVYmT?oX#>1|Tq40t*@E}Gxax);TyruYXE2?bj z(VK00mRPDbtt#~Rm?Be~0cBjLq0)xF3c0lSqJ=MymI)#rItCMq*NFyfG3a1RBzyGi zVv-|Om%(-JeQ9$Aj{>agBQ-y%O4M$BB!7pqF+Aq)J4IFj5%FpB{C5=a=?UEjxT-~c zU;1C^tEWkdmXF9K58eDJ1T8J)hpT(Tmi#d;140@hMjpcC1QJy_0ndjY?NY&)M&OWN zryXTo+Sz_xg*YzO94_&<;4q&)5Ikc+6p;JE7W;xSy|*(zxs^STe)=ga&^atE9W3qW zBVHiK9imG!fVH0An}*DhTcKdm1gbP?gx3i^L1)fR+QxW$LIAs^HvZ)O`X$?PG4m?o zlZPB7h?|)khN1_I=YZe=4eIqOwr_6U!UKtSWo$DVJ@r9|F}BuBujRTd;{)X`*S0{I(9(3bH5}zTNDolGEGOD7 zKc%u=-1z>*#nqE=Ay(Yyd5cu>O^C(CZVm8~2cEGecBYlwh?qH!tW@mKTw?a6ruC>Y z0NM_b>Q2->W7O`xjK?bip9?c0lPxW7ZbmW;>ekUWImrz}FCgsbWW8YF`2uq$UC)X zLKB){fa40gqkpc17~w&^uWmonTBJKvXUOy7j-LDgJ>Re~&sP4FkLBt-X*H>c2*LZi zp8VoCndDn+h-v(nKLI+VK0IAZlU6b!#ICa}gHJ&OL&g5fSLdCesv~^4pmC)7jw#w7 z;uoiQn#_4(VeFPT=^3PZ!*#p$GT*Ga<~};i!}m;nFkFa23LbRH4^@D2LWv2$f)F4+ z6BrLf6SeqA)=2hV(QpJ;cw=Z9^+;-^HHgLvOWJX&AS4Bm?7)(R2t`W-&gw8(raW-Buc&2R zo>Zfp2n@^5C%th@uPM3yQZ0^A&Lzob`S#@*gCJEmTdCSY|F|z4e%<^6+qi`t&GZom z*c(7GpesOvpqasBmRYCBf?+O>gtO7nnbr~q{4YJ3$8kgav{N4FdfS2k~FM`(9d zSJNj=b7jShY0h;X{=b1Z-RycKB}Mav`$2_}9L0FiM$1i1OLq6%YrZmZ9d&CnPN91K zwC^xIYcqLao>RZ|5YSwQa09HJ_b!14^KyE-Wl#N(obsr2%`qyV7g$TAi^oT{%i5Jo z@a#1f5~1oaqV~!niva#2oGBXytybf`Ah#H*A0#3GxU)nvqHmuhigy(f)lRt%$zV7f zFEBpQTuUD=riHUDhu^>R{cjCr_0hUbA3|j&bP(jM%J>TcasX%|o{`)jT-A~cH)fVf zbY__-v`HZe_$+~Nnewyk7H3FlU6gHM3+Q@Qn*Wu975-@2&o;S&E}8 zl=0duyg>-KJyMyBYjU61I>+p6eF+WoWv5U&{J>f5;3GHrNAP|+XAEHBYYm{Av&4Aix^YSsnXWLc0RR?<4?Fb=Bv|EM*${em7x=9bwtVCZ%!*gLDULftoe~64S5rOTol-0|#`# z|NWdCi}8qyXmM;nu?dU&SCpmIx34kUNSh8Uqm#ye7Y}+-lcs7Cw+l4N8~~2Ed&V96 z*))#Y?^8NaNpnVVCv`so6mlu2xf@(Z1iy4Fa79r4Dc)^{y(cka|lG3Js2|t!!S1A}Is^J^N zRP9166@8|=It-c=+($EL4V8Nybud0KAutIB1uG5%0vZJX1Qf(d7W0m(qqVf$Q<7It kl2zELI3@%XZh(h<)Pz_Z+=jYfu|-Rig@)#*0s{etpj~>g2><{9 literal 0 HcmV?d00001 diff --git a/testing/environments/docker/kafka/run.sh b/testing/environments/docker/kafka/run.sh index 873f6951acc..bfacf2a7242 100755 --- a/testing/environments/docker/kafka/run.sh +++ b/testing/environments/docker/kafka/run.sh @@ -16,13 +16,31 @@ echo "Starting ZooKeeper" ${KAFKA_HOME}/bin/zookeeper-server-start.sh ${KAFKA_HOME}/config/zookeeper.properties & wait_for_port 2181 +# create a user beats with password KafkaTest, for use in client SASL authentication +/kafka/bin/kafka-configs.sh \ + --zookeeper localhost:2181 \ + --alter --add-config 'SCRAM-SHA-512=[password=KafkaTest]' \ + --entity-type users \ + --entity-name beats + echo "Starting Kafka broker" mkdir -p ${KAFKA_LOGS_DIR} ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties \ - --override delete.topic.enable=true --override advertised.host.name=${KAFKA_ADVERTISED_HOST} \ - --override listeners=PLAINTEXT://0.0.0.0:9092 \ - --override logs.dir=${KAFKA_LOGS_DIR} --override log.flush.interval.ms=200 \ - --override num.partitions=3 & + --override delete.topic.enable=true \ + --override advertised.host.name=${KAFKA_ADVERTISED_HOST} \ + --override listeners=PLAINTEXT://0.0.0.0:9092,SASL_SSL://0.0.0.0:9093 \ + --override advertised.listeners=PLAINTEXT://${KAFKA_ADVERTISED_HOST}:9092,SASL_SSL://${KAFKA_ADVERTISED_HOST}:9093 \ + --override inter.broker.listener.name=PLAINTEXT \ + --override sasl.enabled.mechanisms=SCRAM-SHA-512 \ + --override listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required;" \ + --override logs.dir=${KAFKA_LOGS_DIR} \ + --override log4j.logger.kafka=DEBUG,kafkaAppender \ + --override log.flush.interval.ms=200 \ + --override num.partitions=3 \ + --override ssl.keystore.location=/broker.keystore.jks \ + --override ssl.keystore.password=KafkaTest \ + --override ssl.truststore.location=/broker.truststore.jks \ + --override ssl.truststore.password=KafkaTest & wait_for_port 9092