From 2ba96b7d6f6642e5c25ebbf5243b73369aad5451 Mon Sep 17 00:00:00 2001 From: David Allsopp Date: Sat, 11 Aug 2018 15:07:26 +0100 Subject: [PATCH] Add Logs_syslog_unix.unix_reporter Code for restoring connections factored out of tcp_reporter and a Unix domain sockets version added in unix_reporter. --- src/logs_syslog_unix.ml | 99 +++++++++++++++++++++++++++------------- src/logs_syslog_unix.mli | 15 ++++++ 2 files changed, 83 insertions(+), 31 deletions(-) diff --git a/src/logs_syslog_unix.ml b/src/logs_syslog_unix.ml index 6b2b4e6..a605914 100644 --- a/src/logs_syslog_unix.ml +++ b/src/logs_syslog_unix.ml @@ -1,6 +1,10 @@ open Logs_syslog -let syslog_report host len send = +let syslog_report + host + len + send + (encode : ?len:int -> Syslog_message.t -> string) = let report src level ~over k msgf = let source = Logs.Src.name src in let timestamp = Ptime_clock.now () in @@ -8,7 +12,7 @@ let syslog_report host len send = let msg = message ~host ~source ~tags ?header level timestamp (flush ()) in - send (Syslog_message.encode ~len msg) ; over () ; k () + send (encode ~len msg) ; over () ; k () in msgf @@ fun ?header ?(tags = Logs.Tag.empty) fmt -> Format.kfprintf (k tags ?header) ppf fmt @@ -32,7 +36,7 @@ let udp_reporter (Ptime.to_rfc3339 (Ptime_clock.now ())) msg in - syslog_report hostname truncate send + syslog_report hostname truncate send Syslog_message.encode type state = | Disconnected @@ -41,16 +45,10 @@ type state = let wait_time = 0.01 -let tcp_reporter - ?(hostname = Unix.gethostname ()) - ip - ?(port = 514) - ?(truncate = 0) - ?(framing = `Null) () = - let sa = Unix.ADDR_INET (ip, port) in +let conn_reporter sd st sa truncate frame encode hostname = let s = ref Disconnected in let connect () = - let sock = Unix.(socket PF_INET SOCK_STREAM 0) in + let sock = Unix.(socket sd st 0) in Unix.(setsockopt sock SO_REUSEADDR true) ; Unix.(setsockopt sock SO_KEEPALIVE true) ; try @@ -59,9 +57,15 @@ let tcp_reporter Ok () with | Unix.Unix_error (e, f, _) -> + let endpoint = + match sa with + | Unix.ADDR_UNIX socket -> socket + | Unix.ADDR_INET (ip, port) -> + Printf.sprintf "%s:%d" (Unix.string_of_inet_addr ip) port + in let err = - Printf.sprintf "error %s in function %s while connecting to %s:%d\n" - (Unix.error_message e) f (Unix.string_of_inet_addr ip) port + Printf.sprintf "error %s in function %s while connecting to %s\n" + (Unix.error_message e) f endpoint in Error err in @@ -75,27 +79,60 @@ let tcp_reporter match connect () with | Error e -> Error e | Ok () -> + let transmit = + if st = Unix.SOCK_DGRAM then + fun sock b len -> ignore(Unix.send sock b 0 len []) + else + fun sock b len -> + let rec aux idx = + let should = len - idx in + let n = Unix.send sock b idx should [] in + if n <> should then aux (idx + n) + in + aux 0 + in let rec send omsg = match !s with | Disconnected -> reconnect send omsg | Connecting -> let _ = Unix.select [] [] [] wait_time in send omsg | Connected sock -> - let msg = Bytes.of_string (frame_message omsg framing) in - let len = Bytes.length msg in - let rec aux idx = - try - let should = len - idx in - let n = Unix.send sock msg idx should [] in - if n = should then () else aux (idx + n) - with - | Unix.Unix_error (Unix.EAGAIN, _, _) -> send omsg - | Unix.Unix_error (e, f, _) -> - let err = Unix.error_message e in - Printf.eprintf "error %s in function %s, reconnecting\n" err f ; - (try Unix.close sock with Unix.Unix_error _ -> ()) ; - s := Disconnected ; - reconnect send omsg - in - aux 0 + let msg = frame omsg |> Bytes.of_string in + try transmit sock msg (Bytes.length msg) with + | Unix.Unix_error (Unix.EAGAIN, _, _) -> send omsg + | Unix.Unix_error (e, f, _) -> + let err = Unix.error_message e in + Printf.eprintf "error %s in function %s, reconnecting\n" err f ; + (try Unix.close sock with Unix.Unix_error _ -> ()) ; + s := Disconnected ; + reconnect send omsg in at_exit (fun () -> match !s with Connected x -> Unix.close x | _ -> ()) ; - Ok (syslog_report hostname truncate send) + Ok (syslog_report hostname truncate send encode) + +let tcp_reporter + ?(hostname = Unix.gethostname ()) + ip + ?(port = 514) + ?(truncate = 0) + ?(framing = `Null) () = + let sa = Unix.ADDR_INET (ip, port) in + let frame msg = frame_message msg framing in + let encode = Syslog_message.encode in + conn_reporter Unix.PF_INET Unix.SOCK_STREAM sa truncate frame encode hostname + +let unix_reporter + ?(socket = "/dev/log") + ?truncate + ?framing () = + let truncate = + match truncate with + | Some truncate -> truncate + | None -> if framing = None then 65536 else 0 + in + let frame, socket_type = + match framing with + | Some framing -> (fun msg -> frame_message msg framing), Unix.SOCK_STREAM + | None -> (fun msg -> msg), Unix.SOCK_DGRAM + in + let sa = Unix.ADDR_UNIX socket in + let encode = Syslog_message.encode_local in + conn_reporter Unix.PF_UNIX socket_type sa truncate frame encode "localhost" diff --git a/src/logs_syslog_unix.mli b/src/logs_syslog_unix.mli index d38020c..6352361 100644 --- a/src/logs_syslog_unix.mli +++ b/src/logs_syslog_unix.mli @@ -26,6 +26,21 @@ val tcp_reporter : ?hostname:string -> Unix.inet_addr -> ?port:int -> ?framing:Logs_syslog.framing -> unit -> (Logs.reporter, string) result +(** [unix_reporter ~socket ~truncate ~framing ()] is [Ok reporter] or + [Error msg]. The [reporter] sends each log message via syslog to [socket] + (which defaults to ["/dev/log"]). If the initial connection to the socket + fails, the log message is reported to standard error, and attempts are made + to re-establish the connection. A syslog message is truncated to [truncate] + bytes and is framed according to the given [framing]. The default for + [truncate] is [65536] is [framing] is not provided and [0] otherwise. If + [framing] is not provided, then the socket used is a datagram socket (as + for {!udp_reporter}) otherwise a stream socket is used (as for + {!tcp_reporter}). *) +val unix_reporter : ?socket:string -> + ?truncate:int -> + ?framing:Logs_syslog.framing -> unit -> + (Logs.reporter, string) result + (** {1:unix_example Example usage} To install a Unix syslog reporter. sending via UDP to localhost, use the