Skip to content

Commit

Permalink
feat: Add Kafka SASL auth
Browse files Browse the repository at this point in the history
Add `security_protocol`, `sasl_mechanism`, `sasl_plain_username` and
`sasl_plain_password` arguments like aiokafka.AIOKafkaConsumer client.

Support SASL_PLAINTEXT and SASL_SSL security protocol.

Support and test SASL PLAIN mechanism with both PLAINTEXT and SSL connections.
A self-signed certificate is used for SSL connections.

Signed-off-by: Julien Riou <[email protected]>
  • Loading branch information
jouir committed Oct 25, 2023
1 parent ad268aa commit a256803
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 11 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ tests/output
.pytest_cache/
.tox
.ruff*
tests/integration/event_source_kafka/*.crt
tests/integration/event_source_kafka/*.key
tests/integration/event_source_kafka/*.csr
tests/integration/event_source_kafka/*.jks

# Ide's
.vscode
Expand Down
26 changes: 21 additions & 5 deletions extensions/eda/plugins/event_source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
group_id: A kafka group id
offset: Where to automatically reset the offset. [latest, earliest]
Default to latest
security_protocol: Protocol used to communicate with brokers. [PLAINTEXT, SSL,
SASL_PLAINTEXT, SASL_SSL]. Default to PLAINTEXT
sasl_mechanism: Authentication mechanism when security_protocol is configured.
[PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER].
Default to PLAIN.
sasl_plain_username: Username for SASL PLAIN authentication
sasl_plain_password: Password for SASL PLAIN authentication
Expand Down Expand Up @@ -51,19 +58,25 @@ async def main( # pylint: disable=R0914
group_id = args.get("group_id", None)
offset = args.get("offset", "latest")
encoding = args.get("encoding", "utf-8")
security_protocol = args.get("security_protocol", "PLAINTEXT")
sasl_mechanism = args.get("sasl_mechanism", "PLAIN")
sasl_plain_username = args.get("sasl_plain_username")
sasl_plain_password = args.get("sasl_plain_password")

if offset not in ("latest", "earliest"):
msg = f"Invalid offset option: {offset}"
raise ValueError(msg)

if cafile:
context = create_ssl_context(
ssl_context = None
if cafile or security_protocol.endswith("SSL"):
security_protocol = security_protocol.replace("PLAINTEXT", "SSL")
ssl_context = create_ssl_context(
cafile=cafile,
certfile=certfile,
keyfile=keyfile,
password=password,
)
context.check_hostname = check_hostname
ssl_context.check_hostname = check_hostname

kafka_consumer = AIOKafkaConsumer(
topic,
Expand All @@ -72,8 +85,11 @@ async def main( # pylint: disable=R0914
enable_auto_commit=True,
max_poll_records=1,
auto_offset_reset=offset,
security_protocol="SSL" if cafile else "PLAINTEXT",
ssl_context=context if cafile else None,
security_protocol=security_protocol,
ssl_context=ssl_context,
sasl_mechanism=sasl_mechanism,
sasl_plain_username=sasl_plain_username,
sasl_plain_password=sasl_plain_password,
)

await kafka_consumer.start()
Expand Down
1 change: 1 addition & 0 deletions tests/integration/event_source_kafka/ansible
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ansible
6 changes: 6 additions & 0 deletions tests/integration/event_source_kafka/broker_jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="test"
password="test";
};
Client{};
9 changes: 9 additions & 0 deletions tests/integration/event_source_kafka/certs-clean.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Remove every certificate artifact produced by certs-create.sh.
set -e

# Resolve the directory this script lives in; artifacts sit next to it.
DIR=$(dirname "${BASH_SOURCE[0]}")

# -f keeps the loop silent when an artifact was never generated.
for artifact in snakeoil-ca.key \
                snakeoil-ca.crt \
                broker.csr \
                broker-ca-signed.crt \
                kafka.broker.keystore.jks \
                kafka.broker.truststore.jks; do
    rm -f "${DIR}/${artifact}"
done
47 changes: 47 additions & 0 deletions tests/integration/event_source_kafka/certs-create.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env bash
# Generate self-signed certificate for Kafka broker
# Greatly inspired by https://github.com/ansibleinc/cp-demo/blob/master/scripts/security/certs-create-per-user.sh
set -e

# Directory of this script. Every artifact is created here (not in the
# caller's cwd) so the paths line up with the docker-compose volume mounts
# and with what certs-clean.sh removes, regardless of invocation directory.
CA_PATH=$(dirname "${BASH_SOURCE[0]}")

# Generate CA key and self-signed CA certificate (key encrypted with "ansible")
openssl req -new -x509 -keyout "${CA_PATH}/snakeoil-ca.key" -out "${CA_PATH}/snakeoil-ca.crt" -days 365 -subj '/CN=snakeoil.ansible.com/OU=TEST/O=ANSIBLE/L=Boston/ST=MA/C=US' -passin pass:ansible -passout pass:ansible

# Create broker keystore
keytool -genkey -noprompt \
    -alias broker \
    -dname "CN=broker,OU=TEST,O=ANSIBLE,L=Boston,S=MA,C=US" \
    -ext "SAN=dns:broker,dns:localhost" \
    -keystore "${CA_PATH}/kafka.broker.keystore.jks" \
    -keyalg RSA \
    -storepass ansible \
    -keypass ansible \
    -storetype pkcs12

# Create broker CSR
keytool -keystore "${CA_PATH}/kafka.broker.keystore.jks" -alias broker -certreq -file "${CA_PATH}/broker.csr" -storepass ansible -keypass ansible -ext "SAN=dns:broker,dns:localhost"

# Sign the host certificate with the certificate authority (CA)
# Set a random serial number (avoid problems from using '-CAcreateserial' when parallelizing certificate generation)
CERT_SERIAL=$(awk -v seed="$RANDOM" 'BEGIN { srand(seed); printf("0x%.4x%.4x%.4x%.4x\n", rand()*65535 + 1, rand()*65535 + 1, rand()*65535 + 1, rand()*65535 + 1) }')
openssl x509 -req -CA "${CA_PATH}/snakeoil-ca.crt" -CAkey "${CA_PATH}/snakeoil-ca.key" -in "${CA_PATH}/broker.csr" -out "${CA_PATH}/broker-ca-signed.crt" -sha256 -days 365 -set_serial "${CERT_SERIAL}" -passin pass:ansible -extensions v3_req -extfile <(cat <<EOF
[req]
distinguished_name = req_distinguished_name
x509_extensions = v3_req
prompt = no
[req_distinguished_name]
CN = broker
[v3_req]
extendedKeyUsage = serverAuth, clientAuth
EOF
)

# Import the CA cert into the keystore so the signed chain validates
keytool -noprompt -keystore "${CA_PATH}/kafka.broker.keystore.jks" -alias snakeoil-caroot -import -file "${CA_PATH}/snakeoil-ca.crt" -storepass ansible -keypass ansible

# Import the CA-signed host certificate into the keystore
keytool -noprompt -keystore "${CA_PATH}/kafka.broker.keystore.jks" -alias broker -import -file "${CA_PATH}/broker-ca-signed.crt" -storepass ansible -keypass ansible -ext "SAN=dns:broker,dns:localhost"

# Create truststore and import the CA cert
keytool -noprompt -keystore "${CA_PATH}/kafka.broker.truststore.jks" -alias snakeoil-caroot -import -file "${CA_PATH}/snakeoil-ca.crt" -storepass ansible -keypass ansible
23 changes: 20 additions & 3 deletions tests/integration/event_source_kafka/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,30 @@ services:
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SASL_ENABLED: "false"

broker:
image: docker.io/confluentinc/cp-kafka:7.0.1
ports:
- "9092:9092"
- "9092:9092" # PLAINTEXT
- "9093:9093" # SSL
- "9094:9094" # SASL_PLAINTEXT
- "9095:9095" # SASL_SSL
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
ZOOKEEPER_SASL_ENABLED: "false"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092,SSL://localhost:9093,SASL_PLAINTEXT://localhost:9094,SASL_SSL://localhost:9095
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/broker_jaas.conf"
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SSL_KEYSTORE_FILENAME: kafka.broker.keystore.jks
KAFKA_SSL_KEYSTORE_CREDENTIALS: ansible
KAFKA_SSL_KEY_CREDENTIALS: ansible
KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.broker.truststore.jks
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: ansible
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
Expand All @@ -27,6 +39,11 @@ services:
interval: 5s
timeout: 3s
retries: 6
volumes:
- ./broker_jaas.conf:/etc/kafka/broker_jaas.conf
- ./kafka.broker.truststore.jks:/etc/kafka/secrets/kafka.broker.truststore.jks
- ./kafka.broker.keystore.jks:/etc/kafka/secrets/kafka.broker.keystore.jks
- ./ansible:/etc/kafka/secrets/ansible

wait_for:
image: docker.io/confluentinc/cp-kafka:7.0.1
Expand Down
77 changes: 76 additions & 1 deletion tests/integration/event_source_kafka/test_kafka_rules.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
- name: test kafka source plugin
hosts: localhost
sources:
- ansible.eda.kafka:
- kafka:
topic: kafka-events
host: localhost
port: 9092
Expand All @@ -18,3 +18,78 @@
condition: event.body == "stop"
action:
shutdown:

- name: test kafka source plugin with ssl security protocol
hosts: localhost
sources:
- kafka:
topic: kafka-events
host: localhost
port: 9093
offset: earliest
encoding: ascii
security_protocol: SSL
check_hostname: false
rules:
- name: match kafka event
condition: event.body.name == "some kafka event"
action:
debug:
msg: "Rule fired successfully"

- name: stop
condition: event.body == "stop"
action:
shutdown:

- name: test kafka source plugin with sasl_plaintext security protocol
hosts: localhost
sources:
- kafka:
topic: kafka-events
host: localhost
port: 9094
offset: earliest
encoding: ascii
security_protocol: SASL_PLAINTEXT
sasl_mechanism: PLAIN
sasl_plain_username: test
sasl_plain_password: test
rules:
- name: match kafka event
condition: event.body.name == "some kafka event"
action:
debug:
msg: "Rule fired successfully"

- name: stop
condition: event.body == "stop"
action:
shutdown:


- name: test kafka source plugin with sasl_ssl security protocol
hosts: localhost
sources:
- kafka:
topic: kafka-events
host: localhost
port: 9095
offset: earliest
encoding: ascii
security_protocol: SASL_SSL
sasl_mechanism: PLAIN
sasl_plain_username: test
sasl_plain_password: test
check_hostname: false
rules:
- name: match kafka event
condition: event.body.name == "some kafka event"
action:
debug:
msg: "Rule fired successfully"

- name: stop
condition: event.body == "stop"
action:
shutdown:
11 changes: 10 additions & 1 deletion tests/integration/event_source_kafka/test_kafka_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@
from ..utils import TESTS_PATH, CLIRunner


@pytest.fixture()
def kafka_certs():
    """Create the self-signed broker certificates needed for SSL tests.

    Runs ``certs-create.sh`` in the kafka integration-test directory before
    the test, yields the completed process, then runs ``certs-clean.sh``
    during teardown to remove the generated artifacts.

    Yields:
        subprocess.CompletedProcess: result of the certificate creation run.

    Raises:
        subprocess.CalledProcessError: if either script exits non-zero
            (``check=True``).
    """
    cwd = os.path.join(TESTS_PATH, "event_source_kafka")
    # check=True aborts the test early if certificate generation fails.
    result = subprocess.run([os.path.join(cwd, "certs-create.sh")], cwd=cwd, check=True)
    yield result
    subprocess.run([os.path.join(cwd, "certs-clean.sh")], cwd=cwd, check=True)


@pytest.fixture()
def kafka_broker():
cwd = os.path.join(TESTS_PATH, "event_source_kafka")
Expand All @@ -18,7 +27,7 @@ def kafka_broker():


@pytest.fixture()
def kafka_producer(kafka_broker):
def kafka_producer(kafka_certs, kafka_broker):
return KafkaProducer(bootstrap_servers="localhost:9092")


Expand Down
10 changes: 9 additions & 1 deletion tests/integration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@ class CLIRunner:
base_cmd: str = "ansible-rulebook"
inventory: str = os.path.join(TESTS_PATH, "default_inventory.yml")
rules: Optional[str] = None
sources: Optional[str] = None
sources: Optional[str] = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"..",
"..",
"extensions",
"eda",
"plugins",
"event_source",
)
extra_vars: Optional[str] = None
envvars: Optional[str] = None
proc_id: Optional[str] = None
Expand Down

0 comments on commit a256803

Please sign in to comment.