Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WAMP Node Version 2 #14

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
(defproject clj-wamp "1.0.2"
(defproject clj-wamp "2.0.0-SNAPSHOT"
:description "The WebSocket Application Messaging Protocol for Clojure"
:url "https://github.com/cgmartin/clj-wamp"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:min-lein-version "2.0.0"
:dependencies [[org.clojure/clojure "1.4.0"]
:dependencies [[org.clojure/clojure "1.6.0"]
[org.clojure/core.async "0.1.346.0-17112a-alpha"]
[org.clojure/core.incubator "0.1.2"]
[org.clojure/tools.logging "0.2.6"]
[org.clojure/data.codec "0.1.0"]
[http-kit "2.1.5"]
[cheshire "5.2.0"]]
[cheshire "5.2.0"]
[stylefruits/gniazdo "0.4.0"]]
:profiles {:1.5 {:dependencies [[org.clojure/clojure "1.5.1"]]}
:dev {:dependencies [[log4j "1.2.17" :exclusions [javax.mail/mail
javax.jms/jms
Expand Down
84 changes: 84 additions & 0 deletions src/clj_wamp/core.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
(ns clj-wamp.core
(:require
[clojure.tools.logging :as log]
[clojure.string :refer [split]]
[org.httpkit.server :as httpkit]
[cheshire.core :as json])
(:import
[java.util Random]))

(def ^:const project-version "clj-wamp/2.0.0-SNAPSHOT")

(def ^:private rand-gen (Random.))
(def ^:private sess-id-max 9007199254740992) ; 2^53 per WAMP spec

(defn new-rand-id []
(mod (.nextLong rand-gen) sess-id-max))

;; Client utils

(def client-channels (ref {}))
(def client-prefixes (ref {}))
(def client-auth (ref {}))

(defn add-client
"Adds a websocket channel (or callback function) to a map of clients
and returns a unique session id."
[channel-or-fn]
(let [sess-id (new-rand-id)]
(dosync (alter client-channels assoc sess-id channel-or-fn))
sess-id))

(defn get-client-channel
"Returns the channel (or callback function) for a websocket client's
session id."
[sess-id]
(get @client-channels sess-id))

(defn del-client
"Removes a websocket session from the map of clients."
[sess-id]
(dosync
(alter client-channels dissoc sess-id)
(alter client-prefixes dissoc sess-id)
(alter client-auth dissoc sess-id)))

(defn add-topic-prefix
"Adds a new CURI topic prefix for a websocket client."
[sess-id prefix uri]
(log/trace "New CURI Prefix [" sess-id "]" prefix uri)
(dosync
(alter client-prefixes assoc-in [sess-id prefix] uri)))

(defn get-topic
"Returns the full topic URI for a prefix. If prefix does not exist,
returns the CURI passed in."
[sess-id curi]
(let [topic (split curi #":")
prefix (first topic)
suffix (second topic)]
(if-let [uri (get-in @client-prefixes [sess-id prefix])]
(str uri suffix)
curi)))

(defn close-channel
([sess-id]
(close-channel sess-id 1002))
([sess-id code]
(when-let [channel (get-client-channel sess-id)]
(if (fn? channel)
(httpkit/close channel) ; for unit testing
(.serverClose channel code)) ; TODO thread-safe? (locking AsyncChannel ...) ?
(log/trace "Channel closed" code))))

(defn send!
"Sends data to a websocket client."
[sess-id & data]
(dosync
(let [channel-or-fn (get-client-channel sess-id)
json-data (json/encode data {:escape-non-ascii true})]
(log/trace "Sending data:" data)
(if (fn? channel-or-fn) ; application callback?
(channel-or-fn data)
(when channel-or-fn
(httpkit/send! channel-or-fn json-data))))))
114 changes: 114 additions & 0 deletions src/clj_wamp/node.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
(ns ^{:author "Ryan Sundberg"
:doc "WAMP V2 application node"}
clj-wamp.node
(:require
[clojure.tools.logging :as log]
[cheshire.core :as json]
[gniazdo.core :as ws]
[clj-wamp.core :as core]
[clj-wamp.v2 :as wamp])
(:import
[org.eclipse.jetty.websocket.client WebSocketClient]))

(defn- handle-connect
[{:keys [debug? registrations on-call] :as instance} session]
(when debug?
(log/debug "Connected to WAMP router with session" session))
; there might be a race condition here if we get a "welcome" message before the connect event
(reset! registrations [on-call {} {}]))

(defn- handle-message
[{:keys [debug?] :as instance} msg-str]
(let [msg-data (try (json/decode msg-str)
(catch com.fasterxml.jackson.core.JsonParseException ex
[nil nil]))]
(when debug?
(log/debug "WAMP message received:" msg-str))
(wamp/handle-message instance msg-data)))

(declare connect!)

(defn- handle-close
[{:keys [debug?] :as instance} code reason]
(when debug?
(log/debug "Disconnected from WAMP router:" code reason))
(reset! (:socket instance) nil)
(when @(:reconnect-state instance)
(connect! instance)))

(defn- handle-error
[instance ex]
(log/error ex "WAMP socket error"))

(defn publish!
"Publish an event"
([instance event-uri]
(publish! instance event-uri nil))
([instance event-uri seq-args]
(publish! instance event-uri seq-args nil))
([instance event-uri seq-args kw-args]
(wamp/publish instance (core/new-rand-id) {} event-uri seq-args kw-args)))

(defn publish-to!
"Publish an event to specific session ids"
[instance session-ids event-uri seq-args kw-args]
(wamp/publish instance (core/new-rand-id) {:eligible session-ids} event-uri seq-args kw-args))

(defn- try-connect [{:keys [debug? router-uri] :as instance}]
(try
(swap! (:socket instance)
(fn [socket]
(when (nil? socket)
(when debug?
(log/debug "Connecting to WAMP router at" router-uri))
(let [socket (ws/connect
router-uri
:client (:client instance)
:headers {}
:subprotocols [wamp/subprotocol-id]
:on-connect (partial handle-connect instance)
:on-receive (partial handle-message instance)
:on-close (partial handle-close instance)
:on-error (partial handle-error instance))]
socket))))
(wamp/hello instance)
true
(catch Exception e
(log/error e "Failed to connect to WAMP router")
false)))

(defn connect! [{:keys [reconnect-state reconnect? reconnect-wait-ms] :as instance}]
(reset! reconnect-state reconnect?)
(let [connected? (try-connect instance)]
(if connected?
instance
(if @reconnect-state
(do
(Thread/sleep reconnect-wait-ms)
(recur instance))
instance))))

(defn disconnect! [{:keys [debug?] :as instance}]
(reset! (:reconnect-state instance) false)
(swap! (:socket instance)
(fn [socket]
(when (some? socket)
(when debug?
(log/debug "Disconnecting from WAMP router"))
(ws/close socket)
nil))))

(defn create [{:keys [router-uri realm on-call] :as conf}]
{:pre [(string? router-uri)
(string? realm)]}
(let [client (ws/client (java.net.URI. router-uri))]
(.start ^WebSocketClient client)
(merge
{:debug? false
:reconnect? true
:reconnect-wait-ms 10000}
conf
{:client client
:socket (atom nil)
:reconnect-state (atom false)
:registrations (atom nil)})))
5 changes: 2 additions & 3 deletions src/clj_wamp/server.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
[org.httpkit.timer :as timer]
[cheshire.core :as json]
[clojure.tools.logging :as log]
[clojure.data.codec.base64 :as base64])
[clojure.data.codec.base64 :as base64]
[clj-wamp.core :refer [project-version]])
(:import [org.httpkit.server AsyncChannel]
[javax.crypto Mac]
[javax.crypto.spec SecretKeySpec]))
Expand Down Expand Up @@ -40,8 +41,6 @@
(def ^:const DESC-WAMP-ERROR-NOAUTH "unauthorized")
(def ^:const URI-WAMP-ERROR-NOAUTH (str URI-WAMP-ERROR "unauthorized"))

(def project-version "clj-wamp/1.0.2")

(def max-sess-id (atom 0))

(defn- next-sess-id []
Expand Down
Loading