Skip to content

Commit

Permalink
Implement statsdnet to statsdnet transport. Add extra metrics. Closes #…
Browse files Browse the repository at this point in the history
  • Loading branch information
lukevenediger committed Dec 2, 2013
1 parent 58d5fb7 commit 7c6bdf5
Show file tree
Hide file tree
Showing 27 changed files with 643 additions and 101 deletions.
72 changes: 72 additions & 0 deletions statsd.net.shared/Blocks/TimedBufferBlock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using statsd.net.shared.Services;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace statsd.net.shared.Blocks
{
public class TimedBufferBlock<T> : ITargetBlock<T>
{
private ConcurrentBag<T> _buffer;
private Action<T[]> _flushAction;
private IIntervalService _intervalService;
private Task _completionTask;
private bool _isActive;

public TimedBufferBlock(TimeSpan flushPeriod,
Action<T[]> flushAction,
IIntervalService interval = null,
CancellationToken? cancellationToken = null)
{
_buffer = new ConcurrentBag<T>();

_completionTask = new Task(() =>
{
_isActive = false;
});
_flushAction = flushAction;
if (interval == null)
{
_intervalService = new IntervalService(flushPeriod, cancellationToken);
}
_intervalService.Elapsed += (sender, e) =>
{
Flush();
};
_intervalService.Start();
_isActive = true;
}

private void Flush()
{
var items = _buffer.Empty();
_flushAction(items);
}

public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
_buffer.Add(messageValue);
return DataflowMessageStatus.Accepted;
}

public void Complete()
{
_completionTask.RunSynchronously();
}

public Task Completion
{
get { return _completionTask; }
}

public void Fault(Exception exception)
{
throw new NotImplementedException();
}
}
}
45 changes: 44 additions & 1 deletion statsd.net.shared/ExtensionMethods.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using log4net;
using statsd.net.shared.Messages;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
Expand Down Expand Up @@ -86,4 +89,44 @@ public static int ToInt(this XElement element, string attributeName)
public static bool ToBoolean(this XElement element, string attributeName)
{
return Boolean.Parse(element.Attribute(attributeName).Value);
}}
}

public static byte[] Compress(this byte[] data)
{
using (var output = new MemoryStream())
{
using (var zip = new GZipStream(output, CompressionMode.Compress))
{
zip.Write(data, 0, data.Length);
}
return output.ToArray();
}
}

public static byte[] Decompress(this byte[] data)
{
using (var output = new MemoryStream())
{
using (var input = new MemoryStream(data))
{
using (var unzip = new GZipStream(input, CompressionMode.Decompress))
{
unzip.CopyTo(output);
}
}
return output.ToArray();
}
}

