Skip to content

Commit

Permalink
Add integration tests for streams test
Browse files Browse the repository at this point in the history
  • Loading branch information
uds5501 committed Nov 20, 2024
1 parent e9d720c commit 6961914
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 122 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
All notable changes to this project will be documented in this file. This change log follows the conventions
of [keepachangelog.com](http://keepachangelog.com/).

## 4.12.0
- Adds support for ACL auth for kafka streams.

## 4.11.1
- Fix retry-count returning nil if empty. Returns 0 by default now.

Expand Down
124 changes: 2 additions & 122 deletions test/ziggurat/streams_test.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
(ns ziggurat.streams-test
(:require [clojure.java.shell :refer [sh]]
[clojure.test :refer [deftest is join-fixtures testing use-fixtures]]
(:require [clojure.test :refer [deftest is join-fixtures testing use-fixtures]]
[mount.core :as mount]
[protobuf.core :as proto]
[ziggurat.config :refer [ziggurat-config]]
Expand All @@ -11,29 +10,18 @@
[ziggurat.streams :refer [add-stream-thread get-stream-thread-count remove-stream-thread start-stream start-streams stop-stream stop-streams]]
[ziggurat.streams :refer [handle-uncaught-exception start-stream start-streams stop-stream stop-streams]])
(:import (com.gojek.test.proto Example$Photo)
(java.net URI)
(java.nio.file Files Paths)
(java.nio.file.attribute FileAttribute)
(java.util Properties)
(kafka.server KafkaConfig KafkaServer)
(org.apache.kafka.clients CommonClientConfigs)
(org.apache.kafka.clients.producer ProducerConfig)
(org.apache.kafka.common.config SaslConfigs)
(org.apache.kafka.common.utils MockTime)
(org.apache.kafka.streams KafkaStreams$State)
(org.apache.kafka.streams KeyValue)
(org.apache.kafka.streams.errors StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse)
(org.apache.kafka.streams.integration.utils IntegrationTestUtils)
(scala Some)))
(org.apache.kafka.streams.integration.utils IntegrationTestUtils)))

(use-fixtures :once (join-fixtures [fix/mount-config-with-tracer
fix/silence-logging
fix/mount-metrics]))

(def truststore-path "/tmp/truststore/kafka.server.truststore.jks")
(def saas-config-path "test/ziggurat/kafka_server_jaas.conf")
(def truststore-password "testpassword")

