diff --git a/statsd.net.shared/Blocks/TimedBufferBlock.cs b/statsd.net.shared/Blocks/TimedBufferBlock.cs new file mode 100644 index 0000000..333200e --- /dev/null +++ b/statsd.net.shared/Blocks/TimedBufferBlock.cs @@ -0,0 +1,72 @@ +using statsd.net.shared.Services; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; + +namespace statsd.net.shared.Blocks +{ + public class TimedBufferBlock : ITargetBlock + { + private ConcurrentBag _buffer; + private Action _flushAction; + private IIntervalService _intervalService; + private Task _completionTask; + private bool _isActive; + + public TimedBufferBlock(TimeSpan flushPeriod, + Action flushAction, + IIntervalService interval = null, + CancellationToken? cancellationToken = null) + { + _buffer = new ConcurrentBag(); + + _completionTask = new Task(() => + { + _isActive = false; + }); + _flushAction = flushAction; + if (interval == null) + { + _intervalService = new IntervalService(flushPeriod, cancellationToken); + } + _intervalService.Elapsed += (sender, e) => + { + Flush(); + }; + _intervalService.Start(); + _isActive = true; + } + + private void Flush() + { + var items = _buffer.Empty(); + _flushAction(items); + } + + public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock source, bool consumeToAccept) + { + _buffer.Add(messageValue); + return DataflowMessageStatus.Accepted; + } + + public void Complete() + { + _completionTask.RunSynchronously(); + } + + public Task Completion + { + get { return _completionTask; } + } + + public void Fault(Exception exception) + { + throw new NotImplementedException(); + } + } +} diff --git a/statsd.net.shared/ExtensionMethods.cs b/statsd.net.shared/ExtensionMethods.cs index ab26c97..3b12add 100644 --- a/statsd.net.shared/ExtensionMethods.cs +++ b/statsd.net.shared/ExtensionMethods.cs @@ -1,7 +1,10 @@ using log4net; using statsd.net.shared.Messages; using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.IO; +using System.IO.Compression; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -86,4 +89,44 @@ public static int ToInt(this XElement element, string attributeName) public static bool ToBoolean(this XElement element, string attributeName) { return Boolean.Parse(element.Attribute(attributeName).Value); - }} + } + + public static byte[] Compress(this byte[] data) + { + using (var output = new MemoryStream()) + { + using (var zip = new GZipStream(output, CompressionMode.Compress)) + { + zip.Write(data, 0, data.Length); + } + return output.ToArray(); + } + } + + public static byte[] Decompress(this byte[] data) + { + using (var output = new MemoryStream()) + { + using (var input = new MemoryStream(data)) + { + using (var unzip = new GZipStream(input, CompressionMode.Decompress)) + { + unzip.CopyTo(output); + } + } + return output.ToArray(); + } + } + + public static T[] Empty(this ConcurrentBag bag) + { + var numItems = bag.Count; + var items = new List(numItems); + T item; + while (bag.TryTake(out item)) + { + items.Add(item); + } + return items.ToArray(); + } +} \ No newline at end of file diff --git a/statsd.net.shared/Listeners/StatsdnetTcpListener.cs b/statsd.net.shared/Listeners/StatsdnetTcpListener.cs new file mode 100644 index 0000000..776dda5 --- /dev/null +++ b/statsd.net.shared/Listeners/StatsdnetTcpListener.cs @@ -0,0 +1,154 @@ +using statsd.net.shared.Messages; +using statsd.net.shared.Services; +using statsd.net.shared.Structures; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; + +namespace statsd.net.shared.Listeners +{ + public class StatsdnetTcpListener : IListener + { + private const int READ_TIMEOUT = 5000; /* 5 seconds */ + private static string[] SPACE_SPLITTER = new String[] { " " }; + private static string[] NEWLINE_SPLITTER = new String[] { Environment.NewLine }; + + private ITargetBlock _target; + private CancellationToken _token; + private ISystemMetricsService _systemMetrics; + private TcpListener _tcpListener; + private int _activeConnections; + private ActionBlock _decoderBlock; + + public bool IsListening { get; private set; } + + public StatsdnetTcpListener(int port, ISystemMetricsService systemMetrics) + { + _systemMetrics = systemMetrics; + IsListening = false; + _activeConnections = 0; + _tcpListener = new TcpListener(IPAddress.Any, port); + _decoderBlock = new ActionBlock((data) => { DecodePacketAndForward(data); }, + Utility.UnboundedExecution()); + } + + public async void LinkTo(ITargetBlock target, CancellationToken token) + { + _target = target; + _token = token; + await Listen(); + } + + private async Task Listen() + { + _tcpListener.Start(); + IsListening = true; + while(!_token.IsCancellationRequested) + { + var tcpClient = await _tcpListener.AcceptTcpClientAsync(); + ProcessIncomingConnection(tcpClient); + } + } + + private void ProcessIncomingConnection(TcpClient tcpClient) + { + try + { + Interlocked.Increment(ref _activeConnections); + _systemMetrics.LogGauge("listeners.statsdnet.activeConnections", _activeConnections); + _systemMetrics.LogCount("listeners.statsdnet.connection.open"); + using (BinaryReader reader = new BinaryReader(tcpClient.GetStream())) + { + while (true) + { + if (reader.PeekChar() == 0) + { + // close the socket + return; + } + // Get the length + var packetLength = reader.ReadInt32(); + // Is it compressed? + var isCompressed = reader.ReadBoolean(); + // Now get the packet + var packet = reader.ReadBytes(packetLength); + // Decode + _decoderBlock.Post(new DecoderBlockPacket(packet, isCompressed)); + } + } + } + catch (SocketException se) + { + // oops, we're done + _systemMetrics.LogCount("listeners.statsdnet.error.SocketException." + se.SocketErrorCode.ToString()); + } + catch (IOException io) + { + // Not much we can do here. + _systemMetrics.LogCount("listeners.statsdnet.error.IOException"); + } + catch (Exception ex) + { + var a = 1; + } + finally + { + try + { + tcpClient.Close(); + } + catch + { + // Do nothing but log that this happened + _systemMetrics.LogCount("listeners.statsdnet.error.closeThrewException"); + } + + _systemMetrics.LogCount("listeners.statsdnet.connection.closed"); + Interlocked.Decrement(ref _activeConnections); + _systemMetrics.LogGauge("listeners.statsdnet.activeConnections", _activeConnections); + } + } + + private void DecodePacketAndForward(DecoderBlockPacket packet) + { + try + { + byte[] rawData; + if (packet.isCompressed) + { + _systemMetrics.LogCount("listeners.statsdnet.bytes.gzip", packet.data.Length); + rawData = packet.data.Decompress(); + } + else + { + rawData = packet.data; + } + + _systemMetrics.LogCount("listeners.statsdnet.bytes.raw", rawData.Length); + var lines = Encoding.UTF8.GetString(rawData).Split( + NEWLINE_SPLITTER, + StringSplitOptions.RemoveEmptyEntries + ); + foreach(var line in lines) + { + // Format this as raw and send it on. + var parts = line.Split(SPACE_SPLITTER, StringSplitOptions.RemoveEmptyEntries); + _target.Post(parts[0] + ":" + parts[1] + "|r|" + parts[2]); + } + _systemMetrics.LogCount("listeners.statsdnet.lines", lines.Length); + } + catch (Exception ex) + { + _systemMetrics.LogCount("listeners.statsdnet.decodingError." + ex.GetType().Name); + } + } + + } +} diff --git a/statsd.net.shared/Properties/AssemblyInfo.cs b/statsd.net.shared/Properties/AssemblyInfo.cs index 3504a56..2e82cf7 100644 --- a/statsd.net.shared/Properties/AssemblyInfo.cs +++ b/statsd.net.shared/Properties/AssemblyInfo.cs @@ -32,5 +32,5 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] +[assembly: AssemblyVersion("1.3.1.0")] +[assembly: AssemblyFileVersion("1.3.1.0")] diff --git a/statsd.net.shared/Structures/Bucket.cs b/statsd.net.shared/Structures/Bucket.cs index 90cb4b5..a89df11 100644 --- a/statsd.net.shared/Structures/Bucket.cs +++ b/statsd.net.shared/Structures/Bucket.cs @@ -25,6 +25,8 @@ public Bucket(BucketType bucketType, public abstract void FeedTarget(ITargetBlock target); + public abstract GraphiteLine[] ToLines(); + public static Bucket Clone(Bucket bucket) { // Don't clone the bucket, just send back this reference since nobody diff --git a/statsd.net.shared/Structures/CounterBucket.cs b/statsd.net.shared/Structures/CounterBucket.cs index 6bde9e7..34fcdb5 100644 --- a/statsd.net.shared/Structures/CounterBucket.cs +++ b/statsd.net.shared/Structures/CounterBucket.cs @@ -15,6 +15,16 @@ public CounterBucket(KeyValuePair[] counts, long epoch, string root { } + public override GraphiteLine[] ToLines() + { + var lines = new List(); + foreach (var count in Items) + { + lines.Add(new GraphiteLine(RootNamespace + count.Key, count.Value, Epoch)); + } + return lines.ToArray(); + } + public override void FeedTarget(ITargetBlock target) { foreach (var count in Items) diff --git a/statsd.net.shared/Structures/DecoderBlockPacket.cs b/statsd.net.shared/Structures/DecoderBlockPacket.cs new file mode 100644 index 0000000..475a17a --- /dev/null +++ b/statsd.net.shared/Structures/DecoderBlockPacket.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace statsd.net.shared.Structures +{ + public class DecoderBlockPacket + { + public byte[] data; + public bool isCompressed; + + public DecoderBlockPacket(byte[] data, bool isCompressed) + { + this.data = data; + this.isCompressed = isCompressed; + } + } +} diff --git a/statsd.net.shared/Structures/GaugesBucket.cs b/statsd.net.shared/Structures/GaugesBucket.cs index 7b121ce..3d531d2 100644 --- a/statsd.net.shared/Structures/GaugesBucket.cs +++ b/statsd.net.shared/Structures/GaugesBucket.cs @@ -18,6 +18,16 @@ public GaugesBucket(KeyValuePair[] gauges, long epoch, string rootN Gauges = Gauges; } + public override GraphiteLine[] ToLines() + { + var lines = new List(); + foreach (var gauge in Gauges) + { + lines.Add(new GraphiteLine(RootNamespace + gauge.Key, gauge.Value, Epoch)); + } + return lines.ToArray(); + } + public override void FeedTarget(ITargetBlock target) { foreach (var gauge in Gauges) diff --git a/statsd.net.shared/Structures/LatencyBucket.cs b/statsd.net.shared/Structures/LatencyBucket.cs index 7a1a69e..4d36e6c 100644 --- a/statsd.net.shared/Structures/LatencyBucket.cs +++ b/statsd.net.shared/Structures/LatencyBucket.cs @@ -23,6 +23,16 @@ public LatencyBucket(KeyValuePair[] latencies, _calculateSumSquares = calculateSumSquares; } + public override GraphiteLine[] ToLines() + { + var lines = new List(); + foreach (var latency in Latencies) + { + lines.AddRange(MakeGraphiteLines(latency)); + } + return lines.ToArray(); + } + public override void FeedTarget(ITargetBlock target) { foreach (var latency in Latencies) diff --git a/statsd.net.shared/Structures/PercentileBucket.cs b/statsd.net.shared/Structures/PercentileBucket.cs index 98da051..0fc54f1 100644 --- a/statsd.net.shared/Structures/PercentileBucket.cs +++ b/statsd.net.shared/Structures/PercentileBucket.cs @@ -27,6 +27,25 @@ public PercentileBucket(KeyValuePair[] timings, Percentile = percentile; } + public override GraphiteLine[] ToLines() + { + var lines = new List(); + int percentileValue; + foreach (var measurements in Timings) + { + if (TryComputePercentile(measurements, out percentileValue)) + { + lines.Add( + new GraphiteLine( + RootNamespace + measurements.Key + PercentileName, + percentileValue, + Epoch) + ); + } + } + return lines.ToArray(); + } + public override void FeedTarget(ITargetBlock target) { int percentileValue; diff --git a/statsd.net.shared/Structures/RawBucket.cs b/statsd.net.shared/Structures/RawBucket.cs index 6437d10..4273453 100644 --- a/statsd.net.shared/Structures/RawBucket.cs +++ b/statsd.net.shared/Structures/RawBucket.cs @@ -18,15 +18,33 @@ public RawBucket(Raw[] rawLines, long epoch) _rawLines = rawLines; } - public override void FeedTarget(ITargetBlock target) + public override GraphiteLine[] ToLines() { + var lines = new List(); foreach (var line in _rawLines) { for (int index = 0; index < _rawLines.Length; index++) { - target.Post(new GraphiteLine(line.Name, line.Value, line.Timestamp ?? Epoch)); + lines.Add(new GraphiteLine(line.Name, line.Value, line.Timestamp ?? Epoch)); } } + return lines.ToArray(); + } + + public override void FeedTarget(ITargetBlock target) + { + foreach (var line in _rawLines) + { + target.Post(new GraphiteLine(line.Name, line.Value, line.Timestamp ?? Epoch)); + } + } + + public override string ToString() + { + var lines = _rawLines.Select(line => + new GraphiteLine(line.Name, line.Value, line.Timestamp ?? Epoch).ToString()) + .ToArray(); + return String.Join(Environment.NewLine, lines); } } } diff --git a/statsd.net.shared/Structures/SetsBucket.cs b/statsd.net.shared/Structures/SetsBucket.cs index 6bd39a6..8bb8660 100644 --- a/statsd.net.shared/Structures/SetsBucket.cs +++ b/statsd.net.shared/Structures/SetsBucket.cs @@ -20,6 +20,19 @@ public SetsBucket(List>>> sets Sets = sets; } + public override GraphiteLine[] ToLines() + { + var lines = new List(); + foreach (var set in Sets) + { + foreach (var item in set.Value) + { + lines.Add(new GraphiteLine(RootNamespace + set.Key, 1, Epoch)); + } + } + return lines.ToArray(); + } + public override void FeedTarget(ITargetBlock target) { foreach (var set in Sets) diff --git a/statsd.net.shared/statsd.net.shared.csproj b/statsd.net.shared/statsd.net.shared.csproj index 8beb36d..78692d7 100644 --- a/statsd.net.shared/statsd.net.shared.csproj +++ b/statsd.net.shared/statsd.net.shared.csproj @@ -55,12 +55,15 @@ + + + diff --git a/statsd.net/Backends/Statsdnet/StatsdnetBackend.cs b/statsd.net/Backends/Statsdnet/StatsdnetBackend.cs new file mode 100644 index 0000000..730e8b7 --- /dev/null +++ b/statsd.net/Backends/Statsdnet/StatsdnetBackend.cs @@ -0,0 +1,100 @@ +using log4net; +using log4net.Core; +using statsd.net.shared; +using statsd.net.shared.Backends; +using statsd.net.shared.Blocks; +using statsd.net.shared.Messages; +using statsd.net.shared.Services; +using statsd.net.shared.Structures; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; + +namespace statsd.net.Backends.Statsdnet +{ + /// + /// Forwards all metrics on to another statsd.net instance over TCP. + /// + public class StatsdnetBackend : IBackend + { + private Task _completionTask; + private bool _isActive; + private TimedBufferBlock _bufferBlock; + private ISystemMetricsService _systemMetrics; + private StatsdnetForwardingClient _client; + + public StatsdnetBackend(string host, int port, TimeSpan flushPeriod, ISystemMetricsService systemMetrics) + { + _systemMetrics = systemMetrics; + var log = SuperCheapIOC.Resolve(); + _client = new StatsdnetForwardingClient(host, port, systemMetrics); + _bufferBlock = new TimedBufferBlock(flushPeriod, PostMetrics); + + _completionTask = new Task(() => + { + _isActive = false; + }); + + _isActive = true; + } + + private void PostMetrics(GraphiteLine[][] lineArrays) + { + var lines = new List(); + foreach(var graphiteLineArray in lineArrays) + { + foreach (var line in graphiteLineArray) + { + lines.Add(line); + } + } + var rawText = String.Join(Environment.NewLine, + lines.Select(line => line.ToString()).ToArray()); + var bytes = Encoding.UTF8.GetBytes(rawText); + if (_client.Send(bytes)) + { + _systemMetrics.LogCount("backends.statsdnet.lines", lines.Count); + _systemMetrics.LogGauge("backends.statsdnet.bytes", bytes.Length); + } + } + + public bool IsActive + { + get { return _isActive; } + } + + public int OutputCount + { + get { return 0; } + } + + public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, + Bucket messageValue, + ISourceBlock source, + bool consumeToAccept) + { + var lines = messageValue.ToLines(); + _bufferBlock.Post(lines); + + return DataflowMessageStatus.Accepted; + } + + public void Complete() + { + _completionTask.Start(); + } + + public Task Completion + { + get { return _completionTask; } + } + + public void Fault(Exception exception) + { + throw new NotImplementedException(); + } + } +} diff --git a/statsd.net/Backends/Statsdnet/StatsdnetForwardingClient.cs b/statsd.net/Backends/Statsdnet/StatsdnetForwardingClient.cs new file mode 100644 index 0000000..33f1534 --- /dev/null +++ b/statsd.net/Backends/Statsdnet/StatsdnetForwardingClient.cs @@ -0,0 +1,112 @@ +using statsd.net.shared.Services; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net.Sockets; +using System.Text; +using System.Threading.Tasks; + +namespace statsd.net.Backends.Statsdnet +{ + public class StatsdnetForwardingClient + { + // Lots of opinions on what the correct value is: http://webmasters.stackexchange.com/questions/31750/what-is-recommended-minimum-object-size-for-gzip-performance-benefits + private const int COMPRESSION_SIZE_THRESHOLD = 350; + private TcpClient _client; + private BinaryWriter _writer; + private string _host; + private int _port; + private int _numRetries; + private ISystemMetricsService _systemMetrics; + + public StatsdnetForwardingClient(string host, int port, ISystemMetricsService systemMetrics, int numRetries = 3) + { + _host = host; + _port = port; + _systemMetrics = systemMetrics; + _numRetries = numRetries; + _client = new TcpClient(); + } + + public bool Send (byte[] data) + { + return Send(data, _numRetries); + } + + private bool Send (byte[] data, int retryAttemptsLeft) + { + /** Statsd.net packet format consists of + * byte 0: 32-bit integer length + * byte 33: boolean for whether the data is compressed or not + * byte 34-: the packet + */ + + if (data.Length == 0) + { + return true; + } + + Func handleRetry = () => + { + _systemMetrics.LogCount("backends.statsdnet.sendFailed"); + if (retryAttemptsLeft > 0) + { + _systemMetrics.LogCount("backends.statsdnet.retrySend"); + return Send(data, --retryAttemptsLeft); + } + else + { + _systemMetrics.LogCount("backends.statsdnet.retrySendFailed"); + return false; + } + }; + + try + { + if (!_client.Connected) + { + try + { + _client.Close(); + } + catch (Exception) + { + // eat it + } + _client = new TcpClient(); + _client.Connect(_host, _port); + _writer = new BinaryWriter(_client.GetStream()); + } + if (data.Length < COMPRESSION_SIZE_THRESHOLD) + { + _writer.Write(data.Length); + _writer.Write(false); + _writer.Write(data); + _systemMetrics.LogCount("backends.statsdnet.bytes.raw", data.Length); + _systemMetrics.LogCount("backends.statsdnet.bytes.compressed", data.Length); + } + else + { + var compressedData = data.Compress(); + _writer.Write(compressedData.Length); + _writer.Write(true); + _writer.Write(compressedData); + _systemMetrics.LogCount("backends.statsdnet.bytes.raw", data.Length); + _systemMetrics.LogCount("backends.statsdnet.bytes.compressed", compressedData.Length); + } + return true; + } + catch (SocketException se) + { + _systemMetrics.LogCount("backends.statsdnet.error.SocketException." + se.SocketErrorCode.ToString()); + return handleRetry(); + } + catch (IOException) + { + _systemMetrics.LogCount("backends.statsdnet.error.IOException"); + return handleRetry(); + } + } + } +} diff --git a/statsd.net/Backends/StatsdnetBackend.cs b/statsd.net/Backends/StatsdnetBackend.cs deleted file mode 100644 index 4cd2e0c..0000000 --- a/statsd.net/Backends/StatsdnetBackend.cs +++ /dev/null @@ -1,75 +0,0 @@ -using statsd.net.shared; -using statsd.net.shared.Backends; -using statsd.net.shared.Messages; -using statsd.net.shared.Services; -using statsd.net.shared.Structures; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; - -namespace statsd.net.Backends -{ - /// - /// Forwards all metrics on to another statsd.net instance over TCP. - /// - public class StatsdnetBackend : IBackend - { - private Task _completionTask; - private bool _isActive; - private ActionBlock _statsdOutputBlock; - private ISystemMetricsService _systemMetrics; - - public StatsdnetBackend(string host, int port, ISystemMetricsService systemMetrics) - { - _systemMetrics = systemMetrics; - _completionTask = new Task(() => { _isActive = false; }); - StatsdClient.Statsd client = new StatsdClient.Statsd( host, port, StatsdClient.ConnectionType.Tcp ); - - _statsdOutputBlock = new ActionBlock(line => - { - client.LogRaw(line.Name, line.Quantity, line.Epoc); - _systemMetrics.LogCount("backends.statsdnet.lines"); - }, - Utility.OneAtATimeExecution()); - - _isActive = true; - } - - public bool IsActive - { - get { return _isActive; } - } - - public int OutputCount - { - get { return 0; } - } - - public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, - Bucket messageValue, - ISourceBlock source, - bool consumeToAccept) - { - messageValue.FeedTarget(_statsdOutputBlock); - return DataflowMessageStatus.Accepted; - } - - public void Complete() - { - _completionTask.Start(); - } - - public Task Completion - { - get { return _completionTask; } - } - - public void Fault(Exception exception) - { - throw new NotImplementedException(); - } - } -} diff --git a/statsd.net/Configuration/BackendConfiguration.cs b/statsd.net/Configuration/BackendConfiguration.cs index 4ba3c1a..6f8ab9d 100644 --- a/statsd.net/Configuration/BackendConfiguration.cs +++ b/statsd.net/Configuration/BackendConfiguration.cs @@ -63,11 +63,13 @@ public class StatsdBackendConfiguration : BackendConfiguration { public string Host { get; set; } public int Port { get; set; } + public TimeSpan FlushInterval { get; set; } - public StatsdBackendConfiguration(string host, int port) + public StatsdBackendConfiguration(string host, int port, TimeSpan? flushInterval) { Host = host; Port = port; + FlushInterval = flushInterval ?? new TimeSpan(0, 0, 5); } } diff --git a/statsd.net/Configuration/ConfigurationFactory.cs b/statsd.net/Configuration/ConfigurationFactory.cs index c84f5b6..cae4346 100644 --- a/statsd.net/Configuration/ConfigurationFactory.cs +++ b/statsd.net/Configuration/ConfigurationFactory.cs @@ -40,6 +40,9 @@ public static StatsdnetConfiguration Parse(string configFile) ( ( HTTPListenerConfiguration )listener ).HeaderKey = item.Attribute( "headerKey" ).Value; } break; + case "statsdnet": + listener = new StatsdnetListenerConfiguration(item.ToInt("port")); + break; default: throw new ArgumentOutOfRangeException("Not sure what this listener is: " + item.Name); } @@ -72,8 +75,10 @@ public static StatsdnetConfiguration Parse(string configFile) countersAsGauges: item.ToBoolean("countersAsGauges") ); break; - case "statsd": - backend = new StatsdBackendConfiguration(item.Attribute("host").Value, item.ToInt("port")); + case "statsdnet": + backend = new StatsdBackendConfiguration(item.Attribute("host").Value, + item.ToInt("port"), + ConvertToTimespan(item.Attribute("flushInterval").Value)); break; } config.Backends.Add(backend); diff --git a/statsd.net/Configuration/ListenerConfiguration.cs b/statsd.net/Configuration/ListenerConfiguration.cs index 5dc1ac1..a1b860f 100644 --- a/statsd.net/Configuration/ListenerConfiguration.cs +++ b/statsd.net/Configuration/ListenerConfiguration.cs @@ -38,4 +38,13 @@ public HTTPListenerConfiguration(int port, string headerKey = null) HeaderKey = headerKey; } } + + public class StatsdnetListenerConfiguration : ListenerConfiguration + { + public int Port { get; set; } + public StatsdnetListenerConfiguration(int port) + { + Port = port; + } + } } diff --git a/statsd.net/Framework/TimedGaugeAggregatorBlockFactory.cs b/statsd.net/Framework/TimedGaugeAggregatorBlockFactory.cs index 88fe0cb..e7a3a66 100644 --- a/statsd.net/Framework/TimedGaugeAggregatorBlockFactory.cs +++ b/statsd.net/Framework/TimedGaugeAggregatorBlockFactory.cs @@ -54,7 +54,10 @@ public static ActionBlock CreateBlock(ITargetBlock target zeroGauges += 1; } } - log.InfoFormat("Removed {0} empty gauges.", zeroGauges); + if (zeroGauges > 0) + { + log.InfoFormat("Removed {0} empty gauges.", zeroGauges); + } } gauges.Clear(); }; diff --git a/statsd.net/Properties/AssemblyInfo.cs b/statsd.net/Properties/AssemblyInfo.cs index e354a7e..dd23717 100644 --- a/statsd.net/Properties/AssemblyInfo.cs +++ b/statsd.net/Properties/AssemblyInfo.cs @@ -33,5 +33,5 @@ // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] // Uses SEMVER: http://semver.org/ -[assembly: AssemblyVersion("1.2.0.0")] -[assembly: AssemblyFileVersion("1.2.0.0")] +[assembly: AssemblyVersion("1.3.1.0")] +[assembly: AssemblyFileVersion("1.3.1.0")] diff --git a/statsd.net/Statsd.cs b/statsd.net/Statsd.cs index ced380d..ce6fade 100644 --- a/statsd.net/Statsd.cs +++ b/statsd.net/Statsd.cs @@ -19,6 +19,7 @@ using statsd.net.shared.Structures; using statsd.net.Backends.Librato; using statsd.net.Configuration; +using statsd.net.Backends.Statsdnet; namespace statsd.net { @@ -226,7 +227,7 @@ private void LoadBackends(StatsdnetConfiguration config, ISystemMetricsService s { var statsdConfig = backendConfig as StatsdBackendConfiguration; AddBackend( - new StatsdnetBackend(statsdConfig.Host, statsdConfig.Port, systemMetrics), + new StatsdnetBackend(statsdConfig.Host, statsdConfig.Port, statsdConfig.FlushInterval, systemMetrics), systemMetrics, "statsd" ); @@ -257,6 +258,12 @@ private void LoadListeners(StatsdnetConfiguration config, ISystemMetricsService AddListener(new HttpStatsListener(httpConfig.Port, systemMetrics)); systemMetrics.LogCount("startup.listener.http." + httpConfig.Port); } + else if (listenerConfig is StatsdnetListenerConfiguration) + { + var statsdnetConfig = listenerConfig as StatsdnetListenerConfiguration; + AddListener(new StatsdnetTcpListener(statsdnetConfig.Port, systemMetrics)); + systemMetrics.LogCount("startup.listener.statsdnet." + statsdnetConfig.Port); + } } } diff --git a/statsd.net/statsd.net.csproj b/statsd.net/statsd.net.csproj index 1192951..1b1944c 100644 --- a/statsd.net/statsd.net.csproj +++ b/statsd.net/statsd.net.csproj @@ -75,7 +75,8 @@ - + + @@ -103,7 +104,9 @@ - + + Designer + diff --git a/statsd.net/statsdnet.config b/statsd.net/statsdnet.config index bf93746..608da91 100644 --- a/statsd.net/statsdnet.config +++ b/statsd.net/statsdnet.config @@ -1,13 +1,14 @@  - + - + + - - - - - + + + + + @@ -15,6 +16,7 @@ + \ No newline at end of file diff --git a/statsd.net/statsdnet.config.everything b/statsd.net/statsdnet.config.everything index eff0765..7d60b8e 100644 --- a/statsd.net/statsdnet.config.everything +++ b/statsd.net/statsdnet.config.everything @@ -1,9 +1,10 @@  - + + @@ -27,6 +28,6 @@ - + \ No newline at end of file diff --git a/statsd.net/statsdnet.config.librato b/statsd.net/statsdnet.config.librato index ed23a76..528e09b 100644 --- a/statsd.net/statsdnet.config.librato +++ b/statsd.net/statsdnet.config.librato @@ -1,5 +1,5 @@  - + diff --git a/statsd.net/statsdnet.config.relay b/statsd.net/statsdnet.config.relay index 763c3f8..0848bee 100644 --- a/statsd.net/statsdnet.config.relay +++ b/statsd.net/statsdnet.config.relay @@ -1,5 +1,5 @@  - + @@ -15,7 +15,7 @@ - + \ No newline at end of file