diff --git a/src/lib/dirac/lib/ws_server.clj b/src/lib/dirac/lib/ws_server.clj index c8f6c995db..5f556110b9 100644 --- a/src/lib/dirac/lib/ws_server.clj +++ b/src/lib/dirac/lib/ws_server.clj @@ -1,6 +1,7 @@ ; taken from https://github.com/tomjakubowski/weasel/tree/8bfeb29dbaf903e299b2a3296caed52b5761318f (ns dirac.lib.ws-server (:require [org.httpkit.server :as http] + [clojure.core.async :as core-async :refer [chan !! put! alts!! timeout go go-loop]] [clojure.tools.logging :as log] [dirac.lib.utils :as utils] [dirac.logging :as logging]) @@ -53,10 +54,7 @@ (defn add-client! [server client] {:pre [(instance? WebSocketServer server)]} - (swap! (:clients server) conj client) - (let [first-client-promise (get-first-client-promise server)] - (if-not (realized? first-client-promise) - (deliver first-client-promise client)))) + (swap! (:clients server) conj client)) (defn has-clients? [server] {:pre [(instance? WebSocketServer server)]} @@ -82,9 +80,14 @@ port (get-local-port server)] (utils/get-ws-url host port))) +(defn deliver-first-client-promise! [server client] + (let [first-client-promise (get-first-client-promise server)] + (if-not (realized? first-client-promise) + (deliver first-client-promise client)))) + ; -- client data ------------------------------------------------------------------------------------------------------------ -(defrecord WebSocketServerClient [id channel ready-promise] +(defrecord WebSocketServerClient [id channel jobs-channel done-promise] Object (toString [this] (str "[WebSocketServerClient#" (:id this) "]"))) @@ -97,7 +100,7 @@ (vswap! last-client-id inc)) (defn make-client [channel] - (let [client (WebSocketServerClient. (next-client-id!) channel (promise))] + (let [client (WebSocketServerClient. (next-client-id!) channel (chan 32) (promise))] (log/trace "Made" (str client)) client)) @@ -107,15 +110,17 @@ {:pre [(instance? WebSocketServerClient client)]} (:channel client)) -(defn get-ready-promise [client] +(defn get-done-promise [client] + {:pre [(instance? WebSocketServerClient client)]} + (:done-promise client)) + +(defn get-jobs-channel [client] {:pre [(instance? WebSocketServerClient client)]} - (:ready-promise client)) + (:jobs-channel client)) -; http server is multithreaded and we can potentially enter handlers before accept-client finishes -; this is a guard to wait for full client initialization after executing potentially long running on-incoming-client -(defn wait-for-client-to-be-ready [client] +(defn add-job! [client & args] {:pre [(instance? WebSocketServerClient client)]} - @(get-ready-promise client)) + (put! (get-jobs-channel client) args)) ; -- serialization --------------------------------------------------------------------------------------------------------- ; TODO: make serialization pluggable @@ -129,16 +134,17 @@ ; -- request handling ------------------------------------------------------------------------------------------------------- (defn on-close [server client status] - (wait-for-client-to-be-ready client) - (let [{:keys [on-client-disconnection on-leaving-client]} (get-options server)] - (if on-client-disconnection - (on-client-disconnection server client status)) - (if on-leaving-client - (on-leaving-client server client)) - (remove-client! server client))) + (core-async/close! (get-jobs-channel client)) + (future ; don't block job-processing-loop + (let [{:keys [on-client-disconnection on-leaving-client]} (get-options server)] + @(get-done-promise client) ; wait for all pending jobs to get processed + (if on-client-disconnection + (on-client-disconnection server client status)) + (if on-leaving-client + (on-leaving-client server client)) + (remove-client! server client)))) (defn on-receive [server client serialized-msg] - (wait-for-client-to-be-ready client) (let [{:keys [on-receive on-message]} (get-options server)] (if on-receive (on-receive server client serialized-msg)) @@ -146,24 +152,42 @@ (let [msg (unserialize-msg serialized-msg)] (on-message server client msg))))) -(defn accept-client [server channel] - (let [client (make-client channel) - {:keys [on-incoming-client]} (get-options server)] +(defn accept-client! [server client] + (let [{:keys [on-incoming-client]} (get-options server)] (add-client! server client) - (http/on-close channel (partial on-close server client)) - (http/on-receive channel (partial on-receive server client)) (if on-incoming-client (on-incoming-client server client)) - (deliver (get-ready-promise client) true))) + (deliver-first-client-promise! server client))) + +(defn run-client-job-processing-loop! [client] + (let [jobs (get-jobs-channel client)] + (log/debug (str client) "Entering job-processing-loop") + (go-loop [] + (if-let [[method & args] (