Skip to content
This repository has been archived by the owner on Jun 29, 2023. It is now read-only.

Commit

Permalink
Retry network writes until buffers are empty #96
Browse files Browse the repository at this point in the history
A high write volume can cause package loss when using TCP or UDP senders. This is because of saturated send buffers. NIO channels don't block/continue writing if send buffers are full but return from the call. Inspecting the remaining buffer size is a good measure to determine whether the buffer was written entirely or whether bytes to write are left. Add synchronization to prevent GELF message interleaving.

TCP and UDP senders now retry writes until the message buffer is written entirely to the Channel (send buffer).

See also PR #96.
  • Loading branch information
mp911de committed Oct 6, 2016
1 parent ed6d17f commit 40f4fda
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;
Expand All @@ -28,7 +29,7 @@ public class GelfTCPSender extends AbstractNioSender<SocketChannel> implements G
private final boolean keepAlive;
private final int deliveryAttempts;

private final Object connectLock = new Object();
private final Object ioLock = new Object();

private final ThreadLocal<ByteBuffer> writeBuffers = new ThreadLocal<ByteBuffer>() {
@Override
Expand Down Expand Up @@ -75,7 +76,7 @@ public GelfTCPSender(String host, int port, int connectTimeoutMs, int readTimeou
this.setChannel(createSocketChannel(readTimeoutMs, keepAlive));
}

private SocketChannel createSocketChannel(int readTimeoutMs, boolean keepAlive) throws IOException {
protected SocketChannel createSocketChannel(int readTimeoutMs, boolean keepAlive) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.socket().setKeepAlive(keepAlive);
Expand All @@ -100,15 +101,26 @@ public boolean sendMessage(GelfMessage message) {

// (re)-connect if necessary
if (!isConnected()) {
synchronized (connectLock) {
synchronized (ioLock) {
connect();
}
}

ByteBuffer buffer;
if (BUFFER_SIZE == 0) {
channel().write(message.toTCPBuffer());
buffer = message.toTCPBuffer();
} else {
channel().write(message.toTCPBuffer(getByteBuffer()));
buffer = message.toTCPBuffer(getByteBuffer());
}

synchronized (ioLock) {
while (buffer.hasRemaining()) {
int written = channel().write(buffer);
if (written < 0) {
// indicator the socket was closed
Closer.close(channel());
throw new SocketException("Cannot write buffer to channel");
}
}
}

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
public class GelfUDPSender extends AbstractNioSender<DatagramChannel> implements GelfSender {

private final Object connectLock = new Object();
private final Object ioLock = new Object();

private final ThreadLocal<ByteBuffer> writeBuffers = new ThreadLocal<ByteBuffer>() {
@Override
Expand Down Expand Up @@ -60,13 +60,18 @@ private boolean sendDatagrams(ByteBuffer[] bytesList) {
try {
// (re)-connect if necessary
if (!isConnected()) {
synchronized (connectLock) {
synchronized (ioLock) {
connect();
}
}

for (ByteBuffer buffer : bytesList) {
channel().write(buffer);

synchronized (ioLock) {
while (buffer.hasRemaining()) {
channel().write(buffer);
}
}
}
} catch (IOException e) {
reportError(e.getMessage(), new IOException("Cannot send data to " + getHost() + ":" + getPort(), e));
Expand Down Expand Up @@ -99,7 +104,6 @@ protected void connect() throws IOException {
} catch (SocketException e) {
reportError(e.getMessage(), e);
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package biz.paluch.logging.gelf.intern.sender;

import static org.fest.assertions.Assertions.assertThat;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.junit.Test;

import biz.paluch.logging.gelf.intern.ErrorReporter;
import biz.paluch.logging.gelf.intern.GelfMessage;

/**
* @author <a href="mailto:[email protected]">Mark Paluch</a>
*/
public class GelfTCPSenderIntegrationTest {

private ByteArrayOutputStream out = new ByteArrayOutputStream();

@Test(timeout = 10000)
public void name() throws Exception {

final ServerSocket serverSocket = new ServerSocket(1234);
final CountDownLatch latch = new CountDownLatch(1);
serverSocket.setSoTimeout(1000);

Thread thread = new Thread("GelfTCPSenderIntegrationTest-server") {

@Override
public void run() {

try {
Socket socket = serverSocket.accept();
socket.setKeepAlive(true);
InputStream inputStream = socket.getInputStream();

while (!socket.isClosed()) {
IOUtils.copy(inputStream, out);
Thread.sleep(1);

if (latch.getCount() == 0) {
socket.close();
}
}

} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
}
}
};

try {
thread.start();

SmallBufferTCPSender sender = new SmallBufferTCPSender("localhost", 1234, 1000, 1000, new ErrorReporter() {
@Override
public void reportError(String message, Exception e) {
}
});

GelfMessage gelfMessage = new GelfMessage("hello", StringUtils.repeat("hello", 100000), 1234, "7");
ByteBuffer byteBuffer = gelfMessage.toTCPBuffer();
int size = byteBuffer.remaining();

sender.sendMessage(gelfMessage);
sender.close();

latch.countDown();
thread.join();

assertThat(out.size()).isEqualTo(size);

} finally {
thread.interrupt();
}

}

static class SmallBufferTCPSender extends GelfTCPSender {

public SmallBufferTCPSender(String host, int port, int connectTimeoutMs, int readTimeoutMs, ErrorReporter errorReporter)
throws IOException {
super(host, port, connectTimeoutMs, readTimeoutMs, errorReporter);
}

@Override
protected SocketChannel createSocketChannel(int readTimeoutMs, boolean keepAlive) throws IOException {
SocketChannel socketChannel = super.createSocketChannel(readTimeoutMs, keepAlive);

socketChannel.socket().setSendBufferSize(100);

return socketChannel;
}
}
}

0 comments on commit 40f4fda

Please sign in to comment.