diff --git a/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSender.java b/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSender.java index 166b682b3..41b2f1a93 100644 --- a/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSender.java +++ b/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSender.java @@ -3,6 +3,7 @@ import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; +import java.net.SocketException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.concurrent.TimeUnit; @@ -28,7 +29,7 @@ public class GelfTCPSender extends AbstractNioSender implements G private final boolean keepAlive; private final int deliveryAttempts; - private final Object connectLock = new Object(); + private final Object ioLock = new Object(); private final ThreadLocal writeBuffers = new ThreadLocal() { @Override @@ -75,7 +76,7 @@ public GelfTCPSender(String host, int port, int connectTimeoutMs, int readTimeou this.setChannel(createSocketChannel(readTimeoutMs, keepAlive)); } - private SocketChannel createSocketChannel(int readTimeoutMs, boolean keepAlive) throws IOException { + protected SocketChannel createSocketChannel(int readTimeoutMs, boolean keepAlive) throws IOException { SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.socket().setKeepAlive(keepAlive); @@ -100,15 +101,26 @@ public boolean sendMessage(GelfMessage message) { // (re)-connect if necessary if (!isConnected()) { - synchronized (connectLock) { + synchronized (ioLock) { connect(); } } - + ByteBuffer buffer; if (BUFFER_SIZE == 0) { - channel().write(message.toTCPBuffer()); + buffer = message.toTCPBuffer(); } else { - channel().write(message.toTCPBuffer(getByteBuffer())); + buffer = message.toTCPBuffer(getByteBuffer()); + } + + synchronized (ioLock) { + while (buffer.hasRemaining()) { + int written = channel().write(buffer); + if (written < 0) { + // indicator the socket was closed + Closer.close(channel()); + throw new SocketException("Cannot write buffer to channel"); + } + } } return true; diff --git a/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfUDPSender.java b/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfUDPSender.java index 889cea07a..69dfb59cd 100644 --- a/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfUDPSender.java +++ b/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfUDPSender.java @@ -17,7 +17,7 @@ */ public class GelfUDPSender extends AbstractNioSender implements GelfSender { - private final Object connectLock = new Object(); + private final Object ioLock = new Object(); private final ThreadLocal writeBuffers = new ThreadLocal() { @Override @@ -60,13 +60,18 @@ private boolean sendDatagrams(ByteBuffer[] bytesList) { try { // (re)-connect if necessary if (!isConnected()) { - synchronized (connectLock) { + synchronized (ioLock) { connect(); } } for (ByteBuffer buffer : bytesList) { - channel().write(buffer); + + synchronized (ioLock) { + while (buffer.hasRemaining()) { + channel().write(buffer); + } + } } } catch (IOException e) { reportError(e.getMessage(), new IOException("Cannot send data to " + getHost() + ":" + getPort(), e)); @@ -99,7 +104,6 @@ protected void connect() throws IOException { } catch (SocketException e) { reportError(e.getMessage(), e); } - } @Override diff --git a/src/test/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSenderIntegrationTest.java b/src/test/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSenderIntegrationTest.java new file mode 100644 index 000000000..77a06f4ae --- /dev/null +++ b/src/test/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSenderIntegrationTest.java @@ -0,0 +1,104 @@ +package biz.paluch.logging.gelf.intern.sender; + +import static org.fest.assertions.Assertions.assertThat; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.junit.Test; + +import biz.paluch.logging.gelf.intern.ErrorReporter; +import biz.paluch.logging.gelf.intern.GelfMessage; + +/** + * @author Mark Paluch + */ +public class GelfTCPSenderIntegrationTest { + + private ByteArrayOutputStream out = new ByteArrayOutputStream(); + + @Test(timeout = 10000) + public void name() throws Exception { + + final ServerSocket serverSocket = new ServerSocket(1234); + final CountDownLatch latch = new CountDownLatch(1); + serverSocket.setSoTimeout(1000); + + Thread thread = new Thread("GelfTCPSenderIntegrationTest-server") { + + @Override + public void run() { + + try { + Socket socket = serverSocket.accept(); + socket.setKeepAlive(true); + InputStream inputStream = socket.getInputStream(); + + while (!socket.isClosed()) { + IOUtils.copy(inputStream, out); + Thread.sleep(1); + + if (latch.getCount() == 0) { + socket.close(); + } + } + + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + } + } + }; + + try { + thread.start(); + + SmallBufferTCPSender sender = new SmallBufferTCPSender("localhost", 1234, 1000, 1000, new ErrorReporter() { + @Override + public void reportError(String message, Exception e) { + } + }); + + GelfMessage gelfMessage = new GelfMessage("hello", StringUtils.repeat("hello", 100000), 1234, "7"); + ByteBuffer byteBuffer = gelfMessage.toTCPBuffer(); + int size = byteBuffer.remaining(); + + sender.sendMessage(gelfMessage); + sender.close(); + + latch.countDown(); + thread.join(); + + assertThat(out.size()).isEqualTo(size); + + } finally { + thread.interrupt(); + } + + } + + static class SmallBufferTCPSender extends GelfTCPSender { + + public SmallBufferTCPSender(String host, int port, int connectTimeoutMs, int readTimeoutMs, ErrorReporter errorReporter) + throws IOException { + super(host, port, connectTimeoutMs, readTimeoutMs, errorReporter); + } + + @Override + protected SocketChannel createSocketChannel(int readTimeoutMs, boolean keepAlive) throws IOException { + SocketChannel socketChannel = super.createSocketChannel(readTimeoutMs, keepAlive); + + socketChannel.socket().setSendBufferSize(100); + + return socketChannel; + } + } +}