Skip to content

Commit

Permalink
Added code to fetch key
Browse files Browse the repository at this point in the history
  • Loading branch information
Abhinav Tiwary committed Mar 29, 2024
1 parent 04a417b commit 61f6dd8
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 10 deletions.
8 changes: 4 additions & 4 deletions src/ziggurat/header_transformer.clj
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
(ns ziggurat.header-transformer
(:import [org.apache.kafka.streams.kstream ValueTransformer]
(:import [org.apache.kafka.streams.kstream ValueTransformerWithKey]
[org.apache.kafka.streams.processor ProcessorContext]))

(deftype HeaderTransformer [^{:volatile-mutable true} processor-context] ValueTransformer
(deftype HeaderTransformer [^{:volatile-mutable true} processor-context] ValueTransformerWithKey
(^void init [_ ^ProcessorContext context]
(set! processor-context context))
(transform [_ record-value]
(transform [_ record-key record-value]
(let [topic (.topic processor-context)
timestamp (.timestamp processor-context)
partition (.partition processor-context)
headers (.headers processor-context)
metadata {:topic topic :timestamp timestamp :partition partition}]
{:value record-value :headers headers :metadata metadata}))
{:value record-value :headers headers :metadata metadata :key record-key}))
(close [_] nil))

(defn create []
Expand Down
5 changes: 3 additions & 2 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
[org.apache.kafka.common.errors TimeoutException]
[org.apache.kafka.streams KafkaStreams KafkaStreams$State StreamsConfig StreamsBuilder Topology]
[org.apache.kafka.streams.errors StreamsUncaughtExceptionHandler StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse]
[org.apache.kafka.streams.kstream JoinWindows ValueMapper TransformerSupplier ValueJoiner ValueTransformerSupplier]
[org.apache.kafka.streams.kstream JoinWindows ValueMapper TransformerSupplier ValueJoiner ValueTransformerWithKeySupplier]
[ziggurat.timestamp_transformer IngestionTimeExtractor]))

(def default-config-for-stream
Expand Down Expand Up @@ -77,7 +77,7 @@

(defn- header-transformer-supplier
[]
(reify ValueTransformerSupplier
(reify ValueTransformerWithKeySupplier
(get [_] (header-transformer/create))))

(defn- timestamp-transform-values [topic-entity-name oldest-processed-message-in-s stream-builder]
Expand Down Expand Up @@ -126,6 +126,7 @@
(try
((mapper-func handler-fn channels)
(-> (->MessagePayload (:value message) topic-entity)
(assoc :key (:key message))
(assoc :headers (:headers message))
(assoc :metadata (:metadata message))))
(finally)))
Expand Down
8 changes: 4 additions & 4 deletions test/ziggurat/header_transformer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
(partition [_] partition))
transformer (create)
_ (.init transformer context)
transformed-val (.transform transformer "val")]
(is (= {:value "val" :headers headers :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val))))
transformed-val (.transform transformer "key" "val")]
(is (= {:key "key" :value "val" :headers headers :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val))))

(testing "transforms value with nil headers when not passed"
(let [topic "topic"
Expand All @@ -31,5 +31,5 @@
(partition [_] partition))
transformer (create)
_ (.init transformer context)
transformed-val (.transform transformer "val")]
(is (= {:value "val" :headers nil :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val)))))
transformed-val (.transform transformer "key" "val")]
(is (= {:key "key" :value "val" :headers nil :metadata {:topic topic :timestamp timestamp :partition partition}} transformed-val)))))

0 comments on commit 61f6dd8

Please sign in to comment.