These connection types expose Kafka through the external listener configuration:
- `nodeport` uses a NodePort type Service
- `loadbalancer` uses a LoadBalancer type Service
- `ingress` uses Kubernetes Ingress and the Ingress NGINX Controller for Kubernetes
- `route` uses OpenShift Routes and the HAProxy router (the simplest method, albeit not the most efficient)
Here we'll use `route` and `scram-sha-512` for authentication:
- Add an external listener by adding this section to the existing Kafka resource:

  ```yaml
  - name: external
    tls: true
    type: route
    port: 9094
    authentication:
      type: scram-sha-512
  ```
  Alternatively, patch it with the following command:

  ```shell
  oc patch kafka my-cluster --type=merge --patch '{"spec":{"kafka":{"listeners":[{"authentication":{"type":"scram-sha-512"},"name":"external","port":9094,"tls":true,"type":"route"}]}}}'
  ```
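  It may take a minute for the operator to create the route. As a quick optional check (not part of the original steps), confirm that the external bootstrap address is published in the resource status:

  ```shell
  oc get kafka my-cluster -o jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}'
  ```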
- Define a new user:

  ```shell
  oc apply -f k8s/30-user.yaml
  ```
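  For reference, here is a minimal sketch of a `KafkaUser` manifest with SCRAM authentication; the actual content of `k8s/30-user.yaml` may differ:

  ```yaml
  apiVersion: kafka.strimzi.io/v1beta2
  kind: KafkaUser
  metadata:
    name: my-user
    labels:
      # binds the user to the cluster managed by the Strimzi User Operator
      strimzi.io/cluster: my-cluster
  spec:
    authentication:
      type: scram-sha-512
  ```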
- The client must trust the Kafka CA certificate to establish the encrypted connection. The following commands extract the CA certificate and create a truststore for the Java clients:

  ```shell
  oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.name=="external")].certificates[0]}{"\n"}' > kafka-cluster-ca.crt
  keytool -import -trustcacerts -alias root -file kafka-cluster-ca.crt -keystore truststore.jks -storepass password -noprompt
  ```
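  Optionally (an extra check, not in the original steps), verify that the certificate was imported:

  ```shell
  keytool -list -keystore truststore.jks -storepass password
  ```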
- Configure the Kafka consumer to run from your local environment and connect to the Kafka cluster on OpenShift. The following script configures the truststore, the security protocol, the authentication mechanism, and the credentials, which are extracted from the secret:

  ```shell
  echo -e "\n\n## Kafka remote config\nkafka.ssl.truststore.location = ../truststore.jks\nkafka.ssl.truststore.password = password\nkafka.security.protocol=SASL_SSL\nkafka.sasl.mechanism=SCRAM-SHA-512" >> kafka-consumer/src/main/resources/application.properties
  echo "kafka.sasl.jaas.config=$(oc get secret my-user -o jsonpath='{.data.sasl\.jaas\.config}' | base64 -d)" >> kafka-consumer/src/main/resources/application.properties
  oc get kafka my-cluster -o jsonpath='{"\nkafka.bootstrap.servers="}{.status.listeners[?(@.name=="external")].bootstrapServers}{"\n"}' >> kafka-consumer/src/main/resources/application.properties
  ```
  Note: The properties are added to the `application.properties` managed by Quarkus. By convention, all properties prefixed with `kafka.` are passed to the Kafka client API configuration.
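  As an illustration of that convention (an example, not from the original text), the entry is what Quarkus reads, and the comment shows what the Kafka client receives:

  ```properties
  # application.properties (Quarkus):
  kafka.security.protocol=SASL_SSL
  # passed to the Kafka client as:
  #   security.protocol=SASL_SSL
  ```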
- Start the Kafka consumer locally:

  ```shell
  mvn -f kafka-consumer/pom.xml package quarkus:dev
  ```
  Alternatively, you can use the console consumer shipped with the Kafka project. Use the command from the previous step to create a `client.config` file (dropping the `kafka.` prefix).
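  The resulting file should look roughly like this sketch (`<PASSWORD>` stands for the value embedded in the `sasl.jaas.config` entry of the secret):

  ```properties
  security.protocol=SASL_SSL
  sasl.mechanism=SCRAM-SHA-512
  ssl.truststore.location=../truststore.jks
  ssl.truststore.password=password
  sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="my-user" password="<PASSWORD>";
  ```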
  Then issue the following command:

  ```shell
  bin/kafka-console-consumer.sh \
    --bootstrap-server $(oc get kafka my-cluster -o jsonpath='{.status.listeners[?(@.name=="external")].bootstrapServers}') \
    --topic event \
    --consumer.config client.config
  ```
You can grant different permissions to your users. There are four options:

- Simple authorization
- OAuth 2.0 authorization using Red Hat SSO
- Open Policy Agent (OPA) authorization
- Custom authorization
Tip: To ensure a clean environment, stop any running client:

```shell
oc scale deployment kafka-producer --replicas=0
oc scale deployment kafka-consumer --replicas=0
```
In this tutorial, you will configure simple authorization:
- Enable authorization in the Kafka resource:

  ```shell
  oc edit kafka my-cluster
  ```

  Under `kafka`, add `authorization` and the authentication for each listener:

  ```yaml
  ### kafka section
  authorization:
    type: simple
  ### for each listener add:
  authentication:
    type: scram-sha-512
  ```
  Warning: When you enable authorization, all your listeners have to authenticate their clients, because authorization acts cluster-wide and anonymous interactions can no longer be accepted.
- Edit the `KafkaUser` resource:

  ```shell
  oc edit kafkausers my-user
  ```
  Add the following ACLs:

  ```yaml
  # at spec level:
  authorization:
    type: simple
    acls:
      - resource:
          name: event
          patternType: literal
          type: topic
        operations:
          - Read
          - Describe
          - Write
          - IdempotentWrite
      - resource:
          name: '*'
          patternType: literal
          type: group
        operations:
          - Read
          - Write
  ```
  The previous definition grants the main operations (read, write, etc.) on the `event` topic and on all consumer groups.
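  As an optional check (not in the original steps), confirm that the User Operator has reconciled the updated user; recent Strimzi versions report a `Ready` condition in the status:

  ```shell
  oc get kafkauser my-user -o jsonpath='{.status.conditions[?(@.type=="Ready")].status}{"\n"}'
  ```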
- Wait a few minutes while the operator completes the rolling update of the Kafka brokers and the entity operator, then start the local consumer to check that it still works properly; inspect the logs to spot potential error messages.

  ```shell
  mvn -f kafka-consumer/pom.xml package quarkus:dev
  ```
- To prove that ACLs can block your user from reading, remove the `Read` operation and run the local consumer again (see the previous step).
- Stop the consumer and add the `Read` operation back to the ACL.
When authorization is enabled, the Kafka clients inside OCP require the proper configuration to provide the correct credentials when they initiate the conversation with the brokers.
- Open the ConfigMap with the producer environment variables and add the following:

  ```yaml
  KAFKA_SASL_JAAS_CONFIG: |-
    org.apache.kafka.common.security.scram.ScramLoginModule required username="my-user" password="<PASSWORD>";
  KAFKA_SASL_MECHANISM: SCRAM-SHA-512
  KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT
  ```
  Replace `<PASSWORD>` with the output of:

  ```shell
  oc get secret my-user -o jsonpath='{.data.password}' | base64 -d
  ```
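  To avoid copy/paste mistakes, here is a small sketch (not part of the original steps) that prints the complete JAAS line ready to paste:

  ```shell
  # extract the SCRAM password from the user secret
  PASSWORD=$(oc get secret my-user -o jsonpath='{.data.password}' | base64 -d)
  # print the full KAFKA_SASL_JAAS_CONFIG value
  echo "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"my-user\" password=\"$PASSWORD\";"
  ```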
- Issue the following commands to start the Kafka producer application, inspect the logs, and spot potential error messages:

  ```shell
  oc scale deployment/kafka-producer --replicas=1
  oc logs --tail=20 -f --selector="app.kubernetes.io/name=kafka-producer"
  oc scale deployment/kafka-producer --replicas=0
  ```
To clean up and restore the initial configuration:

```shell
oc apply -f k8s/01-kafka-metrics.yaml
oc delete kafkauser my-user
oc delete configmap kafka-consumer-config
oc apply --force -f kafka-consumer/src/main/kubernetes/openshift.yml
oc delete configmap kafka-producer-config
oc apply --force -f kafka-producer/src/main/kubernetes/openshift.yml
```