From 3b9f388baf7cfd030e5fa05b26dba8a2cc2891b5 Mon Sep 17 00:00:00 2001 From: Saurabh Mehta Date: Sun, 12 May 2024 12:37:36 +0530 Subject: [PATCH 1/8] implemented sliding window --- src/jackdaw/streams.clj | 5 +++++ src/jackdaw/streams/interop.clj | 24 +++++++++++++++--------- src/jackdaw/streams/protocols.clj | 4 +++- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/jackdaw/streams.clj b/src/jackdaw/streams.clj index 4730327d..c740e5bb 100644 --- a/src/jackdaw/streams.clj +++ b/src/jackdaw/streams.clj @@ -322,6 +322,11 @@ ([kgroupedstream window] (p/windowed-by-session kgroupedstream window))) +(defn window-sliding-by-time + "Windows the KStream using sliding windows." + ([kgroupedstream window-size grace-period] + (p/windowed-sliding-by-time kgroupedstream window-size grace-period))) + (defn kgroupedstream* "Returns the underlying KGroupedStream object." ([kgroupedstream] diff --git a/src/jackdaw/streams/interop.clj b/src/jackdaw/streams/interop.clj index 0b388512..cff5a9b7 100644 --- a/src/jackdaw/streams/interop.clj +++ b/src/jackdaw/streams/interop.clj @@ -21,7 +21,7 @@ KeyValueMapper Materialized Merger Predicate Printed Produced Reducer SessionWindowedKStream SessionWindows Suppressed Suppressed$BufferConfig TimeWindowedKStream ValueJoiner - ValueMapper ValueTransformerSupplier Windows ForeachAction TransformerSupplier] + ValueMapper ValueTransformerSupplier Windows ForeachAction TransformerSupplier SlidingWindows] [org.apache.kafka.streams.processor.api ProcessorSupplier] [org.apache.kafka.streams.state Stores])) @@ -135,7 +135,7 @@ key-serde value-serde)) builder) - + (streams-builder* [_] streams-builder)) @@ -297,8 +297,8 @@ (merge [_ other-kstream] (clj-kstream - (.merge kstream - ^KStream (kstream* other-kstream)))) + (.merge kstream + ^KStream (kstream* other-kstream)))) (outer-join-windowed [_ other-kstream value-joiner-fn windows] @@ -412,10 +412,10 @@ (join [_ other-ktable foreign-key-extractor-fn value-joiner-fn] (clj-ktable - 
(.join ^KTable ktable - ^KTable (ktable* other-ktable) - ^Function (foreign-key-extractor foreign-key-extractor-fn) - ^ValueJoiner (value-joiner value-joiner-fn)))) + (.join ^KTable ktable + ^KTable (ktable* other-ktable) + ^Function (foreign-key-extractor foreign-key-extractor-fn) + ^ValueJoiner (value-joiner value-joiner-fn)))) (left-join [_ other-ktable value-joiner-fn] @@ -464,7 +464,7 @@ (suppress [_ suppress-config] (clj-ktable - (.suppress ^KTable ktable (suppress-config->suppressed suppress-config)))) + (.suppress ^KTable ktable (suppress-config->suppressed suppress-config)))) (to-kstream [_] @@ -603,6 +603,12 @@ (clj-session-windowed-kstream (.windowedBy ^KGroupedStream kgroupedstream ^SessionWindows windows))) + (windowed-sliding-by-time + [_ window-size grace-period] + (let [time-windowed-stream (.windowedBy ^KGroupedStream kgroupedstream + (SlidingWindows/withTimeDifferenceAndGrace window-size grace-period))] + (clj-time-windowed-kstream time-windowed-stream))) + (kgroupedstream* [_] kgroupedstream)) diff --git a/src/jackdaw/streams/protocols.clj b/src/jackdaw/streams/protocols.clj index 894ef86a..eede6ace 100644 --- a/src/jackdaw/streams/protocols.clj +++ b/src/jackdaw/streams/protocols.clj @@ -33,7 +33,7 @@ [topology-builder store-config] "Adds a persistent state store to the topology with the configured name and serdes.") - + (streams-builder* [streams-builder] "Returns the underlying KStreamBuilder.")) @@ -254,6 +254,8 @@ (windowed-by-session [kgroupedstream window]) + (windowed-sliding-by-time [kgroupedstream window-size grace-period]) + (kgroupedstream* [kgroupedstream] "Returns the underlying KGroupedStream object.")) From c908234554953b15b9eb841ba7e79bc453c56d3a Mon Sep 17 00:00:00 2001 From: Saurabh Mehta Date: Tue, 9 Jul 2024 00:10:50 +0530 Subject: [PATCH 2/8] add test, no implicit SlidingWindows creation --- .portal/vs-code.edn | 1 + src/jackdaw/streams.clj | 8 ++++---- src/jackdaw/streams/interop.clj | 9 ++++----- 
src/jackdaw/streams/protocols.clj | 3 +-- test/jackdaw/streams_test.clj | 30 +++++++++++++++++++++++++++++- 5 files changed, 39 insertions(+), 12 deletions(-) create mode 100644 .portal/vs-code.edn diff --git a/.portal/vs-code.edn b/.portal/vs-code.edn new file mode 100644 index 00000000..d732691b --- /dev/null +++ b/.portal/vs-code.edn @@ -0,0 +1 @@ +{:host "localhost", :port 49383} \ No newline at end of file diff --git a/src/jackdaw/streams.clj b/src/jackdaw/streams.clj index c740e5bb..d4b03611 100644 --- a/src/jackdaw/streams.clj +++ b/src/jackdaw/streams.clj @@ -322,10 +322,10 @@ ([kgroupedstream window] (p/windowed-by-session kgroupedstream window))) -(defn window-sliding-by-time - "Windows the KStream using sliding windows." - ([kgroupedstream window-size grace-period] - (p/windowed-sliding-by-time kgroupedstream window-size grace-period))) +(defn sliding-window-by-time + "Windows the KStream using a sliding window" + ([kgroupedstream window] + (p/sliding-window-by-time kgroupedstream window))) (defn kgroupedstream* "Returns the underlying KGroupedStream object." 
diff --git a/src/jackdaw/streams/interop.clj b/src/jackdaw/streams/interop.clj index cff5a9b7..b62fbc4e 100644 --- a/src/jackdaw/streams/interop.clj +++ b/src/jackdaw/streams/interop.clj @@ -603,11 +603,10 @@ (clj-session-windowed-kstream (.windowedBy ^KGroupedStream kgroupedstream ^SessionWindows windows))) - (windowed-sliding-by-time - [_ window-size grace-period] - (let [time-windowed-stream (.windowedBy ^KGroupedStream kgroupedstream - (SlidingWindows/withTimeDifferenceAndGrace window-size grace-period))] - (clj-time-windowed-kstream time-windowed-stream))) + (sliding-window-by-time + [_ windows] + (clj-time-windowed-kstream + (.windowedBy ^KGroupedStream kgroupedstream ^SlidingWindows windows))) (kgroupedstream* [_] diff --git a/src/jackdaw/streams/protocols.clj b/src/jackdaw/streams/protocols.clj index eede6ace..4f35552a 100644 --- a/src/jackdaw/streams/protocols.clj +++ b/src/jackdaw/streams/protocols.clj @@ -33,7 +33,6 @@ [topology-builder store-config] "Adds a persistent state store to the topology with the configured name and serdes.") - (streams-builder* [streams-builder] "Returns the underlying KStreamBuilder.")) @@ -254,7 +253,7 @@ (windowed-by-session [kgroupedstream window]) - (windowed-sliding-by-time [kgroupedstream window-size grace-period]) + (sliding-window-by-time [kgroupedstream window]) (kgroupedstream* [kgroupedstream] diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index 613c0b87..cb529eca 100644 --- a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -14,7 +14,7 @@ (:import [java.time Duration] [org.apache.kafka.streams.kstream JoinWindows SessionWindows TimeWindows Transformer - ValueTransformer] + ValueTransformer SlidingWindows] org.apache.kafka.streams.StreamsBuilder [org.apache.kafka.common.serialization Serdes])) @@ -1118,6 +1118,34 @@ (is (= ["a" 1] (first keyvals))) (is (= ["a" 3] (second keyvals))) (is (= ["a" 4] (nth keyvals 2)))))) + + (testing "sliding-window-by-time" + (let 
[topic-a (mock/topic "topic-a") + topic-b (mock/topic "topic-b") + driver (mock/build-driver (fn [builder] + (-> builder + (k/kstream topic-a) + (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) + (k/sliding-window-by-time (SlidingWindows/withTimeDifferenceAndGrace + (Duration/ofMillis 1000) + (Duration/ofMillis 100))) + (k/reduce + topic-a) + (k/to-kstream) + (k/map (fn [[k v]] [(.key k) v])) + (k/to topic-b)))) + publish (partial mock/publish driver topic-a)] + + (publish 1000 1 1) + (publish 1500 1 2) + (publish 1900 1 3) + (publish 2100 1 4) + + (let [keyvals (mock/get-keyvals driver topic-b)] + (is (= 4 (count keyvals))) + (is (= [0 1] (first keyvals))) + (is (= [0 3] (second keyvals))) + (is (= [0 6] (nth keyvals 2))) + (is (= [0 9] (nth keyvals 3)))))) (testing "windowed-by-session: reduce" (let [topic-a (mock/topic "topic-a") From eadab554539c5334a8b7626eb38fc00e411b29b7 Mon Sep 17 00:00:00 2001 From: Saurabh Mehta Date: Tue, 9 Jul 2024 00:13:44 +0530 Subject: [PATCH 3/8] remove vs-code.edn --- .portal/vs-code.edn | 1 - 1 file changed, 1 deletion(-) delete mode 100644 .portal/vs-code.edn diff --git a/.portal/vs-code.edn b/.portal/vs-code.edn deleted file mode 100644 index d732691b..00000000 --- a/.portal/vs-code.edn +++ /dev/null @@ -1 +0,0 @@ -{:host "localhost", :port 49383} \ No newline at end of file From 11600d0231e446466f9afc4013df7f28c184b8ed Mon Sep 17 00:00:00 2001 From: Saurabh Mehta Date: Tue, 9 Jul 2024 06:42:42 +0530 Subject: [PATCH 4/8] remove formatting changes, add .portal to gitignore --- .gitignore | 3 ++- project.clj | 4 ++-- src/jackdaw/streams/interop.clj | 14 +++++++------- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 70a56487..08348da8 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,5 @@ pom.xml.asc /examples/*/target .clj-kondo/ .lsp/ -.calva/ \ No newline at end of file +.calva/ +.portal/ \ No newline at end of file diff --git a/project.clj b/project.clj index 
550b5b45..5706ce52 100644 --- a/project.clj +++ b/project.clj @@ -1,8 +1,8 @@ -(defproject fundingcircle/jackdaw "_" +(defproject fundingcircle/jackdaw "0.0.1-SNAPSHOT" :description "A Clojure library for the Apache Kafka distributed streaming platform." :license {:name "BSD 3-clause" :url "http://opensource.org/licenses/BSD-3-Clause"} - + :scm {:name "git" :url "https://github.com/fundingcircle/jackdaw"} :url "https://github.com/FundingCircle/jackdaw/" diff --git a/src/jackdaw/streams/interop.clj b/src/jackdaw/streams/interop.clj index b62fbc4e..813b4a0b 100644 --- a/src/jackdaw/streams/interop.clj +++ b/src/jackdaw/streams/interop.clj @@ -297,8 +297,8 @@ (merge [_ other-kstream] (clj-kstream - (.merge kstream - ^KStream (kstream* other-kstream)))) + (.merge kstream + ^KStream (kstream* other-kstream)))) (outer-join-windowed [_ other-kstream value-joiner-fn windows] @@ -412,10 +412,10 @@ (join [_ other-ktable foreign-key-extractor-fn value-joiner-fn] (clj-ktable - (.join ^KTable ktable - ^KTable (ktable* other-ktable) - ^Function (foreign-key-extractor foreign-key-extractor-fn) - ^ValueJoiner (value-joiner value-joiner-fn)))) + (.join ^KTable ktable + ^KTable (ktable* other-ktable) + ^Function (foreign-key-extractor foreign-key-extractor-fn) + ^ValueJoiner (value-joiner value-joiner-fn)))) (left-join [_ other-ktable value-joiner-fn] @@ -464,7 +464,7 @@ (suppress [_ suppress-config] (clj-ktable - (.suppress ^KTable ktable (suppress-config->suppressed suppress-config)))) + (.suppress ^KTable ktable (suppress-config->suppressed suppress-config)))) (to-kstream [_] From 02cba76b3fdc8faf8cce7794def37a150736b07a Mon Sep 17 00:00:00 2001 From: Saurabh Mehta Date: Tue, 9 Jul 2024 06:44:03 +0530 Subject: [PATCH 5/8] revert hardcoded value of project name from project.clj --- project.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project.clj b/project.clj index 5706ce52..04eba8b1 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject 
fundingcircle/jackdaw "0.0.1-SNAPSHOT" +(defproject fundingcircle/jackdaw "_" :description "A Clojure library for the Apache Kafka distributed streaming platform." :license {:name "BSD 3-clause" :url "http://opensource.org/licenses/BSD-3-Clause"} From 420326aeaae699d1eee1a75f5c3c52f94404aaf6 Mon Sep 17 00:00:00 2001 From: Saurabh Mehta Date: Tue, 9 Jul 2024 10:14:25 +0530 Subject: [PATCH 6/8] suppress output of sliding windows till they are closed, fix tests accordingly --- src/jackdaw/streams.clj | 10 +++- src/jackdaw/streams/protocols.clj | 4 +- test/jackdaw/streams_test.clj | 78 ++++++++++++++++++++----------- 3 files changed, 61 insertions(+), 31 deletions(-) diff --git a/src/jackdaw/streams.clj b/src/jackdaw/streams.clj index d4b03611..dc886635 100644 --- a/src/jackdaw/streams.clj +++ b/src/jackdaw/streams.clj @@ -324,8 +324,14 @@ (defn sliding-window-by-time "Windows the KStream using a sliding window" - ([kgroupedstream window] - (p/sliding-window-by-time kgroupedstream window))) + ([kgroupedstream window topic-config] + (-> kgroupedstream + (p/sliding-window-by-time window) + (aggregate (fn [] 0) + (fn [aggr [_k v]] (+ aggr v)) ; Extract the value from [k v] + (assoc topic-config :topic-name "sliding-window-store")) + (suppress {:until-time-limit-ms (.timeDifferenceMs window)}) + (to-kstream)))) (defn kgroupedstream* "Returns the underlying KGroupedStream object." 
diff --git a/src/jackdaw/streams/protocols.clj b/src/jackdaw/streams/protocols.clj index 4f35552a..7c9f105a 100644 --- a/src/jackdaw/streams/protocols.clj +++ b/src/jackdaw/streams/protocols.clj @@ -253,7 +253,9 @@ (windowed-by-session [kgroupedstream window]) - (sliding-window-by-time [kgroupedstream window]) + (sliding-window-by-time + [kgroupedstream window] + "Windows the KGroupedStream using a sliding time window.") (kgroupedstream* [kgroupedstream] diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index cb529eca..1e5ce7fa 100644 --- a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -1095,6 +1095,56 @@ (is (= [0 3] (second keyvals))) (is (= [0 4] (nth keyvals 2)))))) + (testing "sliding-window-by-time" + (let [topic-a (mock/topic "topic-a") + topic-b (mock/topic "topic-b") + window-size (Duration/ofMillis 1000) + driver (mock/build-driver (fn [builder] + (-> builder + (k/kstream topic-a) + ;; (k/peek (fn [[k v]] + ;; (println "Input:" k v))) + (k/group-by (fn [[k v]] + (let [result (long (/ k 10))] + ;; (println "Group-by key:" result) + result)) + topic-a) + (k/sliding-window-by-time + (SlidingWindows/ofTimeDifferenceWithNoGrace window-size) + topic-a) + (k/map (fn [[k v]] + (let [original-key (.key k)] + ;; (println "key val:" original-key v + ;; "\nWindow:" k) + [original-key v]))) + (k/to topic-b)))) + publish (partial mock/publish driver topic-a)] + + (publish 1000 1 1) + (publish 1500 1 2) + (publish 1900 1 3) + (publish 2100 1 4) + (publish 2500 1 5) + (publish 3000 1 6) + (publish 3500 1 7) + + + (let [keyvals (mock/get-keyvals driver topic-b)] + ;; (println "Total keyvals:" (count keyvals)) + ;; (doseq [kv keyvals] + ;; (println "Keyval:" kv)) + (is (= 9 (count keyvals))) + (is (= [[0 1] ; Window 0-1000 + [0 3] ; Window 500-1500 + [0 5] ; Window 1001-2001 + [0 6] ; Window 900-1900 + [0 12] ; Window 1501-2501 + [0 9] ; Window 1100-2100 + [0 9] ; Window 1901-2901 + [0 14] ; Window 1500-2500 + [0 11]] ; 
Window 2101-3101 + keyvals))))) + (testing "windowed-by-time with string keys" (let [topic-a (assoc (mock/topic "topic-a") :key-serde (Serdes/String)) topic-b (assoc (mock/topic "topic-b") :key-serde (Serdes/String)) @@ -1118,34 +1168,6 @@ (is (= ["a" 1] (first keyvals))) (is (= ["a" 3] (second keyvals))) (is (= ["a" 4] (nth keyvals 2)))))) - - (testing "sliding-window-by-time" - (let [topic-a (mock/topic "topic-a") - topic-b (mock/topic "topic-b") - driver (mock/build-driver (fn [builder] - (-> builder - (k/kstream topic-a) - (k/group-by (fn [[k _v]] (long (/ k 10))) topic-a) - (k/sliding-window-by-time (SlidingWindows/withTimeDifferenceAndGrace - (Duration/ofMillis 1000) - (Duration/ofMillis 100))) - (k/reduce + topic-a) - (k/to-kstream) - (k/map (fn [[k v]] [(.key k) v])) - (k/to topic-b)))) - publish (partial mock/publish driver topic-a)] - - (publish 1000 1 1) - (publish 1500 1 2) - (publish 1900 1 3) - (publish 2100 1 4) - - (let [keyvals (mock/get-keyvals driver topic-b)] - (is (= 4 (count keyvals))) - (is (= [0 1] (first keyvals))) - (is (= [0 3] (second keyvals))) - (is (= [0 6] (nth keyvals 2))) - (is (= [0 9] (nth keyvals 3)))))) (testing "windowed-by-session: reduce" (let [topic-a (mock/topic "topic-a") From a01ecc91ba1b73cc1aba4fd4f5003db742b3dd29 Mon Sep 17 00:00:00 2001 From: Saurabh Mehta Date: Tue, 9 Jul 2024 10:28:50 +0530 Subject: [PATCH 7/8] generic function to collect values in sliding windows --- project.clj | 1 - src/jackdaw/streams.clj | 10 ++++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/project.clj b/project.clj index 04eba8b1..e927b074 100644 --- a/project.clj +++ b/project.clj @@ -2,7 +2,6 @@ :description "A Clojure library for the Apache Kafka distributed streaming platform." 
:license {:name "BSD 3-clause" :url "http://opensource.org/licenses/BSD-3-Clause"} - :scm {:name "git" :url "https://github.com/fundingcircle/jackdaw"} :url "https://github.com/FundingCircle/jackdaw/" diff --git a/src/jackdaw/streams.clj b/src/jackdaw/streams.clj index dc886635..d35da385 100644 --- a/src/jackdaw/streams.clj +++ b/src/jackdaw/streams.clj @@ -324,13 +324,15 @@ (defn sliding-window-by-time "Windows the KStream using a sliding window" - ([kgroupedstream window topic-config] + ([kgroupedstream window topic-config] ; default aggregation (sum) + (sliding-window-by-time kgroupedstream window topic-config (fn [] 0) (fn [aggr [_k v]] (+ aggr v)))) + ([kgroupedstream window topic-config initializer-fn aggregator-fn] (-> kgroupedstream (p/sliding-window-by-time window) - (aggregate (fn [] 0) - (fn [aggr [_k v]] (+ aggr v)) ; Extract the value from [k v] + (aggregate initializer-fn + aggregator-fn (assoc topic-config :topic-name "sliding-window-store")) - (suppress {:until-time-limit-ms (.timeDifferenceMs window)}) + (suppress {:until-time-limit-ms (.timeDifferenceMs window)}) (to-kstream)))) (defn kgroupedstream* From 61804010e774b0271084e9dff9c96ab4b351d28f Mon Sep 17 00:00:00 2001 From: Saurabh Mehta Date: Thu, 11 Jul 2024 00:17:58 +0530 Subject: [PATCH 8/8] use empty suppress config to get Suppressed/untilWindowCloses --- src/jackdaw/streams.clj | 2 +- test/jackdaw/streams_test.clj | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/jackdaw/streams.clj b/src/jackdaw/streams.clj index d35da385..d092f617 100644 --- a/src/jackdaw/streams.clj +++ b/src/jackdaw/streams.clj @@ -332,7 +332,7 @@ (aggregate initializer-fn aggregator-fn (assoc topic-config :topic-name "sliding-window-store")) - (suppress {:until-time-limit-ms (.timeDifferenceMs window)}) + (suppress {}) (to-kstream)))) (defn kgroupedstream* diff --git a/test/jackdaw/streams_test.clj b/test/jackdaw/streams_test.clj index 1e5ce7fa..95940287 100644 --- 
a/test/jackdaw/streams_test.clj +++ b/test/jackdaw/streams_test.clj @@ -1133,16 +1133,18 @@ ;; (println "Total keyvals:" (count keyvals)) ;; (doseq [kv keyvals] ;; (println "Keyval:" kv)) - (is (= 9 (count keyvals))) + (is (= 11 (count keyvals))) (is (= [[0 1] ; Window 0-1000 [0 3] ; Window 500-1500 - [0 5] ; Window 1001-2001 [0 6] ; Window 900-1900 - [0 12] ; Window 1501-2501 + [0 5] ; Window 1001-2001 [0 9] ; Window 1100-2100 - [0 9] ; Window 1901-2901 [0 14] ; Window 1500-2500 - [0 11]] ; Window 2101-3101 + [0 12] ; Window 1501-2501 + [0 9] ; Window 1901-2901 + [0 15] ; Window 2000-3000 + [0 11] ; Window 2101-3101 + [0 18]] ; Window 2500-3500 keyvals))))) (testing "windowed-by-time with string keys"