Skip to content

Commit

Permalink
[new] [Signal API] Add error-fn, backp-fn tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ptaoussanis committed Dec 5, 2023
1 parent 7f8a254 commit 0a3df15
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/taoensso/encore.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -5370,8 +5370,8 @@

run-fn
(case mode
:blocking (fn [f] (or (.offer abq f) (.put abq f) false))
:dropping (fn [f] (or (.offer abq f) false))
:blocking (fn [f] (or (.offer abq f) (do (.put abq f) false)))
:dropping (fn [f] (or (.offer abq f) false))
:sliding
(fn [f]
(or
Expand Down
50 changes: 26 additions & 24 deletions src/taoensso/encore/signals/api.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -308,18 +308,19 @@

(catch :any t
(when error-fn
(when-not (and rl-error (rl-error handler-id))
(if-not (enc/identical-kw? error-fn ::default)
(error-fn {:handler-id handler-id, :error t})
(enc/signal!
{:level :error
:id ::handler-error
:error t
:msg "[taoensso/signals] Error executing wrapped handler fn"
:data
{:handler-id handler-id
:handler-fn handler-fn
:dispatch-opts dispatch-opts}}))))
(enc/catching
(when-not (and rl-error (rl-error handler-id))
(if-not (enc/identical-kw? error-fn ::default)
(error-fn {:handler-id handler-id, :error t})
(enc/signal!
{:level :error
:id ::handler-error
:error t
:msg "[taoensso/signals] Error executing wrapped handler fn"
:data
{:handler-id handler-id
:handler-fn handler-fn
:dispatch-opts dispatch-opts}})))))
nil)))))]

#?(:cljs wrapped-handler-fn
Expand All @@ -330,17 +331,18 @@
(fn wrapped-handler-fn* [signal]
(when-let [back-pressure? (false? (runner (fn [] (wrapped-handler-fn signal))))]
(when backp-fn
(when-not (and rl-backp (rl-backp handler-id))
(if-not (enc/identical-kw? backp-fn ::default)
(backp-fn {:handler-id handler-id})
(enc/signal!
{:level :warn
:id ::handler-back-pressure
:msg "[taoensso/signals] Back pressure on wrapped handler fn"
:data
{:handler-id handler-id
:handler-fn handler-fn
:dispatch-opts dispatch-opts}})))))))))))
(enc/catching
(when-not (and rl-backp (rl-backp handler-id))
(if-not (enc/identical-kw? backp-fn ::default)
(backp-fn {:handler-id handler-id})
(enc/signal!
{:level :warn
:id ::handler-back-pressure
:msg "[taoensso/signals] Back pressure on wrapped handler fn"
:data
{:handler-id handler-id
:handler-fn handler-fn
:dispatch-opts dispatch-opts}}))))))))))))

#?(:clj
(defmacro def-handler-api
Expand Down Expand Up @@ -412,7 +414,7 @@
Options for running handler asynchronously via `taoensso.encore/runner`,
{:keys [mode buffer-size n-threads daemon-threads? ...]}
Supports `:blocking`, `:dropping`, and `:sliding` back-pressure modes.
Supports `:blocking`, `:dropping`, and `:sliding` back pressure modes.
NB handling order may be non-sequential when `n-threads` > 1.
`sample`
Expand Down
24 changes: 22 additions & 2 deletions test/taoensso/encore_tests.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
;; (deftest pass (is (= 1 1)))
;; (deftest fail (is (= 1 0)))

(defn- throw! [x] (throw (ex-info "Error" {:arg {:value x :type (type x)}})))
(defn- throw!
([ ] (throw (ex-info "TestEx" {})))
([x] (throw (ex-info "TestEx" {:arg {:value x :type (type x)}}))))

;;;; Core

Expand Down Expand Up @@ -1134,6 +1136,7 @@
(is (nil? (remove-handler! :hid1)))
(is (nil? *sig-handlers*) "Removal yields non-empty map")

#_
(testing "Handler middleware"
(let [v1 (atom ::nx)
v2 (atom ::nx)
Expand All @@ -1154,7 +1157,24 @@

(is (map? (remove-handler! :hid1)))
(is (map? (remove-handler! :hid2)))
(is (nil? (remove-handler! :hid3)))]))])])
(is (nil? (remove-handler! :hid3)))]))

(testing "Handler error-fn (wrapped handlers trap exceptions, send to `error-fn`)"
(let [fn-arg_ (atom nil)]
(enc/update-var-root! *sig-handlers* (fn [_] nil))
(add-handler! :hid1 (fn [_] (throw!)) {:error-fn (fn [x] (reset! fn-arg_ x)), :async nil})
(sigs/call-handlers! *sig-handlers* (MySignal. :info "foo"))
(is (enc/submap? @fn-arg_ {:handler-id :hid1, :error (enc/pred enc/error?)}))))

#?(:clj
(testing "Handler backp-fn (handler dispatch detects back pressure, triggers `backp-fn`)"
(let [fn-arg_ (atom nil)]
(enc/update-var-root! *sig-handlers* (fn [_] nil))
(add-handler! :hid1 (fn [_] (Thread/sleep 1000)) {:backp-fn (fn [x] (reset! fn-arg_ x)), :async {:mode :blocking, :buffer-size 1}})
(sigs/call-handlers! *sig-handlers* (MySignal. :info "1"))
(sigs/call-handlers! *sig-handlers* (MySignal. :info "2")) ; Should trigger back pressure
(Thread/sleep 2000) ; Wait for second signal to enqueue
(is (enc/submap? @fn-arg_ {:handler-id :hid1})))))])])

;;;;

Expand Down

0 comments on commit 0a3df15

Please sign in to comment.