Skip to content

Commit

Permalink
lib: re-implement web-socket server using a core.async channel as queue
Browse files Browse the repository at this point in the history
Previous implementation has been calling http/on-receive too late.
It could happen that client sends a message too quickly after opening
a connection that we could sometimes miss it.

New approach registers http/on-close and http/on-receive ASAP. But
instead of processing messages right away it is putting them on
core.async channel. When we are ready after accepting client connection
and have all initialisation set up, we can start processing
jobs on the channel asynchronously.
  • Loading branch information
darwin committed May 20, 2016
1 parent 06f2978 commit 5a20682
Showing 1 changed file with 54 additions and 30 deletions.
84 changes: 54 additions & 30 deletions src/lib/dirac/lib/ws_server.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
; taken from https://github.com/tomjakubowski/weasel/tree/8bfeb29dbaf903e299b2a3296caed52b5761318f
(ns dirac.lib.ws-server
(:require [org.httpkit.server :as http]
[clojure.core.async :as core-async :refer [chan <!! <! >!! put! alts!! timeout go go-loop]]
[clojure.tools.logging :as log]
[dirac.lib.utils :as utils]
[dirac.logging :as logging])
Expand Down Expand Up @@ -53,10 +54,7 @@

(defn add-client! [server client]
{:pre [(instance? WebSocketServer server)]}
(swap! (:clients server) conj client)
(let [first-client-promise (get-first-client-promise server)]
(if-not (realized? first-client-promise)
(deliver first-client-promise client))))
(swap! (:clients server) conj client))

(defn has-clients? [server]
{:pre [(instance? WebSocketServer server)]}
Expand All @@ -82,9 +80,14 @@
port (get-local-port server)]
(utils/get-ws-url host port)))

(defn deliver-first-client-promise! [server client]
(let [first-client-promise (get-first-client-promise server)]
(if-not (realized? first-client-promise)
(deliver first-client-promise client))))

; -- client data ------------------------------------------------------------------------------------------------------------

(defrecord WebSocketServerClient [id channel ready-promise]
(defrecord WebSocketServerClient [id channel jobs-channel done-promise]
Object
(toString [this]
(str "[WebSocketServerClient#" (:id this) "]")))
Expand All @@ -97,7 +100,7 @@
(vswap! last-client-id inc))

(defn make-client [channel]
(let [client (WebSocketServerClient. (next-client-id!) channel (promise))]
(let [client (WebSocketServerClient. (next-client-id!) channel (chan 32) (promise))]
(log/trace "Made" (str client))
client))

Expand All @@ -107,15 +110,17 @@
{:pre [(instance? WebSocketServerClient client)]}
(:channel client))

(defn get-ready-promise [client]
(defn get-done-promise [client]
{:pre [(instance? WebSocketServerClient client)]}
(:done-promise client))

(defn get-jobs-channel [client]
{:pre [(instance? WebSocketServerClient client)]}
(:ready-promise client))
(:jobs-channel client))

; http server is multithreaded and we can potentially enter handlers before accept-client finishes
; this is a guard to wait for full client initialization after executing potentially long running on-incoming-client
(defn wait-for-client-to-be-ready [client]
(defn add-job! [client & args]
{:pre [(instance? WebSocketServerClient client)]}
@(get-ready-promise client))
(put! (get-jobs-channel client) args))

; -- serialization ---------------------------------------------------------------------------------------------------------
; TODO: make serialization pluggable
Expand All @@ -129,41 +134,60 @@
; -- request handling -------------------------------------------------------------------------------------------------------

(defn on-close [server client status]
(wait-for-client-to-be-ready client)
(let [{:keys [on-client-disconnection on-leaving-client]} (get-options server)]
(if on-client-disconnection
(on-client-disconnection server client status))
(if on-leaving-client
(on-leaving-client server client))
(remove-client! server client)))
(core-async/close! (get-jobs-channel client))
(future ; don't block job-processing-loop
(let [{:keys [on-client-disconnection on-leaving-client]} (get-options server)]
@(get-done-promise client) ; wait for all pending jobs to get processed
(if on-client-disconnection
(on-client-disconnection server client status))
(if on-leaving-client
(on-leaving-client server client))
(remove-client! server client))))

(defn on-receive [server client serialized-msg]
(wait-for-client-to-be-ready client)
(let [{:keys [on-receive on-message]} (get-options server)]
(if on-receive
(on-receive server client serialized-msg))
(if on-message
(let [msg (unserialize-msg serialized-msg)]
(on-message server client msg)))))

(defn accept-client [server channel]
(let [client (make-client channel)
{:keys [on-incoming-client]} (get-options server)]
(defn accept-client! [server client]
(let [{:keys [on-incoming-client]} (get-options server)]
(add-client! server client)
(http/on-close channel (partial on-close server client))
(http/on-receive channel (partial on-receive server client))
(if on-incoming-client
(on-incoming-client server client))
(deliver (get-ready-promise client) true)))
(deliver-first-client-promise! server client)))

(defn run-client-job-processing-loop! [client]
(let [jobs (get-jobs-channel client)]
(log/debug (str client) "Entering job-processing-loop")
(go-loop []
(if-let [[method & args] (<! jobs)]
(do
(case method
:receive (apply on-receive args)
:close (apply on-close args))
(recur))
(do
(log/debug (str client) "Leaving job-processing-loop")
(deliver (get-done-promise client) true))))))

(defn boot-client! [server client]
(let [{:keys [on-client-connection]} (get-options server)
accepted? (or (not on-client-connection) (not= :rejected (on-client-connection server (get-channel client))))]
(when accepted?
(accept-client! server client)
(run-client-job-processing-loop! client))))

(defn on-new-client-connection [server request]
(http/with-channel request channel
(if-not (http/websocket? channel)
{:status 200 :body "Please connect with a websocket client!"}
(let [{:keys [on-client-connection]} (get-options server)
accepted? (or (not on-client-connection) (not= :rejected (on-client-connection server channel)))]
(if accepted?
(accept-client server channel))))))
(let [client (make-client channel)]
(http/on-receive channel (partial add-job! client :receive server client))
(http/on-close channel (partial add-job! client :close server client))
(boot-client! server client)))))

; -- sending ----------------------------------------------------------------------------------------------------------------

Expand Down

0 comments on commit 5a20682

Please sign in to comment.