Skip to content

Commit

Permalink
Add Logs_syslog_unix.unix_reporter
Browse files Browse the repository at this point in the history
Code for restoring connections factored out of tcp_reporter and a Unix
domain sockets version added in unix_reporter.
  • Loading branch information
dra27 committed Aug 11, 2018
1 parent 8bcea1b commit 2ba96b7
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 31 deletions.
99 changes: 68 additions & 31 deletions src/logs_syslog_unix.ml
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
open Logs_syslog

let syslog_report host len send =
let syslog_report
host
len
send
(encode : ?len:int -> Syslog_message.t -> string) =
let report src level ~over k msgf =
let source = Logs.Src.name src in
let timestamp = Ptime_clock.now () in
let k tags ?header _ =
let msg =
message ~host ~source ~tags ?header level timestamp (flush ())
in
send (Syslog_message.encode ~len msg) ; over () ; k ()
send (encode ~len msg) ; over () ; k ()
in
msgf @@ fun ?header ?(tags = Logs.Tag.empty) fmt ->
Format.kfprintf (k tags ?header) ppf fmt
Expand All @@ -32,7 +36,7 @@ let udp_reporter
(Ptime.to_rfc3339 (Ptime_clock.now ()))
msg
in
syslog_report hostname truncate send
syslog_report hostname truncate send Syslog_message.encode

type state =
| Disconnected
Expand All @@ -41,16 +45,10 @@ type state =

let wait_time = 0.01

let tcp_reporter
?(hostname = Unix.gethostname ())
ip
?(port = 514)
?(truncate = 0)
?(framing = `Null) () =
let sa = Unix.ADDR_INET (ip, port) in
let conn_reporter sd st sa truncate frame encode hostname =
let s = ref Disconnected in
let connect () =
let sock = Unix.(socket PF_INET SOCK_STREAM 0) in
let sock = Unix.(socket sd st 0) in
Unix.(setsockopt sock SO_REUSEADDR true) ;
Unix.(setsockopt sock SO_KEEPALIVE true) ;
try
Expand All @@ -59,9 +57,15 @@ let tcp_reporter
Ok ()
with
| Unix.Unix_error (e, f, _) ->
let endpoint =
match sa with
| Unix.ADDR_UNIX socket -> socket
| Unix.ADDR_INET (ip, port) ->
Printf.sprintf "%s:%d" (Unix.string_of_inet_addr ip) port
in
let err =
Printf.sprintf "error %s in function %s while connecting to %s:%d\n"
(Unix.error_message e) f (Unix.string_of_inet_addr ip) port
Printf.sprintf "error %s in function %s while connecting to %s\n"
(Unix.error_message e) f endpoint
in
Error err
in
Expand All @@ -75,27 +79,60 @@ let tcp_reporter
match connect () with
| Error e -> Error e
| Ok () ->
let transmit =
if st = Unix.SOCK_DGRAM then
fun sock b len -> ignore(Unix.send sock b 0 len [])
else
fun sock b len ->
let rec aux idx =
let should = len - idx in
let n = Unix.send sock b idx should [] in
if n <> should then aux (idx + n)
in
aux 0
in
let rec send omsg = match !s with
| Disconnected -> reconnect send omsg
| Connecting -> let _ = Unix.select [] [] [] wait_time in send omsg
| Connected sock ->
let msg = Bytes.of_string (frame_message omsg framing) in
let len = Bytes.length msg in
let rec aux idx =
try
let should = len - idx in
let n = Unix.send sock msg idx should [] in
if n = should then () else aux (idx + n)
with
| Unix.Unix_error (Unix.EAGAIN, _, _) -> send omsg
| Unix.Unix_error (e, f, _) ->
let err = Unix.error_message e in
Printf.eprintf "error %s in function %s, reconnecting\n" err f ;
(try Unix.close sock with Unix.Unix_error _ -> ()) ;
s := Disconnected ;
reconnect send omsg
in
aux 0
let msg = frame omsg |> Bytes.of_string in
try transmit sock msg (Bytes.length msg) with
| Unix.Unix_error (Unix.EAGAIN, _, _) -> send omsg
| Unix.Unix_error (e, f, _) ->
let err = Unix.error_message e in
Printf.eprintf "error %s in function %s, reconnecting\n" err f ;
(try Unix.close sock with Unix.Unix_error _ -> ()) ;
s := Disconnected ;
reconnect send omsg
in
at_exit (fun () -> match !s with Connected x -> Unix.close x | _ -> ()) ;
Ok (syslog_report hostname truncate send)
Ok (syslog_report hostname truncate send encode)

let tcp_reporter
?(hostname = Unix.gethostname ())
ip
?(port = 514)
?(truncate = 0)
?(framing = `Null) () =
let sa = Unix.ADDR_INET (ip, port) in
let frame msg = frame_message msg framing in
let encode = Syslog_message.encode in
conn_reporter Unix.PF_INET Unix.SOCK_STREAM sa truncate frame encode hostname

let unix_reporter
?(socket = "/dev/log")
?truncate
?framing () =
let truncate =
match truncate with
| Some truncate -> truncate
| None -> if framing = None then 65536 else 0
in
let frame, socket_type =
match framing with
| Some framing -> (fun msg -> frame_message msg framing), Unix.SOCK_STREAM
| None -> (fun msg -> msg), Unix.SOCK_DGRAM
in
let sa = Unix.ADDR_UNIX socket in
let encode = Syslog_message.encode_local in
conn_reporter Unix.PF_UNIX socket_type sa truncate frame encode "localhost"
15 changes: 15 additions & 0 deletions src/logs_syslog_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@ val tcp_reporter : ?hostname:string -> Unix.inet_addr -> ?port:int ->
?framing:Logs_syslog.framing -> unit ->
(Logs.reporter, string) result

(** [unix_reporter ~socket ~truncate ~framing ()] is [Ok reporter] or
[Error msg]. The [reporter] sends each log message via syslog to [socket]
(which defaults to ["/dev/log"]). If the initial connection to the socket
fails, the log message is reported to standard error, and attempts are made
to re-establish the connection. A syslog message is truncated to [truncate]
bytes and is framed according to the given [framing]. The default for
[truncate] is [65536] is [framing] is not provided and [0] otherwise. If
[framing] is not provided, then the socket used is a datagram socket (as
for {!udp_reporter}) otherwise a stream socket is used (as for
{!tcp_reporter}). *)
val unix_reporter : ?socket:string ->
?truncate:int ->
?framing:Logs_syslog.framing -> unit ->
(Logs.reporter, string) result

(** {1:unix_example Example usage}
To install a Unix syslog reporter. sending via UDP to localhost, use the
Expand Down

0 comments on commit 2ba96b7

Please sign in to comment.