-
Notifications
You must be signed in to change notification settings - Fork 6
/
Formatter.cs
170 lines (149 loc) · 6.55 KB
/
Formatter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Serialization;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ZabbixSender.Async
{
    /// <summary>
    /// An auxiliary class for Zabbix sender protocol 4.4 request and response formatting.
    /// See full specs at https://zabbix.org/wiki/Docs/protocols/zabbix_sender/4.4.
    /// </summary>
    public class Formatter : IFormatter
    {
        // See docs: https://www.zabbix.com/documentation/4.4/manual/appendix/protocols/header_datalen
        private static readonly byte[] ZabbixHeader = Encoding.ASCII.GetBytes("ZBXD\x01");

        // Number of bytes occupied by the length field that follows the signature.
        private const int LengthFieldSize = sizeof(long);

        // Total size of the fixed prefix (signature + length field) that precedes the JSON payload.
        private static readonly int PrefixLength = ZabbixHeader.Length + LengthFieldSize;

        // UTF-8 without a byte order mark: identical to ASCII for 7-bit text, but does not
        // replace non-ASCII characters in host names, keys or values with '?'.
        private static readonly Encoding PayloadEncoding = new UTF8Encoding(false);

        private readonly int bufferSize;
        private readonly JsonSerializer serializer;

        /// <summary>
        /// Initializes a new instance of the ZabbixSender.Async.Formatter class with the default
        /// serialization settings: camel-cased property names, null values omitted and
        /// DateTime values written through <see cref="UnixDateTimeConverter"/>.
        /// </summary>
        /// <param name="bufferSize">Stream buffer size.</param>
        public Formatter(int bufferSize = 1024) :
            this(new JsonSerializerSettings
            {
                ContractResolver = new CamelCasePropertyNamesContractResolver(),
                NullValueHandling = NullValueHandling.Ignore,
                Converters =
                {
                    new UnixDateTimeConverter()
                }
            }, bufferSize)
        { }

        /// <summary>
        /// Initializes a new instance of the ZabbixSender.Async.Formatter class with custom JsonSerializerSettings.
        /// </summary>
        /// <param name="settings">Custom Json serialization settings.</param>
        /// <param name="bufferSize">Stream buffer size.</param>
        public Formatter(JsonSerializerSettings settings, int bufferSize = 1024)
        {
            serializer = JsonSerializer.Create(settings);
            this.bufferSize = bufferSize;
        }

        /// <summary>
        /// Write provided data items to the request stream.
        /// </summary>
        /// <param name="stream">Request stream.</param>
        /// <param name="data">Data items to write.</param>
        /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
        public void WriteRequest(Stream stream, IEnumerable<SendData> data)
        {
            if (stream == null)
            {
                throw new ArgumentNullException(nameof(stream));
            }

            using (var packet = BuildPacket(data))
            {
                packet.CopyTo(stream, bufferSize);
            }
        }

        /// <summary>
        /// Write provided data items to the request stream.
        /// </summary>
        /// <param name="stream">A stream to write to.</param>
        /// <param name="data">Data items to write.</param>
        /// <param name="cancellationToken">A CancellationToken for the write operation.</param>
        /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
        public async Task WriteRequestAsync(Stream stream, IEnumerable<SendData> data,
            CancellationToken cancellationToken = default)
        {
            if (stream == null)
            {
                throw new ArgumentNullException(nameof(stream));
            }

            using (var packet = BuildPacket(data))
            {
                await packet.CopyToAsync(stream, bufferSize, cancellationToken).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Read Zabbix server response from given stream.
        /// </summary>
        /// <remarks>
        /// The stream is wrapped in a <see cref="StreamReader"/> that is disposed before returning,
        /// which also closes <paramref name="stream"/>.
        /// </remarks>
        /// <param name="stream">A stream to read from.</param>
        /// <returns>The deserialized server response.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
        /// <exception cref="ProtocolException">
        /// The response prefix is truncated, the signature does not match, or the payload is not valid JSON.
        /// </exception>
        public SenderResponse ReadResponse(Stream stream)
        {
            if (stream == null)
            {
                throw new ArgumentNullException(nameof(stream));
            }

            var prefix = new byte[PrefixLength];
            var received = 0;

            // Stream.Read may return fewer bytes than requested, so keep reading
            // until the whole prefix has arrived or the stream ends.
            while (received < prefix.Length)
            {
                var count = stream.Read(prefix, received, prefix.Length - received);
                if (count == 0)
                {
                    break;
                }

                received += count;
            }

            EnsureValidPrefix(prefix, received);
            return DeserializeResponse(stream);
        }

        /// <summary>
        /// Read Zabbix server response from given stream.
        /// </summary>
        /// <remarks>
        /// Only the fixed-size prefix is read asynchronously; the JSON payload is parsed with the
        /// synchronous serializer API. The stream is wrapped in a <see cref="StreamReader"/> that is
        /// disposed before returning, which also closes <paramref name="stream"/>.
        /// </remarks>
        /// <param name="stream">A stream to read from.</param>
        /// <param name="cancellationToken">CancellationToken for the read operation.</param>
        /// <returns>A task producing the deserialized server response.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
        /// <exception cref="ProtocolException">
        /// The response prefix is truncated, the signature does not match, or the payload is not valid JSON.
        /// </exception>
        public async Task<SenderResponse> ReadResponseAsync(Stream stream, CancellationToken cancellationToken = default)
        {
            if (stream == null)
            {
                throw new ArgumentNullException(nameof(stream));
            }

            var prefix = new byte[PrefixLength];
            var received = 0;

            // Stream.ReadAsync may complete with fewer bytes than requested, so keep reading
            // until the whole prefix has arrived or the stream ends.
            while (received < prefix.Length)
            {
                var count = await stream
                    .ReadAsync(prefix, received, prefix.Length - received, cancellationToken)
                    .ConfigureAwait(false);
                if (count == 0)
                {
                    break;
                }

                received += count;
            }

            EnsureValidPrefix(prefix, received);
            return DeserializeResponse(stream);
        }

        // Serializes the request and frames it as signature + little-endian payload length + JSON.
        // The returned stream is positioned at 0 and owned by the caller.
        private MemoryStream BuildPacket(IEnumerable<SendData> data)
        {
            var packet = new MemoryStream();
            try
            {
                // Write the signature and reserve space for the length, which is only
                // known once the payload has been serialized.
                packet.Write(ZabbixHeader, 0, ZabbixHeader.Length);
                packet.Write(new byte[LengthFieldSize], 0, LengthFieldSize);

                using (var writer = new StreamWriter(packet, PayloadEncoding, bufferSize, true))
                using (var jsonWriter = new JsonTextWriter(writer))
                {
                    serializer.Serialize(
                        jsonWriter,
                        new
                        {
                            Request = "sender data",
                            Data = data,
                            Clock = DateTime.Now
                        });
                }

                var lengthBytes = BitConverter.GetBytes(packet.Length - PrefixLength);
                if (!BitConverter.IsLittleEndian)
                {
                    // BitConverter uses the host byte order; the length field is little-endian.
                    Array.Reverse(lengthBytes);
                }

                packet.Position = ZabbixHeader.Length;
                packet.Write(lengthBytes, 0, lengthBytes.Length);
                packet.Position = 0;
                return packet;
            }
            catch
            {
                packet.Dispose();
                throw;
            }
        }

        // Verifies that the prefix starts with the protocol signature and was received in full.
        // The length field itself is not interpreted.
        private static void EnsureValidPrefix(byte[] prefix, int received)
        {
            if (received < ZabbixHeader.Length ||
                !prefix.Take(ZabbixHeader.Length).SequenceEqual(ZabbixHeader))
            {
                throw new ProtocolException("the response has an incorrect header");
            }

            if (received < prefix.Length)
            {
                throw new ProtocolException("the response header is incomplete");
            }
        }

        // Parses the JSON payload that follows the prefix, translating JSON errors into ProtocolException.
        private SenderResponse DeserializeResponse(Stream stream)
        {
            try
            {
                using (var reader = new StreamReader(stream, PayloadEncoding))
                using (var jsonReader = new JsonTextReader(reader))
                {
                    return serializer.Deserialize<SenderResponse>(jsonReader);
                }
            }
            catch (JsonException ex)
            {
                throw new ProtocolException("invalid response format", ex);
            }
        }
    }
}