Skip to content

Commit

Permalink
Added clock. Added interfaces for a possibility of DI.
Browse files Browse the repository at this point in the history
  • Loading branch information
stop-cran committed Dec 3, 2018
1 parent 2117fce commit 82457f5
Show file tree
Hide file tree
Showing 13 changed files with 408 additions and 106 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Synopsis

This package provide a tool to send data to Zabbix in the same way as zabbix_sender tool. It implements [Zabbix Sender Protocol 4.0](https://www.zabbix.org/wiki/Docs/protocols/zabbix_sender/4.0).
The package provides a tool to send data to Zabbix in the same way as [zabbix_sender](https://www.zabbix.com/documentation/4.0/ru/manual/concepts/sender) tool. It implements [Zabbix Sender Protocol 4.0](https://www.zabbix.org/wiki/Docs/protocols/zabbix_sender/4.0).

# Installation

Expand Down
151 changes: 151 additions & 0 deletions ZabbixSender.Async/Formatter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Serialization;
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ZabbixSender.Async
{
/// <summary>
/// An auxiliary class for Zabbix sender protocol 4.0 request and response formatting.
/// See full specs at https://zabbix.org/wiki/Docs/protocols/zabbix_sender/4.0.
/// </summary>
public class Formatter : IFormatter
{
private static readonly byte[] ZabbixHeader = Encoding.ASCII.GetBytes("ZBXD\x01");

private readonly int bufferSize;

private readonly JsonSerializer serializer;

/// <summary>
/// Initializes a new instance of the ZabbixSender.Async.Formatter class with custom JsonSerializerSettings.
/// </summary>
/// <param name="bufferSize">Stream buffer size.</param>
public Formatter(int bufferSize = 1024) :
this(new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver(),
NullValueHandling = NullValueHandling.Ignore,
Converters =
{
new UnixDateTimeConverter()
}
}, bufferSize)
{ }

/// <summary>
/// Initializes a new instance of the ZabbixSender.Async.Formatter class.
/// </summary>
/// <param name="settings">Custom Json serialization settings.</param>
/// <param name="bufferSize">Stream buffer size.</param>
public Formatter(JsonSerializerSettings settings, int bufferSize = 1024)
{
serializer = JsonSerializer.Create(settings);
this.bufferSize = bufferSize;
}

public void WriteRequest(Stream stream, IEnumerable<SendData> data)
{
using (var ms = new MemoryStream())
{
using (var writer = new StreamWriter(ms, Encoding.ASCII, bufferSize, true))
using (var jsonWriter = new JsonTextWriter(writer))
serializer.Serialize(
jsonWriter,
new
{
Request = "sender data",
Data = data,
Clock = DateTime.Now
});

var lengthBytes = BitConverter.GetBytes(ms.Length);

stream.Write(ZabbixHeader, 0, ZabbixHeader.Length);
stream.Write(BitConverter.GetBytes(ms.Length), 0, lengthBytes.Length);

ms.Seek(0, SeekOrigin.Begin);
ms.CopyTo(stream, bufferSize);
}
}

public async Task WriteRequestAsync(Stream stream, IEnumerable<SendData> data,
CancellationToken cancellationToken)
{
using (var ms = new MemoryStream())
{
using (var writer = new StreamWriter(ms, Encoding.ASCII, bufferSize, true))
using (var jsonWriter = new JsonTextWriter(writer))
serializer.Serialize(
jsonWriter,
new
{
Request = "sender data",
Data = data,
Clock = DateTime.Now
});

var lengthBytes = BitConverter.GetBytes(ms.Length);

await stream.WriteAsync(ZabbixHeader, 0, ZabbixHeader.Length,
cancellationToken);
await stream.WriteAsync(BitConverter.GetBytes(ms.Length), 0,
lengthBytes.Length, cancellationToken);

ms.Seek(0, SeekOrigin.Begin);
await ms.CopyToAsync(stream, bufferSize, cancellationToken);
}
}

public SenderResponse ReadResponse(Stream stream)
{
int responseSize = Math.Max(bufferSize, 128);
var response = new byte[responseSize];
var count = stream.Read(response, 0, response.Length);
var begin = Array.IndexOf(response, (byte)'{');

if (count <= 0)
throw new ProtocolException("empty reponse received");

if (begin == -1)
throw new ProtocolException("start of Json ({) not found", response);

if (count >= responseSize)
throw new ProtocolException("the response is too big", response);

try
{
using (var ms = new MemoryStream(response, begin, count - begin))
{
using (var reader = new StreamReader(ms, Encoding.ASCII))
using (var jsonReader = new JsonTextReader(reader))
return serializer.Deserialize<SenderResponse>(jsonReader);
}
}
catch (JsonException ex)
{
throw new ProtocolException("invalid response format", ex, response);
}
}

public async Task<SenderResponse> ReadResponseAsync(Stream stream,
CancellationToken cancellationToken)
{
var response = new byte[1024];
var count = await stream.ReadAsync(response, 0, response.Length, cancellationToken);
var begin = Array.IndexOf(response, (byte)'{');

using (var ms = new MemoryStream(response, begin, count - begin))
{
using (var reader = new StreamReader(ms, Encoding.ASCII))
using (var jsonReader = new JsonTextReader(reader))
return serializer.Deserialize<SenderResponse>(jsonReader);
}
}
}
}
15 changes: 15 additions & 0 deletions ZabbixSender.Async/IFormatter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace ZabbixSender.Async
{
public interface IFormatter
{
SenderResponse ReadResponse(Stream stream);
Task<SenderResponse> ReadResponseAsync(Stream stream, CancellationToken cancellationToken);
void WriteRequest(Stream stream, IEnumerable<SendData> data);
Task WriteRequestAsync(Stream stream, IEnumerable<SendData> data, CancellationToken cancellationToken);
}
}
19 changes: 19 additions & 0 deletions ZabbixSender.Async/ISender.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ZabbixSender.Async
{
public interface ISender
{
Task<SenderResponse> Send(IEnumerable<SendData> data);

Task<SenderResponse> Send(IEnumerable<SendData> data, CancellationToken cancellationToken);

Task<SenderResponse> Send(params SendData[] data);

Task<SenderResponse> Send(string host, string key, string value);

Task<SenderResponse> Send(string host, string key, string value, CancellationToken cancellationToken);
}
}
23 changes: 23 additions & 0 deletions ZabbixSender.Async/NetworkStreamExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace ZabbixSender.Async
{
internal static class NetworkStreamExtensions
{
public static async Task WaitForDataAvailable(this NetworkStream stream, int millisecondsDelay,
CancellationToken cancellationToken)
{
const int millisecondsDelayStep = 50;

for (int retry = 0; !stream.DataAvailable; retry += millisecondsDelayStep)
{
await Task.Delay(millisecondsDelayStep, cancellationToken);

if (retry >= millisecondsDelay || millisecondsDelay == 0)
throw new TaskCanceledException();
}
}
}
}
27 changes: 27 additions & 0 deletions ZabbixSender.Async/ProtocolException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;

namespace ZabbixSender.Async
{
public sealed class ProtocolException : Exception
{
public ProtocolException(string message) :
base($"Protocol error - {message}.")
{ }

public ProtocolException(string message, Exception innerException) :
base($"Protocol error - {message}.", innerException)
{ }

public ProtocolException(string message, byte[] response) :
base($"Protocol error - {message}. See the whole response in Data property by Response key.")
{
Data.Add("Response", response);
}

public ProtocolException(string message, Exception innerException, byte[] response) :
base($"Protocol error - {message}. See the whole response in Data property by Response key.", innerException)
{
Data.Add("Response", response);
}
}
}
16 changes: 13 additions & 3 deletions ZabbixSender.Async/SendData.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
namespace ZabbixSender.Async
using System;

namespace ZabbixSender.Async
{
/// <summary>
/// Represents a data frame to be sent to Zabbix.
/// </summary>
public class SendData
{
/// <summary>
Expand All @@ -9,7 +14,7 @@ public class SendData

/// <summary>
/// A key of the item to send.
/// Should belong to the specified host and have "Zabbix sender" type.
/// Should belong to the specified host and have "Zabbix sender" type.
/// </summary>
public string Key { get; set; }

Expand All @@ -18,5 +23,10 @@ public class SendData
/// Should be formatted in a way to respect the configured "type of information" of the item.
/// </summary>
public string Value { get; set; }

/// <summary>
/// A timestamp for the provided value. Leave null for ongoing values.
/// </summary>
public DateTime? Clock { get; set; }
}
}
}
Loading

0 comments on commit 82457f5

Please sign in to comment.