Skip to content
This repository has been archived by the owner on Sep 2, 2020. It is now read-only.

Commit

Permalink
switch to net for async networking (#265)
Browse files Browse the repository at this point in the history
  • Loading branch information
pyr authored Mar 2, 2017
1 parent 8474105 commit d26e0dd
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 254 deletions.
6 changes: 3 additions & 3 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
[org.clojure/tools.logging "0.3.1"]
[org.clojure/tools.cli "0.3.3"]
[com.stuartsierra/component "0.3.2"]
[spootnik/unilog "0.7.17"]
[spootnik/unilog "0.7.19"]
[spootnik/uncaught "0.5.3"]
[spootnik/globber "0.4.1"]
[spootnik/reporter "0.1.17"]
[spootnik/signal "0.2.1"]
[spootnik/net "0.3.3-beta9"]
[org.javassist/javassist "3.21.0-GA"]
[instaparse "1.4.5"]
[cheshire "5.7.0"]
Expand All @@ -28,5 +29,4 @@
[org.jctools/jctools-core "2.0.1"]
[com.boundary/high-scale-lib "1.0.6"]
[net.jpountz.lz4/lz4 "1.3.0"]
[org.xerial.snappy/snappy-java "1.1.2.6"]
[io.netty/netty-all "4.1.8.Final"]])
[org.xerial.snappy/snappy-java "1.1.2.6"]])
2 changes: 1 addition & 1 deletion src/io/cyanite/api.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
[io.cyanite.index :as index]
[io.cyanite.store :as store]
[io.cyanite.query :as query]
[io.cyanite.http :as http]
[net.http.server :as http]
[io.cyanite.engine.drift :refer [epoch!]]
[io.cyanite.utils :refer [nbhm assoc-if-absent!]]
[clj-time.coerce :refer [to-epoch]]
Expand Down
165 changes: 0 additions & 165 deletions src/io/cyanite/http.clj

This file was deleted.

7 changes: 3 additions & 4 deletions src/io/cyanite/input.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
(:require [com.stuartsierra.component :as component]
[io.cyanite.engine :as engine]
[io.cyanite.input.carbon :as carbon]
[io.cyanite.input.tcp :refer [map->TCPInput]]
[clojure.tools.logging :refer [info]]))

(defmulti build-input (comp (fnil keyword "carbon") :type))

(defmethod build-input :carbon
[options]
(component/using
(map->TCPInput (assoc options
:port (or (:port options) 2003)
:pipeline carbon/pipeline))
(carbon/map->CarbonTCPInput (assoc options
:port (or (:port options) 2003)
:pipeline carbon/pipeline))
[:engine]))
45 changes: 30 additions & 15 deletions src/io/cyanite/input/carbon.clj
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
(ns io.cyanite.input.carbon
(:require [io.cyanite.engine :as engine]
[io.cyanite.input.tcp :as tcp]
[clojure.string :refer [split]]
[clojure.tools.logging :refer [info]])
(:import io.netty.handler.codec.LineBasedFrameDecoder
io.netty.handler.codec.string.StringDecoder
io.netty.handler.timeout.ReadTimeoutHandler
io.netty.util.CharsetUtil))
(:require [com.stuartsierra.component :as component]
[io.cyanite.engine :as engine]
[net.tcp :as tcp]
[net.ty.pipeline :as pipeline]
[clojure.string :refer [split]]
[clojure.tools.logging :refer [info warn]]))

(defn parse-line
[^String line]
Expand All @@ -31,10 +29,27 @@
{:path path :metric metric :time time})))

(defn pipeline
[^Integer read-timeout engine]
[#(LineBasedFrameDecoder. 2048)
(StringDecoder. (CharsetUtil/UTF_8))
#(ReadTimeoutHandler. read-timeout)
(tcp/with-input input
(when (seq input)
(engine/enqueue! engine (parse-line input))))])
[engine read-timeout]
(pipeline/channel-initializer
[(pipeline/line-based-frame-decoder 2048)
pipeline/string-decoder
(pipeline/read-timeout-handler read-timeout)
(pipeline/with-input [ctx msg]
(when (seq msg)
(engine/enqueue! engine (parse-line msg))))]))

(defrecord CarbonTCPInput [host port timeout server engine]
component/Lifecycle
(start [this]
(let [timeout (or timeout 30)
host (or host "127.0.0.1")
port (or port 2002)
server (tcp/server {:handler (pipeline engine timeout)} host port)]
(try
(assoc this :server server)
(catch Exception e
(warn e "could not start server")))))
(stop [this]
(when server
(server))
(assoc this :server nil)))
66 changes: 0 additions & 66 deletions src/io/cyanite/input/tcp.clj

This file was deleted.

0 comments on commit d26e0dd

Please sign in to comment.