Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new sliding window by time function #374

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
suppress output of sliding windows till they are closed, fix tests ac…
…cordingly
sorumehta committed Jul 9, 2024
commit 420326aeaae699d1eee1a75f5c3c52f94404aaf6
10 changes: 8 additions & 2 deletions src/jackdaw/streams.clj
Original file line number Diff line number Diff line change
@@ -324,8 +324,14 @@

(defn sliding-window-by-time
"Windows the KGroupedStream using a sliding time window.

With two arguments, delegates directly to `p/sliding-window-by-time`.

With `topic-config`, the windowed stream is then aggregated by adding up
record values starting from 0, passing `topic-config` with `:topic-name`
set to `sliding-window-store`; the result goes through `suppress` with a
time limit equal to the window's `timeDifferenceMs` and finally through
`to-kstream`."
([kgroupedstream window]
(p/sliding-window-by-time kgroupedstream window)))
([kgroupedstream window topic-config]
(-> kgroupedstream
(p/sliding-window-by-time window)
;; Initializer yields 0; the adder folds each record's value into the sum.
;; NOTE(review): the aggregation is fixed to a numeric sum, so values are
;; presumably expected to be numbers -- confirm with callers.
(aggregate (fn [] 0)
(fn [aggr [_k v]] (+ aggr v)) ; Extract the value from [k v]
;; NOTE(review): :topic-name is always overwritten with the same literal;
;; using this arity twice in one topology would presumably reuse that
;; name -- TODO confirm whether it should be caller-supplied.
(assoc topic-config :topic-name "sliding-window-store"))
;; The suppression time limit is taken from the window's own time difference.
(suppress {:until-time-limit-ms (.timeDifferenceMs window)})
(to-kstream))))

(defn kgroupedstream*
"Returns the underlying KGroupedStream object."
4 changes: 3 additions & 1 deletion src/jackdaw/streams/protocols.clj
Original file line number Diff line number Diff line change
@@ -253,7 +253,9 @@

(windowed-by-session [kgroupedstream window])

(sliding-window-by-time [kgroupedstream window])
(sliding-window-by-time
[kgroupedstream window]
"Windows the KGroupedStream using a sliding time window.")

(kgroupedstream*
[kgroupedstream]
78 changes: 50 additions & 28 deletions test/jackdaw/streams_test.clj
Original file line number Diff line number Diff line change
@@ -1095,6 +1095,56 @@
(is (= [0 3] (second keyvals)))
(is (= [0 4] (nth keyvals 2))))))

(testing "sliding-window-by-time"
  ;; Key 1 maps to group 0 via (long (/ k 10)); each expected record below is
  ;; the sum of the values that fall inside one 1000 ms sliding window.
  (let [in-topic (mock/topic "topic-a")
        out-topic (mock/topic "topic-b")
        time-difference (Duration/ofMillis 1000)
        ;; Re-key each record by its key divided by 10, truncated to a long.
        bucket-of (fn [[k _v]] (long (/ k 10)))
        ;; Swap the windowed key for the plain key it wraps.
        unwrap-key (fn [[windowed-key total]] [(.key windowed-key) total])
        topology (fn [builder]
                   (-> builder
                       (k/kstream in-topic)
                       (k/group-by bucket-of in-topic)
                       (k/sliding-window-by-time
                        (SlidingWindows/ofTimeDifferenceWithNoGrace time-difference)
                        in-topic)
                       (k/map unwrap-key)
                       (k/to out-topic)))
        test-driver (mock/build-driver topology)
        send! (partial mock/publish test-driver in-topic)]

    ;; [first-arg value] pairs, all published under key 1; the window comments
    ;; below treat the first argument as the record time.
    (doseq [[ts v] [[1000 1] [1500 2] [1900 3] [2100 4]
                    [2500 5] [3000 6] [3500 7]]]
      (send! ts 1 v))

    (let [emitted (mock/get-keyvals test-driver out-topic)]
      (is (= 9 (count emitted)))
      (is (= [[0 1]   ; Window 0-1000
              [0 3]   ; Window 500-1500
              [0 5]   ; Window 1001-2001
              [0 6]   ; Window 900-1900
              [0 12]  ; Window 1501-2501
              [0 9]   ; Window 1100-2100
              [0 9]   ; Window 1901-2901
              [0 14]  ; Window 1500-2500
              [0 11]] ; Window 2101-3101
             emitted)))))

(testing "windowed-by-time with string keys"
(let [topic-a (assoc (mock/topic "topic-a") :key-serde (Serdes/String))
topic-b (assoc (mock/topic "topic-b") :key-serde (Serdes/String))
@@ -1118,34 +1168,6 @@
(is (= ["a" 1] (first keyvals)))
(is (= ["a" 3] (second keyvals)))
(is (= ["a" 4] (nth keyvals 2))))))

(testing "sliding-window-by-time"
  ;; Groups records by (long (/ k 10)), applies a 1000 ms sliding window with
  ;; a 100 ms grace period, reduces with +, and checks the emitted records.
  (let [in-topic (mock/topic "topic-a")
        out-topic (mock/topic "topic-b")
        bucket-of (fn [[k _v]] (long (/ k 10)))
        ;; Swap the windowed key for the plain key it wraps.
        unwrap-key (fn [[windowed-key total]] [(.key windowed-key) total])
        topology (fn [builder]
                   (let [window (SlidingWindows/withTimeDifferenceAndGrace
                                 (Duration/ofMillis 1000)
                                 (Duration/ofMillis 100))]
                     (-> builder
                         (k/kstream in-topic)
                         (k/group-by bucket-of in-topic)
                         (k/sliding-window-by-time window)
                         (k/reduce + in-topic)
                         (k/to-kstream)
                         (k/map unwrap-key)
                         (k/to out-topic))))
        test-driver (mock/build-driver topology)
        send! (partial mock/publish test-driver in-topic)]

    ;; [first-arg value] pairs, all published under key 1.
    (doseq [[ts v] [[1000 1] [1500 2] [1900 3] [2100 4]]]
      (send! ts 1 v))

    (let [emitted (mock/get-keyvals test-driver out-topic)]
      (is (= 4 (count emitted)))
      (is (= [0 1] (first emitted)))
      (is (= [0 3] (second emitted)))
      (is (= [0 6] (nth emitted 2)))
      (is (= [0 9] (nth emitted 3))))))

(testing "windowed-by-session: reduce"
(let [topic-a (mock/topic "topic-a")