From 2adbabee8da7191d9752192899cb0a637d31d6f5 Mon Sep 17 00:00:00 2001 From: David Frese Date: Wed, 27 Apr 2016 16:24:18 +0200 Subject: [PATCH] [#166] Add 3rd-party logstash appender (@dfrese) --- project.clj | 3 +- .../timbre/appenders/3rd_party/logstash.clj | 75 +++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 src/taoensso/timbre/appenders/3rd_party/logstash.clj diff --git a/project.clj b/project.clj index 4e3c7310..a18b0a39 100644 --- a/project.clj +++ b/project.clj @@ -37,7 +37,8 @@ [org.graylog2/gelfclient "1.3.1"] [org.julienxx/clj-slack "0.5.3"] [org.clojure/java.jdbc "0.5.8"] - [com.mchange/c3p0 "0.9.5.2"]]} + [com.mchange/c3p0 "0.9.5.2"] + [cheshire "5.5.0"]]} :dev [:1.7 :test {:dependencies [[org.clojure/clojurescript "1.7.28"]] diff --git a/src/taoensso/timbre/appenders/3rd_party/logstash.clj b/src/taoensso/timbre/appenders/3rd_party/logstash.clj new file mode 100644 index 00000000..380d7f81 --- /dev/null +++ b/src/taoensso/timbre/appenders/3rd_party/logstash.clj @@ -0,0 +1,75 @@ +(ns taoensso.timbre.appenders.3rd-party.logstash + "Timbre appender that send output to logstash. + + Requires cheshire (https://github.com/dakrone/cheshire)." + {:author "Mike Sperber (@mikesperber), David Frese (@dfrese)"} + (:require [taoensso.timbre :as timbre] + [cheshire.core :as cheshire]) + (:import [java.net Socket InetAddress] + [java.io PrintWriter])) + +;; Adapted from taoensso.timbre.appenders.3rd-party.server-socket + +(defn connect + [host port] + (let [addr (InetAddress/getByName host) + sock (Socket. addr (int port))] + [sock + (PrintWriter. (.getOutputStream sock))])) + +(defn connection-ok? + [[^Socket sock ^PrintWriter out]] + (and (not (.isClosed sock)) + (.isConnected sock) + (not (.checkError out)))) + +(def iso-format "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + +(defn data->json-stream + [data writer opts] + ;; Note: this it meant to target the logstash-filter-json; especially "message" and "@timestamp" get a special meaning there. + (let [stacktrace-str (if-let [pr (:pr-stacktrace opts)] + #(with-out-str (pr %)) + timbre/stacktrace)] + (cheshire/generate-stream + (merge (:context data) + {:level (:level data) + :namespace (:?ns-str data) + :file (:?file data) + :line (:?line data) + :stacktrace (some-> (force (:?err_ data)) (stacktrace-str)) + :hostname (force (:hostname_ data)) + :message (force (:msg_ data)) + "@timestamp" (:instant data)}) + writer + (merge {:date-format iso-format + :pretty false} + opts)))) + +(defn logstash-appender + "Returns a Logstash appender, which will send each event in JSON + format to the logstash server at `host:port`. Additionally `opts` + may be a map with `:pr-stracktrace` mapped to a function taking an + exception, which should write the stacktrace of that exception to + `*out`." + [host port & [opts]] + (let [conn (atom nil) + nl "\n"] + {:enabled? true + :async? false + :min-level nil + :rate-limit nil + :output-fn :inherit + :fn + (fn [data] + (try + (let [[sock out] (swap! conn + (fn [conn] + (or (and conn (connection-ok? conn) conn) + (connect host port))))] + (locking sock + (data->json-stream data out (select-keys opts [:pr-stacktrace])) + ;; logstash tcp input plugin: "each event is assumed to be one line of text". + (.write ^java.io.Writer out nl))) + (catch java.io.IOException _ + nil)))}))