Skip to content

Commit

Permalink
[EV-38][Indra] Remove opentracing from streams
Browse files Browse the repository at this point in the history
  • Loading branch information
indrajithi committed Oct 17, 2023
1 parent 278967e commit 3ed081a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 56 deletions.
37 changes: 12 additions & 25 deletions src/ziggurat/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,9 @@
[ziggurat.message-payload :refer [->MessagePayload]]
[ziggurat.metrics :as metrics]
[ziggurat.timestamp-transformer :as timestamp-transformer]
[ziggurat.tracer :refer [tracer]]
[ziggurat.util.map :as umap]
[cambium.core :as clog])
(:import [io.opentracing.contrib.kafka TracingKafkaUtils]
[io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier]
[io.opentracing.tag Tags]
[java.time Duration]
(:import [java.time Duration]
[java.util Properties]
[java.util.regex Pattern]
[org.apache.kafka.common.errors TimeoutException]
Expand Down Expand Up @@ -126,22 +122,14 @@
(doseq [[topic-entity stream] streams]
(close-stream topic-entity stream)))

(defn- traced-handler-fn [handler-fn channels message topic-entity]
(let [parent-ctx (TracingKafkaUtils/extractSpanContext (:headers message) tracer)
span (as-> tracer t
(.buildSpan t "Message-Handler")
(.withTag t (.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER)
(.withTag t (.getKey Tags/COMPONENT) "ziggurat")
(if (nil? parent-ctx)
t
(.asChildOf t parent-ctx))
(.start t))]
(try
((mapper-func handler-fn channels) (-> (->MessagePayload (:value message) topic-entity)
(assoc :headers (:headers message))
(assoc :metadata (:metadata message))))
(finally
(.finish span)))))
(defn- mapped-handler-fn [handler-fn channels message topic-entity]
(try
((mapper-func handler-fn channels)
(-> (->MessagePayload (:value message) topic-entity)
(assoc :headers (:headers message))
(assoc :metadata (:metadata message))))
(finally)))


(defn- join-streams
[oldest-processed-message-in-s topic-entity stream-1 stream-2]
Expand Down Expand Up @@ -187,7 +175,7 @@
{stream :stream} (reduce (partial join-streams oldest-processed-message-in-s topic-entity) stream-map)]
(->> stream
(header-transform-values)
(map-values #(traced-handler-fn handler-fn channels % topic-entity)))
(map-values #(mapped-handler-fn handler-fn channels % topic-entity)))
(.build builder))))

(defn- topology [handler-fn {:keys [origin-topic oldest-processed-message-in-s]} topic-entity channels]
Expand All @@ -198,7 +186,7 @@
(timestamp-transform-values topic-entity-name oldest-processed-message-in-s)
(header-transform-values)
(map-values #(log-and-report-metrics topic-entity-name %))
(map-values #(traced-handler-fn handler-fn channels % topic-entity)))
(map-values #(mapped-handler-fn handler-fn channels % topic-entity)))
(.build builder)))

(defn- start-stream* [handler-fn stream-config topic-entity channels]
Expand All @@ -209,8 +197,7 @@

(when-not (nil? top)
(KafkaStreams. ^Topology top
^Properties (properties stream-config)
(new TracingKafkaClientSupplier tracer)))))
^Properties (properties stream-config)))))

(defn- merge-consumer-type-config
[config]
Expand Down
32 changes: 1 addition & 31 deletions test/ziggurat/streams_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
[ziggurat.middleware.json :as json-middleware]
[ziggurat.middleware.stream-joins :as stream-joins-middleware]
[ziggurat.streams :refer [add-stream-thread get-stream-thread-count remove-stream-thread start-streams stop-streams stop-stream start-stream]]
[ziggurat.streams :refer [handle-uncaught-exception start-stream start-streams stop-stream stop-streams]]
[ziggurat.tracer :refer [tracer]])
[ziggurat.streams :refer [handle-uncaught-exception start-stream start-streams stop-stream stop-streams]])
(:import [com.gojek.test.proto Example$Photo]
[io.opentracing.tag Tags]
[java.util Properties]
[org.apache.kafka.clients.producer ProducerConfig]
(org.apache.kafka.common.utils MockTime)
Expand Down Expand Up @@ -349,34 +347,6 @@
(stop-streams streams)
(is (= times @message-received-count))))

(deftest start-streams-test-with-tracer
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count)
times 1
kvs (repeat times message-key-value)
handler-fn (default-middleware/protobuf->hash mapped-fn proto-class :default)
streams (start-streams {:default {:handler-fn handler-fn}}
(-> (ziggurat-config)
(assoc-in [:stream-router :default :application-id] (rand-application-id))
(assoc-in [:stream-router :default :changelog-topic-replication-factor] changelog-topic-replication-factor)))]
(Thread/sleep 10000) ;;waiting for streams to start
(IntegrationTestUtils/produceKeyValuesSynchronously (get-in (ziggurat-config) [:stream-router :default :origin-topic])
kvs
(props)
(MockTime.))
(Thread/sleep 10000) ;;wating for streams to consume messages
(stop-streams streams)
(is (= times @message-received-count))
(let [finished-spans (.finishedSpans tracer)
tags (-> finished-spans
(.get 1)
(.tags))]
(is (= 2 (.size finished-spans))) ;;2 spans - one from the TracingKafkaClientSupplier and one for the actual handler function
(is (= "Message-Handler" (-> finished-spans
(.get 1)
(.operationName))))
(is (= {(.getKey Tags/SPAN_KIND) Tags/SPAN_KIND_CONSUMER, (.getKey Tags/COMPONENT) "ziggurat"} tags)))))

(deftest start-streams-test-when-tracer-is-not-configured
(let [message-received-count (atom 0)
mapped-fn (get-mapped-fn message-received-count)
Expand Down

0 comments on commit 3ed081a

Please sign in to comment.