From 82457f560716e0faa6271a7fb41c47f05689ee29 Mon Sep 17 00:00:00 2001 From: stop-cran Date: Mon, 3 Dec 2018 17:23:49 +0200 Subject: [PATCH] Added clock. Added interfaces for a possibility of DI. --- README.md | 2 +- ZabbixSender.Async/Formatter.cs | 151 ++++++++++++++++++ ZabbixSender.Async/IFormatter.cs | 15 ++ ZabbixSender.Async/ISender.cs | 19 +++ ZabbixSender.Async/NetworkStreamExtensions.cs | 23 +++ ZabbixSender.Async/ProtocolException.cs | 27 ++++ ZabbixSender.Async/SendData.cs | 16 +- ZabbixSender.Async/Sender.cs | 125 +++------------ ZabbixSender.Async/SenderResponse.cs | 8 +- ZabbixSender.Async/SenderResponseInfo.cs | 40 +++++ ZabbixSender.Async/SenderSkeleton.cs | 65 ++++++++ ZabbixSender.Async/TaskExtensions.cs | 15 ++ ZabbixSender.Async/ZabbixSender.Async.csproj | 8 + 13 files changed, 408 insertions(+), 106 deletions(-) create mode 100644 ZabbixSender.Async/Formatter.cs create mode 100644 ZabbixSender.Async/IFormatter.cs create mode 100644 ZabbixSender.Async/ISender.cs create mode 100644 ZabbixSender.Async/NetworkStreamExtensions.cs create mode 100644 ZabbixSender.Async/ProtocolException.cs create mode 100644 ZabbixSender.Async/SenderResponseInfo.cs create mode 100644 ZabbixSender.Async/SenderSkeleton.cs create mode 100644 ZabbixSender.Async/TaskExtensions.cs diff --git a/README.md b/README.md index 639ce06..50635e4 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Synopsis -This package provide a tool to send data to Zabbix in the same way as zabbix_sender tool. It implements [Zabbix Sender Protocol 4.0](https://www.zabbix.org/wiki/Docs/protocols/zabbix_sender/4.0). +The package provides a tool to send data to Zabbix in the same way as [zabbix_sender](https://www.zabbix.com/documentation/4.0/ru/manual/concepts/sender) tool. It implements [Zabbix Sender Protocol 4.0](https://www.zabbix.org/wiki/Docs/protocols/zabbix_sender/4.0). # Installation diff --git a/ZabbixSender.Async/Formatter.cs b/ZabbixSender.Async/Formatter.cs new file mode 100644 index 0000000..d30a6d0 --- /dev/null +++ b/ZabbixSender.Async/Formatter.cs @@ -0,0 +1,151 @@ +using Newtonsoft.Json; +using Newtonsoft.Json.Converters; +using Newtonsoft.Json.Serialization; +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace ZabbixSender.Async +{ + /// + /// An auxiliary class for Zabbix sender protocol 4.0 request and response formatting. + /// See full specs at https://zabbix.org/wiki/Docs/protocols/zabbix_sender/4.0. + /// + public class Formatter : IFormatter + { + private static readonly byte[] ZabbixHeader = Encoding.ASCII.GetBytes("ZBXD\x01"); + + private readonly int bufferSize; + + private readonly JsonSerializer serializer; + + /// + /// Initializes a new instance of the ZabbixSender.Async.Formatter class with custom JsonSerializerSettings. + /// + /// Stream buffer size. + public Formatter(int bufferSize = 1024) : + this(new JsonSerializerSettings + { + ContractResolver = new CamelCasePropertyNamesContractResolver(), + NullValueHandling = NullValueHandling.Ignore, + Converters = + { + new UnixDateTimeConverter() + } + }, bufferSize) + { } + + /// + /// Initializes a new instance of the ZabbixSender.Async.Formatter class. + /// + /// Custom Json serialization settings. + /// Stream buffer size. + public Formatter(JsonSerializerSettings settings, int bufferSize = 1024) + { + serializer = JsonSerializer.Create(settings); + this.bufferSize = bufferSize; + } + + public void WriteRequest(Stream stream, IEnumerable data) + { + using (var ms = new MemoryStream()) + { + using (var writer = new StreamWriter(ms, Encoding.ASCII, bufferSize, true)) + using (var jsonWriter = new JsonTextWriter(writer)) + serializer.Serialize( + jsonWriter, + new + { + Request = "sender data", + Data = data, + Clock = DateTime.Now + }); + + var lengthBytes = BitConverter.GetBytes(ms.Length); + + stream.Write(ZabbixHeader, 0, ZabbixHeader.Length); + stream.Write(BitConverter.GetBytes(ms.Length), 0, lengthBytes.Length); + + ms.Seek(0, SeekOrigin.Begin); + ms.CopyTo(stream, bufferSize); + } + } + + public async Task WriteRequestAsync(Stream stream, IEnumerable data, + CancellationToken cancellationToken) + { + using (var ms = new MemoryStream()) + { + using (var writer = new StreamWriter(ms, Encoding.ASCII, bufferSize, true)) + using (var jsonWriter = new JsonTextWriter(writer)) + serializer.Serialize( + jsonWriter, + new + { + Request = "sender data", + Data = data, + Clock = DateTime.Now + }); + + var lengthBytes = BitConverter.GetBytes(ms.Length); + + await stream.WriteAsync(ZabbixHeader, 0, ZabbixHeader.Length, + cancellationToken); + await stream.WriteAsync(BitConverter.GetBytes(ms.Length), 0, + lengthBytes.Length, cancellationToken); + + ms.Seek(0, SeekOrigin.Begin); + await ms.CopyToAsync(stream, bufferSize, cancellationToken); + } + } + + public SenderResponse ReadResponse(Stream stream) + { + int responseSize = Math.Max(bufferSize, 128); + var response = new byte[responseSize]; + var count = stream.Read(response, 0, response.Length); + var begin = Array.IndexOf(response, (byte)'{'); + + if (count <= 0) + throw new ProtocolException("empty reponse received"); + + if (begin == -1) + throw new ProtocolException("start of Json ({) not found", response); + + if (count >= responseSize) + throw new ProtocolException("the response is too big", response); + + try + { + using (var ms = new MemoryStream(response, begin, count - begin)) + { + using (var reader = new StreamReader(ms, Encoding.ASCII)) + using (var jsonReader = new JsonTextReader(reader)) + return serializer.Deserialize(jsonReader); + } + } + catch (JsonException ex) + { + throw new ProtocolException("invalid response format", ex, response); + } + } + + public async Task ReadResponseAsync(Stream stream, + CancellationToken cancellationToken) + { + var response = new byte[1024]; + var count = await stream.ReadAsync(response, 0, response.Length, cancellationToken); + var begin = Array.IndexOf(response, (byte)'{'); + + using (var ms = new MemoryStream(response, begin, count - begin)) + { + using (var reader = new StreamReader(ms, Encoding.ASCII)) + using (var jsonReader = new JsonTextReader(reader)) + return serializer.Deserialize(jsonReader); + } + } + } +} diff --git a/ZabbixSender.Async/IFormatter.cs b/ZabbixSender.Async/IFormatter.cs new file mode 100644 index 0000000..28ff78b --- /dev/null +++ b/ZabbixSender.Async/IFormatter.cs @@ -0,0 +1,15 @@ +using System.Collections.Generic; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace ZabbixSender.Async +{ + public interface IFormatter + { + SenderResponse ReadResponse(Stream stream); + Task ReadResponseAsync(Stream stream, CancellationToken cancellationToken); + void WriteRequest(Stream stream, IEnumerable data); + Task WriteRequestAsync(Stream stream, IEnumerable data, CancellationToken cancellationToken); + } +} \ No newline at end of file diff --git a/ZabbixSender.Async/ISender.cs b/ZabbixSender.Async/ISender.cs new file mode 100644 index 0000000..00421e6 --- /dev/null +++ b/ZabbixSender.Async/ISender.cs @@ -0,0 +1,19 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace ZabbixSender.Async +{ + public interface ISender + { + Task Send(IEnumerable data); + + Task Send(IEnumerable data, CancellationToken cancellationToken); + + Task Send(params SendData[] data); + + Task Send(string host, string key, string value); + + Task Send(string host, string key, string value, CancellationToken cancellationToken); + } +} diff --git a/ZabbixSender.Async/NetworkStreamExtensions.cs b/ZabbixSender.Async/NetworkStreamExtensions.cs new file mode 100644 index 0000000..7ece942 --- /dev/null +++ b/ZabbixSender.Async/NetworkStreamExtensions.cs @@ -0,0 +1,23 @@ +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; + +namespace ZabbixSender.Async +{ + internal static class NetworkStreamExtensions + { + public static async Task WaitForDataAvailable(this NetworkStream stream, int millisecondsDelay, + CancellationToken cancellationToken) + { + const int millisecondsDelayStep = 50; + + for (int retry = 0; !stream.DataAvailable; retry += millisecondsDelayStep) + { + await Task.Delay(millisecondsDelayStep, cancellationToken); + + if (retry >= millisecondsDelay || millisecondsDelay == 0) + throw new TaskCanceledException(); + } + } + } +} diff --git a/ZabbixSender.Async/ProtocolException.cs b/ZabbixSender.Async/ProtocolException.cs new file mode 100644 index 0000000..1f57db5 --- /dev/null +++ b/ZabbixSender.Async/ProtocolException.cs @@ -0,0 +1,27 @@ +using System; + +namespace ZabbixSender.Async +{ + public sealed class ProtocolException : Exception + { + public ProtocolException(string message) : + base($"Protocol error - {message}.") + { } + + public ProtocolException(string message, Exception innerException) : + base($"Protocol error - {message}.", innerException) + { } + + public ProtocolException(string message, byte[] response) : + base($"Protocol error - {message}. See the whole response in Data property by Response key.") + { + Data.Add("Response", response); + } + + public ProtocolException(string message, Exception innerException, byte[] response) : + base($"Protocol error - {message}. See the whole response in Data property by Response key.", innerException) + { + Data.Add("Response", response); + } + } +} diff --git a/ZabbixSender.Async/SendData.cs b/ZabbixSender.Async/SendData.cs index 3de6e10..01fcdcd 100644 --- a/ZabbixSender.Async/SendData.cs +++ b/ZabbixSender.Async/SendData.cs @@ -1,5 +1,10 @@ -namespace ZabbixSender.Async +using System; + +namespace ZabbixSender.Async { + /// + /// Represents a data frame to be sent to Zabbix. + /// public class SendData { /// @@ -9,7 +14,7 @@ public class SendData /// /// A key of the item to send. - /// Should belong to the specified host and have "Zabbix sender" type. + /// Should belong to the specified host and have "Zabbix sender" type. /// public string Key { get; set; } @@ -18,5 +23,10 @@ public class SendData /// Should be formatted in a way to respect the configured "type of information" of the item. /// public string Value { get; set; } + + /// + /// A timestamp for the provided value. Leave null for ongoing values. + /// + public DateTime? Clock { get; set; } } -} \ No newline at end of file +} diff --git a/ZabbixSender.Async/Sender.cs b/ZabbixSender.Async/Sender.cs index c72998b..9ee86d8 100644 --- a/ZabbixSender.Async/Sender.cs +++ b/ZabbixSender.Async/Sender.cs @@ -1,35 +1,29 @@ -using Newtonsoft.Json; -using Newtonsoft.Json.Serialization; -using System; -using System.Collections.Generic; -using System.IO; +using System; using System.Net.Sockets; -using System.Text; using System.Threading; using System.Threading.Tasks; namespace ZabbixSender.Async { - public class Sender + /// + /// A class to send data to Zabbix. Be sure, that all used host names and items are configured. + /// All the items should have type "Zabbix trapper" to support Zabbix sender data flow. + /// + public class Sender : SenderSkeleton { - private static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc); - - private static readonly byte[] ZabbixHeader = new byte[] { 0x5a, 0x42, 0x58, 0x44, 0x01 }; - /// - /// Use this class to send data to Zabbix. Be sure, that all used host names and items are configured. - /// All the items should have type "Zabbix trapper" to support Zabbix sender data flow. + /// Initializes a new instance of the ZabbixSender.Async.Sender class. /// /// Host name or IP address of Zabbix server or proxy. /// Zabbix server port. - /// Send data request timeout in milliseconds. - /// Stream writer buffer size. + /// Connect, send and receive timeout in milliseconds. + /// Buffer size for stream reading and writing. public Sender(string zabbixServer, int port = 10051, int timeout = 500, int bufferSize = 1024) + : base(CreateTcpClient(zabbixServer, port, timeout, bufferSize), + () => new Formatter(bufferSize)) { ZabbixServer = zabbixServer; Port = port; - Timeout = timeout; - BufferSize = bufferSize; } /// @@ -42,94 +36,23 @@ public Sender(string zabbixServer, int port = 10051, int timeout = 500, int buff /// public int Port { get; } - /// - /// Send data request timeout in milliseconds. - /// - public int Timeout { get; } - - /// - /// Stream writer buffer size. - /// - public int BufferSize { get; } - - public Task Send(string host, string key, string value) => - Send(host, key, value, CancellationToken.None); - - public Task Send(string host, string key, string value, CancellationToken cancellationToken) => - Send(new[] - { - new SendData - { - Host = host, - Key = key, - Value = value - } - }, cancellationToken); - - public Task Send(params SendData[] data) => - Send(data, CancellationToken.None); - - public Task Send(IEnumerable data) => - Send(data, CancellationToken.None); - - public async Task Send(IEnumerable data, CancellationToken cancellationToken) + private static Func> CreateTcpClient(string zabbixServer, int port, int timeout, int bufferSize) { - using (var tcpClient = new TcpClient()) + var tcpClient = new TcpClient { - await tcpClient.ConnectAsync(ZabbixServer, Port); - - using (var networkStream = tcpClient.GetStream()) - { - var serializer = JsonSerializer.Create(new JsonSerializerSettings - { - ContractResolver = new CamelCasePropertyNamesContractResolver() - }); - - using (var ms = new MemoryStream()) - { - using (var writer = new StreamWriter(ms, Encoding.ASCII, BufferSize, true)) - using (var jsonWriter = new JsonTextWriter(writer)) - serializer.Serialize( - jsonWriter, - new - { - request = "sender data", - data, - clock = GetCurrentUnixTime() - }); - - await networkStream.WriteAsync(ZabbixHeader, 0, 5, cancellationToken); - await networkStream.WriteAsync(BitConverter.GetBytes(ms.Length), 0, 8, cancellationToken); - - ms.Seek(0, SeekOrigin.Begin); - await ms.CopyToAsync(networkStream, BufferSize, cancellationToken); - } + SendTimeout = timeout, + ReceiveTimeout = timeout, + SendBufferSize = bufferSize, + ReceiveBufferSize = bufferSize + }; - networkStream.Flush(); - - for (int retry = 0; !networkStream.DataAvailable; retry += 50) - { - await Task.Delay(50, cancellationToken); - - if (retry >= Timeout) - throw new TaskCanceledException(); - } - - var response = new byte[BufferSize]; - var count = await networkStream.ReadAsync(response, 0, response.Length, cancellationToken); - var begin = Array.IndexOf(response, (byte)'{'); + return async cancellationToken => + { + await tcpClient.ConnectAsync(zabbixServer, port) + .WithTimeout(timeout, cancellationToken); - using (var ms = new MemoryStream(response, begin, count - begin)) - { - using (var reader = new StreamReader(ms, Encoding.ASCII)) - using (var jsonReader = new JsonTextReader(reader)) - return serializer.Deserialize(jsonReader); - } - } - } + return tcpClient; + }; } - - private static long GetCurrentUnixTime() => - (long)(DateTime.UtcNow - UnixEpoch).TotalSeconds; } } diff --git a/ZabbixSender.Async/SenderResponse.cs b/ZabbixSender.Async/SenderResponse.cs index ac66d2d..0406c9b 100644 --- a/ZabbixSender.Async/SenderResponse.cs +++ b/ZabbixSender.Async/SenderResponse.cs @@ -1,9 +1,15 @@ namespace ZabbixSender.Async { + /// + /// Represents a response from Zabbix on the data sent. + /// public class SenderResponse { public string Response { get; set; } public string Info { get; set; } public bool IsSuccess => Response == "success"; + + public SenderResponseInfo ParseInfo() => + new SenderResponseInfo(Info); } -} \ No newline at end of file +} diff --git a/ZabbixSender.Async/SenderResponseInfo.cs b/ZabbixSender.Async/SenderResponseInfo.cs new file mode 100644 index 0000000..b3468aa --- /dev/null +++ b/ZabbixSender.Async/SenderResponseInfo.cs @@ -0,0 +1,40 @@ +using System; +using System.Globalization; +using System.Text.RegularExpressions; + +namespace ZabbixSender.Async +{ + public class SenderResponseInfo + { + private readonly string info; + + public SenderResponseInfo(string info) + { + this.info = info; + + var match = Regex.Match(info, @"processed: (\d+); failed: (\d+); total: (\d+); seconds spent: (\d+\.\d+)"); + + if (!match.Success) + throw new ProtocolException($"Info field has an unexpected format: \"{info}\""); + + try + { + Processed = Convert.ToInt32(match.Groups[1].Value, CultureInfo.InvariantCulture); + Failed = Convert.ToInt32(match.Groups[2].Value, CultureInfo.InvariantCulture); + Total = Convert.ToInt32(match.Groups[3].Value, CultureInfo.InvariantCulture); + SecondsSpent = Convert.ToDouble(match.Groups[4].Value, CultureInfo.InvariantCulture); + } + catch (FormatException ex) + { + throw new ProtocolException($"Info field has an unexpected format: \"{info}\"", ex); + } + } + + public int Processed { get; } + public int Failed { get; } + public int Total { get; } + public double SecondsSpent { get; } + + public override string ToString() => info; + } +} \ No newline at end of file diff --git a/ZabbixSender.Async/SenderSkeleton.cs b/ZabbixSender.Async/SenderSkeleton.cs new file mode 100644 index 0000000..b3c1afc --- /dev/null +++ b/ZabbixSender.Async/SenderSkeleton.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; + +namespace ZabbixSender.Async +{ + /// + /// A basic class for sending datato Zabbix, which allows custom setup for TcpClient and Formatter. + /// + public class SenderSkeleton : ISender + { + private readonly Func> tcpClientFactory; + private readonly Func formatterFactory; + + /// + /// Initializes a new instance of the ZabbixSender.Async.SenderSkeleton class. + /// + /// An async factory delegate for TcpClient. + /// A factory delegate for the data formatter. + public SenderSkeleton( + Func> tcpClientFactory, + Func formatterFactory) + { + this.tcpClientFactory = tcpClientFactory; + this.formatterFactory = formatterFactory; + } + + public Task Send(string host, string key, string value) => + Send(host, key, value, CancellationToken.None); + + public Task Send(string host, string key, string value, CancellationToken cancellationToken) => + Send(new[] + { + new SendData + { + Host = host, + Key = key, + Value = value + } + }, cancellationToken); + + public Task Send(params SendData[] data) => + Send(data, CancellationToken.None); + + public Task Send(IEnumerable data) => + Send(data, CancellationToken.None); + + public async Task Send(IEnumerable data, CancellationToken cancellationToken) + { + using (var tcpClient = await tcpClientFactory(cancellationToken)) + using (var networkStream = tcpClient.GetStream()) + { + var formatter = formatterFactory(); + + await formatter.WriteRequestAsync(networkStream, data, cancellationToken); + await networkStream.FlushAsync(); + await networkStream.WaitForDataAvailable(tcpClient.ReceiveTimeout, cancellationToken); + + return await formatter.ReadResponseAsync(networkStream, cancellationToken); + } + } + } +} diff --git a/ZabbixSender.Async/TaskExtensions.cs b/ZabbixSender.Async/TaskExtensions.cs new file mode 100644 index 0000000..ed9535f --- /dev/null +++ b/ZabbixSender.Async/TaskExtensions.cs @@ -0,0 +1,15 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace ZabbixSender.Async +{ + internal static class TaskExtensions + { + public static async Task WithTimeout(this Task task, int millisecondsDelay, + CancellationToken cancellationToken) + { + if (await Task.WhenAny(task, Task.Delay(millisecondsDelay, cancellationToken)) != task) + throw new TaskCanceledException(); + } + } +} diff --git a/ZabbixSender.Async/ZabbixSender.Async.csproj b/ZabbixSender.Async/ZabbixSender.Async.csproj index 4311c95..c8a7d3c 100644 --- a/ZabbixSender.Async/ZabbixSender.Async.csproj +++ b/ZabbixSender.Async/ZabbixSender.Async.csproj @@ -2,6 +2,14 @@ netcoreapp2.1;net461 + 1.0.1 + true + Apache License 2.0 (stop-cran, 2018) + stop-cran <stop-cran@list.ru> + stop-cran <stop-cran@list.ru> + https://github.com/stop-cran/ZabbixSender.Async + Added clock. Added interfaces for a possibility of DI. + The package provides a tool to send data to Zabbix in the same way as zabbix_sender tool. It implements Zabbix Sender Protocol 4.0.