Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send messages using a lmax disruptor instead of a blockng queue #10

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
</developers>

<dependencies>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
Expand Down
168 changes: 137 additions & 31 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,22 @@
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.FatalExceptionHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.Util;

/**
* A simple StatsD client implementation facilitating metrics recording.
Expand All @@ -38,11 +47,16 @@
* on any StatsD clients.</p>
*
* @author Tom Denley
* @author mick semb wever
*
*/
public final class NonBlockingStatsDClient implements StatsDClient {

private static final int PACKET_SIZE_BYTES = 1500;
private static final int RINGBUFFER_DEFAULT_SIZE = 16384;
private static final int RINGBUFFER_MIN_SIZE = 128;

private final static Logger LOG = Logger.getLogger(NonBlockingStatsDClient.class.getName());

private static final StatsDClientErrorHandler NO_OP_HANDLER = new StatsDClientErrorHandler() {
@Override public void handle(Exception e) { /* No-op */ }
Expand All @@ -67,23 +81,37 @@ protected NumberFormat initialValue() {
}
};

private final static EventFactory<Event> FACTORY = new EventFactory<Event>() {
@Override
public Event newInstance() {
return new Event();
}
};

private static final EventTranslatorOneArg<Event,String> TRANSLATOR = new EventTranslatorOneArg<Event,String>() {
@Override
public void translateTo(Event event, long sequence, String msg) {
event.setValue(msg);
}
};

private final String prefix;
private final DatagramChannel clientChannel;
private final InetSocketAddress address;
private final StatsDClientErrorHandler handler;
private final String constantTagsRendered;

private final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
private final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
final ThreadFactory delegate = Executors.defaultThreadFactory();
@Override public Thread newThread(Runnable r) {
Thread result = delegate.newThread(r);
result.setName("StatsD-" + result.getName());
result.setName("StatsD-disruptor-" + result.getName());
result.setDaemon(true);
return result;
}
});

private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
private final Disruptor<Event> disruptor = new Disruptor<Event>(FACTORY, calculateRingBufferSize(), executor);

/**
* Create a new StatsD client communicating with a StatsD instance on the
Expand Down Expand Up @@ -182,7 +210,10 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, String[
} catch (Exception e) {
throw new StatsDClientException("Failed to start StatsD client", e);
}
this.executor.submit(new QueueConsumer());

disruptor.handleEventsWith(new Handler());
disruptor.handleExceptionsWith(new DisruptorExceptionHandler(this.handler));
disruptor.start();
}

/**
Expand All @@ -192,6 +223,7 @@ public NonBlockingStatsDClient(String prefix, String hostname, int port, String[
@Override
public void stop() {
try {
disruptor.shutdown();
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -438,41 +470,78 @@ public void histogram(String aspect, int value, String... tags) {
}

private void send(String message) {
queue.offer(message);
if(!disruptor.getRingBuffer().tryPublishEvent(TRANSLATOR, message)) {
handler.handle(InsufficientCapacityException.INSTANCE);
}
}

private static int calculateRingBufferSize() {
int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;

String userPreferredRBSize = System.getProperty(
"NonBlockingStatsDClient.ringBufferSize",
String.valueOf(RINGBUFFER_DEFAULT_SIZE));

try {
ringBufferSize = Integer.parseInt(userPreferredRBSize);
if (ringBufferSize < RINGBUFFER_MIN_SIZE) {
ringBufferSize = RINGBUFFER_MIN_SIZE;

LOG.warning(String.format(
"Invalid RingBufferSize {}, using minimum size {}.",
userPreferredRBSize,
RINGBUFFER_MIN_SIZE));
}
} catch (NumberFormatException ex) {
LOG.warning(String.format(
"Invalid RingBufferSize {}, using default size {}.",
userPreferredRBSize,
RINGBUFFER_DEFAULT_SIZE));
}
return Util.ceilingNextPowerOfTwo(ringBufferSize);
}

private static class Event {

private String value;

public void setValue(String value) {
this.value = value;
}

@Override
public String toString() {
return "Event: " + value;
}
}

private class QueueConsumer implements Runnable {
private class Handler implements EventHandler<Event> {

private final ByteBuffer sendBuffer = ByteBuffer.allocate(PACKET_SIZE_BYTES);

@Override public void run() {
while(!executor.isShutdown()) {
try {
String message = queue.poll(1, TimeUnit.SECONDS);
if(null != message) {
byte[] data = message.getBytes();
if(sendBuffer.remaining() < (data.length + 1)) {
blockingSend();
}
if(sendBuffer.position() > 0) {
sendBuffer.put( (byte) '\n');
}
sendBuffer.put(data);
if(null == queue.peek()) {
blockingSend();
}
}
} catch (Exception e) {
handler.handle(e);
}
@Override
public void onEvent(Event event, long sequence, boolean batchEnd) throws Exception {
String message = event.value;
byte[] data = message.getBytes();
if(sendBuffer.remaining() < (data.length + 1)) {
flush();
}
if(sendBuffer.position() > 0) {
sendBuffer.put( (byte) '\n');
}
sendBuffer.put(
data.length > sendBuffer.remaining() ? Arrays.copyOfRange(data, 0, sendBuffer.remaining()) : data);

if(batchEnd || 0 == sendBuffer.remaining()) {
flush();
}
}

private void blockingSend() throws IOException {
private void flush() throws IOException {
int sizeOfBuffer = sendBuffer.position();
sendBuffer.flip();
int sentBytes = clientChannel.send(sendBuffer, address);
sendBuffer.limit(sendBuffer.capacity());
sendBuffer.rewind();
sendBuffer.clear();

if (sizeOfBuffer != sentBytes) {
handler.handle(
Expand All @@ -487,4 +556,41 @@ private void blockingSend() throws IOException {
}
}
}

private static class DisruptorExceptionHandler implements ExceptionHandler {

private final FatalExceptionHandler throwableHandler = new FatalExceptionHandler();
private final StatsDClientErrorHandler exceptionHandler;

public DisruptorExceptionHandler(StatsDClientErrorHandler handler) {
this.exceptionHandler = handler;
}

@Override
public void handleEventException(Throwable ex, long sequence, Object event) {
if(ex instanceof Exception) {
exceptionHandler.handle((Exception) ex);
} else {
throwableHandler.handleEventException(ex, sequence, event);
}
}

@Override
public void handleOnStartException(Throwable ex) {
if(ex instanceof Exception) {
exceptionHandler.handle((Exception) ex);
} else {
throwableHandler.handleOnStartException(ex);
}
}

@Override
public void handleOnShutdownException(Throwable ex) {
if(ex instanceof Exception) {
exceptionHandler.handle((Exception) ex);
} else {
throwableHandler.handleOnShutdownException(ex);
}
}
}
}