diff --git a/pom.xml b/pom.xml
index 285be49..d734c49 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,6 +47,11 @@
+
+ com.lmax
+ disruptor
+ 3.2.0
+
org.hamcrest
hamcrest-core
diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
index 88729c8..e4d280a 100644
--- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
+++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
@@ -5,13 +5,22 @@
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.text.NumberFormat;
+import java.util.Arrays;
import java.util.Locale;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslatorOneArg;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.FatalExceptionHandler;
+import com.lmax.disruptor.InsufficientCapacityException;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.util.Util;
/**
* A simple StatsD client implementation facilitating metrics recording.
@@ -38,11 +47,16 @@
* on any StatsD clients.
*
* @author Tom Denley
+ * @author mick semb wever
*
*/
public final class NonBlockingStatsDClient implements StatsDClient {
private static final int PACKET_SIZE_BYTES = 1500;
+ private static final int RINGBUFFER_DEFAULT_SIZE = 16384;
+ private static final int RINGBUFFER_MIN_SIZE = 128;
+
+ private final static Logger LOG = Logger.getLogger(NonBlockingStatsDClient.class.getName());
private static final StatsDClientErrorHandler NO_OP_HANDLER = new StatsDClientErrorHandler() {
@Override public void handle(Exception e) { /* No-op */ }
@@ -67,23 +81,37 @@ protected NumberFormat initialValue() {
}
};
+ private final static EventFactory FACTORY = new EventFactory() {
+ @Override
+ public Event newInstance() {
+ return new Event();
+ }
+ };
+
+ private static final EventTranslatorOneArg TRANSLATOR = new EventTranslatorOneArg() {
+ @Override
+ public void translateTo(Event event, long sequence, String msg) {
+ event.setValue(msg);
+ }
+ };
+
private final String prefix;
private final DatagramChannel clientChannel;
private final InetSocketAddress address;
private final StatsDClientErrorHandler handler;
private final String constantTagsRendered;
- private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ private final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
final ThreadFactory delegate = Executors.defaultThreadFactory();
@Override public Thread newThread(Runnable r) {
Thread result = delegate.newThread(r);
- result.setName("StatsD-" + result.getName());
+ result.setName("StatsD-disruptor-" + result.getName());
result.setDaemon(true);
return result;
}
});
- private final BlockingQueue queue = new LinkedBlockingQueue();
+ private final Disruptor disruptor = new Disruptor(FACTORY, calculateRingBufferSize(), executor);
/**
* Create a new StatsD client communicating with a StatsD instance on the
@@ -182,7 +210,10 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, String[
} catch (Exception e) {
throw new StatsDClientException("Failed to start StatsD client", e);
}
- this.executor.submit(new QueueConsumer());
+
+ disruptor.handleEventsWith(new Handler());
+ disruptor.handleExceptionsWith(new DisruptorExceptionHandler(this.handler));
+ disruptor.start();
}
/**
@@ -192,6 +223,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, String[
@Override
public void stop() {
try {
+ disruptor.shutdown();
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
@@ -438,41 +470,78 @@ public void histogram(String aspect, int value, String... tags) {
}
private void send(String message) {
- queue.offer(message);
+ if(!disruptor.getRingBuffer().tryPublishEvent(TRANSLATOR, message)) {
+ handler.handle(InsufficientCapacityException.INSTANCE);
+ }
+ }
+
+ private static int calculateRingBufferSize() {
+ int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
+
+ String userPreferredRBSize = System.getProperty(
+ "NonBlockingStatsDClient.ringBufferSize",
+ String.valueOf(RINGBUFFER_DEFAULT_SIZE));
+
+ try {
+ ringBufferSize = Integer.parseInt(userPreferredRBSize);
+ if (ringBufferSize < RINGBUFFER_MIN_SIZE) {
+ ringBufferSize = RINGBUFFER_MIN_SIZE;
+
+ LOG.warning(String.format(
+ "Invalid RingBufferSize {}, using minimum size {}.",
+ userPreferredRBSize,
+ RINGBUFFER_MIN_SIZE));
+ }
+ } catch (NumberFormatException ex) {
+ LOG.warning(String.format(
+ "Invalid RingBufferSize {}, using default size {}.",
+ userPreferredRBSize,
+ RINGBUFFER_DEFAULT_SIZE));
+ }
+ return Util.ceilingNextPowerOfTwo(ringBufferSize);
+ }
+
+ private static class Event {
+
+ private String value;
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return "Event: " + value;
+ }
}
- private class QueueConsumer implements Runnable {
+ private class Handler implements EventHandler {
+
private final ByteBuffer sendBuffer = ByteBuffer.allocate(PACKET_SIZE_BYTES);
- @Override public void run() {
- while(!executor.isShutdown()) {
- try {
- String message = queue.poll(1, TimeUnit.SECONDS);
- if(null != message) {
- byte[] data = message.getBytes();
- if(sendBuffer.remaining() < (data.length + 1)) {
- blockingSend();
- }
- if(sendBuffer.position() > 0) {
- sendBuffer.put( (byte) '\n');
- }
- sendBuffer.put(data);
- if(null == queue.peek()) {
- blockingSend();
- }
- }
- } catch (Exception e) {
- handler.handle(e);
- }
+ @Override
+ public void onEvent(Event event, long sequence, boolean batchEnd) throws Exception {
+ String message = event.value;
+ byte[] data = message.getBytes();
+ if(sendBuffer.remaining() < (data.length + 1)) {
+ flush();
+ }
+ if(sendBuffer.position() > 0) {
+ sendBuffer.put( (byte) '\n');
+ }
+ sendBuffer.put(
+ data.length > sendBuffer.remaining() ? Arrays.copyOfRange(data, 0, sendBuffer.remaining()) : data);
+
+ if(batchEnd || 0 == sendBuffer.remaining()) {
+ flush();
}
}
- private void blockingSend() throws IOException {
+ private void flush() throws IOException {
int sizeOfBuffer = sendBuffer.position();
sendBuffer.flip();
int sentBytes = clientChannel.send(sendBuffer, address);
- sendBuffer.limit(sendBuffer.capacity());
- sendBuffer.rewind();
+ sendBuffer.clear();
if (sizeOfBuffer != sentBytes) {
handler.handle(
@@ -487,4 +556,41 @@ private void blockingSend() throws IOException {
}
}
}
+
+ private static class DisruptorExceptionHandler implements ExceptionHandler {
+
+ private final FatalExceptionHandler throwableHandler = new FatalExceptionHandler();
+ private final StatsDClientErrorHandler exceptionHandler;
+
+ public DisruptorExceptionHandler(StatsDClientErrorHandler handler) {
+ this.exceptionHandler = handler;
+ }
+
+ @Override
+ public void handleEventException(Throwable ex, long sequence, Object event) {
+ if(ex instanceof Exception) {
+ exceptionHandler.handle((Exception) ex);
+ } else {
+ throwableHandler.handleEventException(ex, sequence, event);
+ }
+ }
+
+ @Override
+ public void handleOnStartException(Throwable ex) {
+ if(ex instanceof Exception) {
+ exceptionHandler.handle((Exception) ex);
+ } else {
+ throwableHandler.handleOnStartException(ex);
+ }
+ }
+
+ @Override
+ public void handleOnShutdownException(Throwable ex) {
+ if(ex instanceof Exception) {
+ exceptionHandler.handle((Exception) ex);
+ } else {
+ throwableHandler.handleOnShutdownException(ex);
+ }
+ }
+ }
}