public static T[] Empty<T>(this ConcurrentBag<T> bag)
{
var numItems = bag.Count;
var items = new List<T>(numItems);
T item;
while (bag.TryTake(out item))
{
items.Add(item);
}
return items.ToArray();
}
}
154 changes: 154 additions & 0 deletions statsd.net.shared/Listeners/StatsdnetTcpListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
using statsd.net.shared.Messages;
using statsd.net.shared.Services;
using statsd.net.shared.Structures;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace statsd.net.shared.Listeners
{
public class StatsdnetTcpListener : IListener
{
private const int READ_TIMEOUT = 5000; /* 5 seconds */
private static string[] SPACE_SPLITTER = new String[] { " " };
private static string[] NEWLINE_SPLITTER = new String[] { Environment.NewLine };

private ITargetBlock<string> _target;
private CancellationToken _token;
private ISystemMetricsService _systemMetrics;
private TcpListener _tcpListener;
private int _activeConnections;
private ActionBlock<DecoderBlockPacket> _decoderBlock;

public bool IsListening { get; private set; }

public StatsdnetTcpListener(int port, ISystemMetricsService systemMetrics)
{
_systemMetrics = systemMetrics;
IsListening = false;
_activeConnections = 0;
_tcpListener = new TcpListener(IPAddress.Any, port);
_decoderBlock = new ActionBlock<DecoderBlockPacket>((data) => { DecodePacketAndForward(data); },
Utility.UnboundedExecution());
}

public async void LinkTo(ITargetBlock<string> target, CancellationToken token)
{
_target = target;
_token = token;
await Listen();
}

private async Task Listen()
{
_tcpListener.Start();
IsListening = true;
while(!_token.IsCancellationRequested)
{
var tcpClient = await _tcpListener.AcceptTcpClientAsync();
ProcessIncomingConnection(tcpClient);
}
}

private void ProcessIncomingConnection(TcpClient tcpClient)
{
try
{
Interlocked.Increment(ref _activeConnections);
_systemMetrics.LogGauge("listeners.statsdnet.activeConnections", _activeConnections);
_systemMetrics.LogCount("listeners.statsdnet.connection.open");
using (BinaryReader reader = new BinaryReader(tcpClient.GetStream()))
{
while (true)
{
if (reader.PeekChar() == 0)
{
// close the socket
return;
}
// Get the length
var packetLength = reader.ReadInt32();
// Is it compressed?
var isCompressed = reader.ReadBoolean();
// Now get the packet
var packet = reader.ReadBytes(packetLength);
// Decode
_decoderBlock.Post(new DecoderBlockPacket(packet, isCompressed));
}
}
}
catch (SocketException se)
{
// oops, we're done
_systemMetrics.LogCount("listeners.statsdnet.error.SocketException." + se.SocketErrorCode.ToString());
}
catch (IOException io)
{
// Not much we can do here.
_systemMetrics.LogCount("listeners.statsdnet.error.IOException");
}
catch (Exception ex)
{
var a = 1;
}
finally
{
try
{
tcpClient.Close();
}
catch
{
// Do nothing but log that this happened
_systemMetrics.LogCount("listeners.statsdnet.error.closeThrewException");
}

_systemMetrics.LogCount("listeners.statsdnet.connection.closed");
Interlocked.Decrement(ref _activeConnections);
_systemMetrics.LogGauge("listeners.statsdnet.activeConnections", _activeConnections);
}
}

private void DecodePacketAndForward(DecoderBlockPacket packet)
{
try
{
byte[] rawData;
if (packet.isCompressed)
{
_systemMetrics.LogCount("listeners.statsdnet.bytes.gzip", packet.data.Length);
rawData = packet.data.Decompress();
}
else
{
rawData = packet.data;
}

_systemMetrics.LogCount("listeners.statsdnet.bytes.raw", rawData.Length);
var lines = Encoding.UTF8.GetString(rawData).Split(
NEWLINE_SPLITTER,
StringSplitOptions.RemoveEmptyEntries
);
foreach(var line in lines)
{
// Format this as raw and send it on.
var parts = line.Split(SPACE_SPLITTER, StringSplitOptions.RemoveEmptyEntries);
_target.Post(parts[0] + ":" + parts[1] + "|r|" + parts[2]);
}
_systemMetrics.LogCount("listeners.statsdnet.lines", lines.Length);
}
catch (Exception ex)
{
_systemMetrics.LogCount("listeners.statsdnet.decodingError." + ex.GetType().Name);
}
}

}
}
4 changes: 2 additions & 2 deletions statsd.net.shared/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
[assembly: AssemblyVersion("1.3.1.0")]
[assembly: AssemblyFileVersion("1.3.1.0")]
2 changes: 2 additions & 0 deletions statsd.net.shared/Structures/Bucket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public Bucket(BucketType bucketType,

public abstract void FeedTarget(ITargetBlock<GraphiteLine> target);

public abstract GraphiteLine[] ToLines();

public static Bucket Clone(Bucket bucket)
{
// Don't clone the bucket, just send back this reference since nobody
Expand Down
10 changes: 10 additions & 0 deletions statsd.net.shared/Structures/CounterBucket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ public CounterBucket(KeyValuePair<string, int>[] counts, long epoch, string root
{
}

public override GraphiteLine[] ToLines()
{
var lines = new List<GraphiteLine>();
foreach (var count in Items)
{
lines.Add(new GraphiteLine(RootNamespace + count.Key, count.Value, Epoch));
}
return lines.ToArray();
}

public override void FeedTarget(ITargetBlock<GraphiteLine> target)
{
foreach (var count in Items)
Expand Down
19 changes: 19 additions & 0 deletions statsd.net.shared/Structures/DecoderBlockPacket.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace statsd.net.shared.Structures
{
public class DecoderBlockPacket
{
public byte[] data;
public bool isCompressed;

public DecoderBlockPacket(byte[] data, bool isCompressed)
{
this.data = data;
this.isCompressed = isCompressed;
}
}
}
10 changes: 10 additions & 0 deletions statsd.net.shared/Structures/GaugesBucket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ public GaugesBucket(KeyValuePair<string, int>[] gauges, long epoch, string rootN
Gauges = Gauges;
}

public override GraphiteLine[] ToLines()
{
var lines = new List<GraphiteLine>();
foreach (var gauge in Gauges)
{
lines.Add(new GraphiteLine(RootNamespace + gauge.Key, gauge.Value, Epoch));
}
return lines.ToArray();
}

public override void FeedTarget(ITargetBlock<GraphiteLine> target)
{
foreach (var gauge in Gauges)
Expand Down
10 changes: 10 additions & 0 deletions statsd.net.shared/Structures/LatencyBucket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ public LatencyBucket(KeyValuePair<string, LatencyDatapointBox>[] latencies,
_calculateSumSquares = calculateSumSquares;
}

public override GraphiteLine[] ToLines()
{
var lines = new List<GraphiteLine>();
foreach (var latency in Latencies)
{
lines.AddRange(MakeGraphiteLines(latency));
}
return lines.ToArray();
}

public override void FeedTarget(ITargetBlock<GraphiteLine> target)
{
foreach (var latency in Latencies)
Expand Down
19 changes: 19 additions & 0 deletions statsd.net.shared/Structures/PercentileBucket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,25 @@ public PercentileBucket(KeyValuePair<string, DatapointBox>[] timings,
Percentile = percentile;
}

public override GraphiteLine[] ToLines()
{
var lines = new List<GraphiteLine>();
int percentileValue;
foreach (var measurements in Timings)
{
if (TryComputePercentile(measurements, out percentileValue))
{
lines.Add(
new GraphiteLine(
RootNamespace + measurements.Key + PercentileName,
percentileValue,
Epoch)
);
}
}
return lines.ToArray();
}

public override void FeedTarget(ITargetBlock<GraphiteLine> target)
{
int percentileValue;
Expand Down
Loading

0 comments on commit 7c6bdf5

Please sign in to comment.