diff --git a/README.md b/README.md
index 639ce06..50635e4 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
# Synopsis
-This package provide a tool to send data to Zabbix in the same way as zabbix_sender tool. It implements [Zabbix Sender Protocol 4.0](https://www.zabbix.org/wiki/Docs/protocols/zabbix_sender/4.0).
+The package provides a tool to send data to Zabbix in the same way as [zabbix_sender](https://www.zabbix.com/documentation/4.0/ru/manual/concepts/sender) tool. It implements [Zabbix Sender Protocol 4.0](https://www.zabbix.org/wiki/Docs/protocols/zabbix_sender/4.0).
# Installation
diff --git a/ZabbixSender.Async/Formatter.cs b/ZabbixSender.Async/Formatter.cs
new file mode 100644
index 0000000..d30a6d0
--- /dev/null
+++ b/ZabbixSender.Async/Formatter.cs
@@ -0,0 +1,151 @@
+using Newtonsoft.Json;
+using Newtonsoft.Json.Converters;
+using Newtonsoft.Json.Serialization;
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace ZabbixSender.Async
+{
+ ///
+ /// An auxiliary class for Zabbix sender protocol 4.0 request and response formatting.
+ /// See full specs at https://zabbix.org/wiki/Docs/protocols/zabbix_sender/4.0.
+ ///
+ public class Formatter : IFormatter
+ {
+ private static readonly byte[] ZabbixHeader = Encoding.ASCII.GetBytes("ZBXD\x01");
+
+ private readonly int bufferSize;
+
+ private readonly JsonSerializer serializer;
+
+ ///
+ /// Initializes a new instance of the ZabbixSender.Async.Formatter class with custom JsonSerializerSettings.
+ ///
+ /// Stream buffer size.
+ public Formatter(int bufferSize = 1024) :
+ this(new JsonSerializerSettings
+ {
+ ContractResolver = new CamelCasePropertyNamesContractResolver(),
+ NullValueHandling = NullValueHandling.Ignore,
+ Converters =
+ {
+ new UnixDateTimeConverter()
+ }
+ }, bufferSize)
+ { }
+
+ ///
+ /// Initializes a new instance of the ZabbixSender.Async.Formatter class.
+ ///
+ /// Custom Json serialization settings.
+ /// Stream buffer size.
+ public Formatter(JsonSerializerSettings settings, int bufferSize = 1024)
+ {
+ serializer = JsonSerializer.Create(settings);
+ this.bufferSize = bufferSize;
+ }
+
+ public void WriteRequest(Stream stream, IEnumerable data)
+ {
+ using (var ms = new MemoryStream())
+ {
+ using (var writer = new StreamWriter(ms, Encoding.ASCII, bufferSize, true))
+ using (var jsonWriter = new JsonTextWriter(writer))
+ serializer.Serialize(
+ jsonWriter,
+ new
+ {
+ Request = "sender data",
+ Data = data,
+ Clock = DateTime.Now
+ });
+
+ var lengthBytes = BitConverter.GetBytes(ms.Length);
+
+ stream.Write(ZabbixHeader, 0, ZabbixHeader.Length);
+ stream.Write(BitConverter.GetBytes(ms.Length), 0, lengthBytes.Length);
+
+ ms.Seek(0, SeekOrigin.Begin);
+ ms.CopyTo(stream, bufferSize);
+ }
+ }
+
+ public async Task WriteRequestAsync(Stream stream, IEnumerable data,
+ CancellationToken cancellationToken)
+ {
+ using (var ms = new MemoryStream())
+ {
+ using (var writer = new StreamWriter(ms, Encoding.ASCII, bufferSize, true))
+ using (var jsonWriter = new JsonTextWriter(writer))
+ serializer.Serialize(
+ jsonWriter,
+ new
+ {
+ Request = "sender data",
+ Data = data,
+ Clock = DateTime.Now
+ });
+
+ var lengthBytes = BitConverter.GetBytes(ms.Length);
+
+ await stream.WriteAsync(ZabbixHeader, 0, ZabbixHeader.Length,
+ cancellationToken);
+ await stream.WriteAsync(BitConverter.GetBytes(ms.Length), 0,
+ lengthBytes.Length, cancellationToken);
+
+ ms.Seek(0, SeekOrigin.Begin);
+ await ms.CopyToAsync(stream, bufferSize, cancellationToken);
+ }
+ }
+
+ public SenderResponse ReadResponse(Stream stream)
+ {
+ int responseSize = Math.Max(bufferSize, 128);
+ var response = new byte[responseSize];
+ var count = stream.Read(response, 0, response.Length);
+ var begin = Array.IndexOf(response, (byte)'{');
+
+ if (count <= 0)
+ throw new ProtocolException("empty reponse received");
+
+ if (begin == -1)
+ throw new ProtocolException("start of Json ({) not found", response);
+
+ if (count >= responseSize)
+ throw new ProtocolException("the response is too big", response);
+
+ try
+ {
+ using (var ms = new MemoryStream(response, begin, count - begin))
+ {
+ using (var reader = new StreamReader(ms, Encoding.ASCII))
+ using (var jsonReader = new JsonTextReader(reader))
+ return serializer.Deserialize(jsonReader);
+ }
+ }
+ catch (JsonException ex)
+ {
+ throw new ProtocolException("invalid response format", ex, response);
+ }
+ }
+
+ public async Task ReadResponseAsync(Stream stream,
+ CancellationToken cancellationToken)
+ {
+ var response = new byte[1024];
+ var count = await stream.ReadAsync(response, 0, response.Length, cancellationToken);
+ var begin = Array.IndexOf(response, (byte)'{');
+
+ using (var ms = new MemoryStream(response, begin, count - begin))
+ {
+ using (var reader = new StreamReader(ms, Encoding.ASCII))
+ using (var jsonReader = new JsonTextReader(reader))
+ return serializer.Deserialize(jsonReader);
+ }
+ }
+ }
+}
diff --git a/ZabbixSender.Async/IFormatter.cs b/ZabbixSender.Async/IFormatter.cs
new file mode 100644
index 0000000..28ff78b
--- /dev/null
+++ b/ZabbixSender.Async/IFormatter.cs
@@ -0,0 +1,15 @@
+using System.Collections.Generic;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace ZabbixSender.Async
+{
+ public interface IFormatter
+ {
+ SenderResponse ReadResponse(Stream stream);
+ Task ReadResponseAsync(Stream stream, CancellationToken cancellationToken);
+ void WriteRequest(Stream stream, IEnumerable data);
+ Task WriteRequestAsync(Stream stream, IEnumerable data, CancellationToken cancellationToken);
+ }
+}
\ No newline at end of file
diff --git a/ZabbixSender.Async/ISender.cs b/ZabbixSender.Async/ISender.cs
new file mode 100644
index 0000000..00421e6
--- /dev/null
+++ b/ZabbixSender.Async/ISender.cs
@@ -0,0 +1,19 @@
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace ZabbixSender.Async
+{
+ public interface ISender
+ {
+ Task Send(IEnumerable data);
+
+ Task Send(IEnumerable data, CancellationToken cancellationToken);
+
+ Task Send(params SendData[] data);
+
+ Task Send(string host, string key, string value);
+
+ Task Send(string host, string key, string value, CancellationToken cancellationToken);
+ }
+}
diff --git a/ZabbixSender.Async/NetworkStreamExtensions.cs b/ZabbixSender.Async/NetworkStreamExtensions.cs
new file mode 100644
index 0000000..7ece942
--- /dev/null
+++ b/ZabbixSender.Async/NetworkStreamExtensions.cs
@@ -0,0 +1,23 @@
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace ZabbixSender.Async
+{
+ internal static class NetworkStreamExtensions
+ {
+ public static async Task WaitForDataAvailable(this NetworkStream stream, int millisecondsDelay,
+ CancellationToken cancellationToken)
+ {
+ const int millisecondsDelayStep = 50;
+
+ for (int retry = 0; !stream.DataAvailable; retry += millisecondsDelayStep)
+ {
+ await Task.Delay(millisecondsDelayStep, cancellationToken);
+
+ if (retry >= millisecondsDelay || millisecondsDelay == 0)
+ throw new TaskCanceledException();
+ }
+ }
+ }
+}
diff --git a/ZabbixSender.Async/ProtocolException.cs b/ZabbixSender.Async/ProtocolException.cs
new file mode 100644
index 0000000..1f57db5
--- /dev/null
+++ b/ZabbixSender.Async/ProtocolException.cs
@@ -0,0 +1,27 @@
+using System;
+
+namespace ZabbixSender.Async
+{
+ public sealed class ProtocolException : Exception
+ {
+ public ProtocolException(string message) :
+ base($"Protocol error - {message}.")
+ { }
+
+ public ProtocolException(string message, Exception innerException) :
+ base($"Protocol error - {message}.", innerException)
+ { }
+
+ public ProtocolException(string message, byte[] response) :
+ base($"Protocol error - {message}. See the whole response in Data property by Response key.")
+ {
+ Data.Add("Response", response);
+ }
+
+ public ProtocolException(string message, Exception innerException, byte[] response) :
+ base($"Protocol error - {message}. See the whole response in Data property by Response key.", innerException)
+ {
+ Data.Add("Response", response);
+ }
+ }
+}
diff --git a/ZabbixSender.Async/SendData.cs b/ZabbixSender.Async/SendData.cs
index 3de6e10..01fcdcd 100644
--- a/ZabbixSender.Async/SendData.cs
+++ b/ZabbixSender.Async/SendData.cs
@@ -1,5 +1,10 @@
-namespace ZabbixSender.Async
+using System;
+
+namespace ZabbixSender.Async
{
+ ///
+ /// Represents a data frame to be sent to Zabbix.
+ ///
public class SendData
{
///
@@ -9,7 +14,7 @@ public class SendData
///
/// A key of the item to send.
- /// Should belong to the specified host and have "Zabbix sender" type.
+ /// Should belong to the specified host and have "Zabbix sender" type.
///
public string Key { get; set; }
@@ -18,5 +23,10 @@ public class SendData
/// Should be formatted in a way to respect the configured "type of information" of the item.
///
public string Value { get; set; }
+
+ ///
+ /// A timestamp for the provided value. Leave null for ongoing values.
+ ///
+ public DateTime? Clock { get; set; }
}
-}
\ No newline at end of file
+}
diff --git a/ZabbixSender.Async/Sender.cs b/ZabbixSender.Async/Sender.cs
index c72998b..9ee86d8 100644
--- a/ZabbixSender.Async/Sender.cs
+++ b/ZabbixSender.Async/Sender.cs
@@ -1,35 +1,29 @@
-using Newtonsoft.Json;
-using Newtonsoft.Json.Serialization;
-using System;
-using System.Collections.Generic;
-using System.IO;
+using System;
using System.Net.Sockets;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ZabbixSender.Async
{
- public class Sender
+ ///
+ /// A class to send data to Zabbix. Be sure, that all used host names and items are configured.
+ /// All the items should have type "Zabbix trapper" to support Zabbix sender data flow.
+ ///
+ public class Sender : SenderSkeleton
{
- private static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
-
- private static readonly byte[] ZabbixHeader = new byte[] { 0x5a, 0x42, 0x58, 0x44, 0x01 };
-
///
- /// Use this class to send data to Zabbix. Be sure, that all used host names and items are configured.
- /// All the items should have type "Zabbix trapper" to support Zabbix sender data flow.
+ /// Initializes a new instance of the ZabbixSender.Async.Sender class.
///
/// Host name or IP address of Zabbix server or proxy.
/// Zabbix server port.
- /// Send data request timeout in milliseconds.
- /// Stream writer buffer size.
+ /// Connect, send and receive timeout in milliseconds.
+ /// Buffer size for stream reading and writing.
public Sender(string zabbixServer, int port = 10051, int timeout = 500, int bufferSize = 1024)
+ : base(CreateTcpClient(zabbixServer, port, timeout, bufferSize),
+ () => new Formatter(bufferSize))
{
ZabbixServer = zabbixServer;
Port = port;
- Timeout = timeout;
- BufferSize = bufferSize;
}
///
@@ -42,94 +36,23 @@ public Sender(string zabbixServer, int port = 10051, int timeout = 500, int buff
///
public int Port { get; }
- ///
- /// Send data request timeout in milliseconds.
- ///
- public int Timeout { get; }
-
- ///
- /// Stream writer buffer size.
- ///
- public int BufferSize { get; }
-
- public Task Send(string host, string key, string value) =>
- Send(host, key, value, CancellationToken.None);
-
- public Task Send(string host, string key, string value, CancellationToken cancellationToken) =>
- Send(new[]
- {
- new SendData
- {
- Host = host,
- Key = key,
- Value = value
- }
- }, cancellationToken);
-
- public Task Send(params SendData[] data) =>
- Send(data, CancellationToken.None);
-
- public Task Send(IEnumerable data) =>
- Send(data, CancellationToken.None);
-
- public async Task Send(IEnumerable data, CancellationToken cancellationToken)
+ private static Func> CreateTcpClient(string zabbixServer, int port, int timeout, int bufferSize)
{
- using (var tcpClient = new TcpClient())
+ var tcpClient = new TcpClient
{
- await tcpClient.ConnectAsync(ZabbixServer, Port);
-
- using (var networkStream = tcpClient.GetStream())
- {
- var serializer = JsonSerializer.Create(new JsonSerializerSettings
- {
- ContractResolver = new CamelCasePropertyNamesContractResolver()
- });
-
- using (var ms = new MemoryStream())
- {
- using (var writer = new StreamWriter(ms, Encoding.ASCII, BufferSize, true))
- using (var jsonWriter = new JsonTextWriter(writer))
- serializer.Serialize(
- jsonWriter,
- new
- {
- request = "sender data",
- data,
- clock = GetCurrentUnixTime()
- });
-
- await networkStream.WriteAsync(ZabbixHeader, 0, 5, cancellationToken);
- await networkStream.WriteAsync(BitConverter.GetBytes(ms.Length), 0, 8, cancellationToken);
-
- ms.Seek(0, SeekOrigin.Begin);
- await ms.CopyToAsync(networkStream, BufferSize, cancellationToken);
- }
+ SendTimeout = timeout,
+ ReceiveTimeout = timeout,
+ SendBufferSize = bufferSize,
+ ReceiveBufferSize = bufferSize
+ };
- networkStream.Flush();
-
- for (int retry = 0; !networkStream.DataAvailable; retry += 50)
- {
- await Task.Delay(50, cancellationToken);
-
- if (retry >= Timeout)
- throw new TaskCanceledException();
- }
-
- var response = new byte[BufferSize];
- var count = await networkStream.ReadAsync(response, 0, response.Length, cancellationToken);
- var begin = Array.IndexOf(response, (byte)'{');
+ return async cancellationToken =>
+ {
+ await tcpClient.ConnectAsync(zabbixServer, port)
+ .WithTimeout(timeout, cancellationToken);
- using (var ms = new MemoryStream(response, begin, count - begin))
- {
- using (var reader = new StreamReader(ms, Encoding.ASCII))
- using (var jsonReader = new JsonTextReader(reader))
- return serializer.Deserialize(jsonReader);
- }
- }
- }
+ return tcpClient;
+ };
}
-
- private static long GetCurrentUnixTime() =>
- (long)(DateTime.UtcNow - UnixEpoch).TotalSeconds;
}
}
diff --git a/ZabbixSender.Async/SenderResponse.cs b/ZabbixSender.Async/SenderResponse.cs
index ac66d2d..0406c9b 100644
--- a/ZabbixSender.Async/SenderResponse.cs
+++ b/ZabbixSender.Async/SenderResponse.cs
@@ -1,9 +1,15 @@
namespace ZabbixSender.Async
{
+ ///
+ /// Represents a response from Zabbix on the data sent.
+ ///
public class SenderResponse
{
public string Response { get; set; }
public string Info { get; set; }
public bool IsSuccess => Response == "success";
+
+ public SenderResponseInfo ParseInfo() =>
+ new SenderResponseInfo(Info);
}
-}
\ No newline at end of file
+}
diff --git a/ZabbixSender.Async/SenderResponseInfo.cs b/ZabbixSender.Async/SenderResponseInfo.cs
new file mode 100644
index 0000000..b3468aa
--- /dev/null
+++ b/ZabbixSender.Async/SenderResponseInfo.cs
@@ -0,0 +1,40 @@
+using System;
+using System.Globalization;
+using System.Text.RegularExpressions;
+
+namespace ZabbixSender.Async
+{
+ public class SenderResponseInfo
+ {
+ private readonly string info;
+
+ public SenderResponseInfo(string info)
+ {
+ this.info = info;
+
+ var match = Regex.Match(info, @"processed: (\d+); failed: (\d+); total: (\d+); seconds spent: (\d+\.\d+)");
+
+ if (!match.Success)
+ throw new ProtocolException($"Info field has an unexpected format: \"{info}\"");
+
+ try
+ {
+ Processed = Convert.ToInt32(match.Groups[1].Value, CultureInfo.InvariantCulture);
+ Failed = Convert.ToInt32(match.Groups[2].Value, CultureInfo.InvariantCulture);
+ Total = Convert.ToInt32(match.Groups[3].Value, CultureInfo.InvariantCulture);
+ SecondsSpent = Convert.ToDouble(match.Groups[4].Value, CultureInfo.InvariantCulture);
+ }
+ catch (FormatException ex)
+ {
+ throw new ProtocolException($"Info field has an unexpected format: \"{info}\"", ex);
+ }
+ }
+
+ public int Processed { get; }
+ public int Failed { get; }
+ public int Total { get; }
+ public double SecondsSpent { get; }
+
+ public override string ToString() => info;
+ }
+}
\ No newline at end of file
diff --git a/ZabbixSender.Async/SenderSkeleton.cs b/ZabbixSender.Async/SenderSkeleton.cs
new file mode 100644
index 0000000..b3c1afc
--- /dev/null
+++ b/ZabbixSender.Async/SenderSkeleton.cs
@@ -0,0 +1,65 @@
+using System;
+using System.Collections.Generic;
+using System.Net.Sockets;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace ZabbixSender.Async
+{
+ ///
+ /// A basic class for sending datato Zabbix, which allows custom setup for TcpClient and Formatter.
+ ///
+ public class SenderSkeleton : ISender
+ {
+ private readonly Func> tcpClientFactory;
+ private readonly Func formatterFactory;
+
+ ///
+ /// Initializes a new instance of the ZabbixSender.Async.SenderSkeleton class.
+ ///
+ /// An async factory delegate for TcpClient.
+ /// A factory delegate for the data formatter.
+ public SenderSkeleton(
+ Func> tcpClientFactory,
+ Func formatterFactory)
+ {
+ this.tcpClientFactory = tcpClientFactory;
+ this.formatterFactory = formatterFactory;
+ }
+
+ public Task Send(string host, string key, string value) =>
+ Send(host, key, value, CancellationToken.None);
+
+ public Task Send(string host, string key, string value, CancellationToken cancellationToken) =>
+ Send(new[]
+ {
+ new SendData
+ {
+ Host = host,
+ Key = key,
+ Value = value
+ }
+ }, cancellationToken);
+
+ public Task Send(params SendData[] data) =>
+ Send(data, CancellationToken.None);
+
+ public Task Send(IEnumerable data) =>
+ Send(data, CancellationToken.None);
+
+ public async Task Send(IEnumerable data, CancellationToken cancellationToken)
+ {
+ using (var tcpClient = await tcpClientFactory(cancellationToken))
+ using (var networkStream = tcpClient.GetStream())
+ {
+ var formatter = formatterFactory();
+
+ await formatter.WriteRequestAsync(networkStream, data, cancellationToken);
+ await networkStream.FlushAsync();
+ await networkStream.WaitForDataAvailable(tcpClient.ReceiveTimeout, cancellationToken);
+
+ return await formatter.ReadResponseAsync(networkStream, cancellationToken);
+ }
+ }
+ }
+}
diff --git a/ZabbixSender.Async/TaskExtensions.cs b/ZabbixSender.Async/TaskExtensions.cs
new file mode 100644
index 0000000..ed9535f
--- /dev/null
+++ b/ZabbixSender.Async/TaskExtensions.cs
@@ -0,0 +1,15 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace ZabbixSender.Async
+{
+ internal static class TaskExtensions
+ {
+ public static async Task WithTimeout(this Task task, int millisecondsDelay,
+ CancellationToken cancellationToken)
+ {
+ if (await Task.WhenAny(task, Task.Delay(millisecondsDelay, cancellationToken)) != task)
+ throw new TaskCanceledException();
+ }
+ }
+}
diff --git a/ZabbixSender.Async/ZabbixSender.Async.csproj b/ZabbixSender.Async/ZabbixSender.Async.csproj
index 4311c95..c8a7d3c 100644
--- a/ZabbixSender.Async/ZabbixSender.Async.csproj
+++ b/ZabbixSender.Async/ZabbixSender.Async.csproj
@@ -2,6 +2,14 @@
netcoreapp2.1;net461
+ 1.0.1
+ true
+ Apache License 2.0 (stop-cran, 2018)
+ stop-cran <stop-cran@list.ru>
+ stop-cran <stop-cran@list.ru>
+ https://github.com/stop-cran/ZabbixSender.Async
+ Added clock. Added interfaces for a possibility of DI.
+ The package provides a tool to send data to Zabbix in the same way as zabbix_sender tool. It implements Zabbix Sender Protocol 4.0.