(defn- start-mount
[]
(mount/start-with-states [[#'ziggurat.messaging.consumer/consumers {:start (constantly nil)
Expand All @@ -47,91 +35,11 @@
(.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer")
(.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.ByteArraySerializer")))

(defn- props-with-sasl-config []
(doto (props)
(.put ProducerConfig/BOOTSTRAP_SERVERS_CONFIG "localhost:9093")
(.put CommonClientConfigs/SECURITY_PROTOCOL_CONFIG "SASL_SSL")
(.put SaslConfigs/SASL_MECHANISM "OAUTHBEARER")
(.put SaslConfigs/SASL_LOGIN_CALLBACK_HANDLER_CLASS "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler")
(.put props SaslConfigs/SASL_JAAS_CONFIG
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
(.put props "ssl.truststore.location" truststore-path)
(.put props "ssl.truststore.password" truststore-password)
(.put props "ssl.endpoint.identification.algorithm" "https")))

(defn- props-with-string-serializer []
(doto (props)
(.put ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer")
(.put ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringSerializer")))

(defn create-truststore []
"Generates a new truststore and self-signed certificate for testing."
(let [some-path (Paths/get (URI. (str "file:///" truststore-path)))
dir-path (.getParent some-path)]
;; Ensure directory exists
(Files/createDirectories dir-path (into-array FileAttribute []))
;; Generate truststore with keytool
(sh "keytool" "-genkeypair"
"-alias" "test-cert"
"-keyalg" "RSA"
"-keystore" truststore-path
"-storepass" truststore-password
"-validity" "365"
"-keysize" "2048"
"-dname" "CN=localhost, OU=Test, O=Ziggurat, L=City, S=State, C=US")))

(defn cleanup-truststore []
"Deletes the generated truststore after the test."
(let [path (Paths/get truststore-path nil)]
(Files/deleteIfExists path)))

(defn create-broker-config [port log-dir]
"Manually create a Kafka broker configuration."
(let [props (Properties.)]
(.put props "broker.id" "0")
(.put props "log.dirs" (.toString log-dir))
(.put props "listeners" (str "SASL_SSL://localhost:" port))
(.put props "advertised.listeners" (str "SASL_SSL://localhost:" port))
(.put props "listener.security.protocol.map" "SASL_SSL:SASL_SSL")
(.put props "inter.broker.listener.name" "SASL_SSL")
(.put props "ssl.truststore.location" truststore-path)
(.put props "ssl.truststore.password" truststore-password)
(.put props "ssl.endpoint.identification.algorithm" "")
(.put props "sasl.enabled.mechanisms" "OAUTHBEARER")
(.put props "sasl.mechanism.inter.broker.protocol" "OAUTHBEARER")
(.put props "zookeeper.connect" "localhost:2181")
(.put props "sasl.jaas.config" "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
props))

(defn create-kafka-config []
"Create Kafka Streams configuration with SASL/SSL."
(let [props (Properties.)]
(.put props "security.protocol" "SASL_SSL")
(.put props "ssl.truststore.location" truststore-path)
(.put props "ssl.truststore.password" truststore-password)
(.put props "sasl.mechanism" "OAUTHBEARER")
(.put props "sasl.jaas.config" "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
props))

(defn start-embedded-kafka [port]
"Start an embedded Kafka broker."
(create-truststore)
(System/setProperty "java.security.auth.login.config" saas-config-path)
(let [log-dir (Files/createTempDirectory "kafka-logs" (into-array FileAttribute []))
config (KafkaConfig. (create-broker-config port log-dir))
kafka-server (KafkaServer. config (MockTime.) (new Some "a") false)]
(.startup kafka-server)
kafka-server))

(defn stop-embedded-kafka [kafka-server]
"Stop the embedded Kafka broker."
(.shutdown kafka-server)
(cleanup-truststore)
(System/clearProperty "java.security.auth.login.config"))


(def ^:dynamic *embedded-kafka* nil)

(def message {:id 7
:path "/photos/h2k3j4h9h23"})

Expand Down Expand Up @@ -252,34 +160,6 @@
(catch Exception e (.getMessage e)))]
(is (= streams "Invalid value com.example.oauthbearer.OAuthBearerLoginCallbackHandler for configuration sasl.login.callback.handler.class: Class com.example.oauthbearer.OAuthBearerLoginCallbackHandler could not be found.")))))

(deftest start-streams-test-with-valid-sasl-configs
(binding [*embedded-kafka* (start-embedded-kafka 9093)]
(try
(with-redefs [ziggurat.config/ssl-config (fn [] {:enabled true
:protocol "SASL_SSL"
:mechanism "OAUTHBEARER"
:login-callback-handler "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler"
:jaas {:login-module "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule"}})]
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count)
times 6
kvs (repeat times message-key-value)
handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default)
streams (start-streams {:default {:handler-fn handler-fn}}
(-> (ziggurat-config)
(assoc-in [:stream-router :default :bootstrap-servers] "localhost:9093")
(assoc-in [:stream-router :default :application-id] (rand-application-id))
(assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))]
(Thread/sleep 10000)
(IntegrationTestUtils/produceKeyValuesSynchronously (get-in (ziggurat-config) [:stream-router :default :origin-topic])
kvs
(props-with-sasl-config)
(MockTime.))
(Thread/sleep 5000) ;;waiting for streams to consume messages
(stop-streams streams)
(is (= times @message-received-count))))
(finally (stop-embedded-kafka *embedded-kafka*)))))

(deftest stop-stream-test
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count)
Expand Down

0 comments on commit 6961914

Please sign in to comment.