From 90e9429360bfb026bad453e0f962149fb5ea2d72 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 24 Jun 2016 21:57:15 +0200 Subject: [PATCH] Detect disconnected channels #88 NIO channels don't discover a disconnect without activity. logstash-gelf now performs a read operation before writing data. This way the socket can discover the connection state. Reading is non-blocking so the performance impact is minor. --- .../gelf/intern/sender/AbstractNioSender.java | 84 ++++++++++++++++++ .../gelf/intern/sender/GelfTCPSender.java | 70 ++++++--------- .../gelf/intern/sender/GelfUDPSender.java | 81 ++++++++++------- .../gelf/intern/sender/GelfTCPSenderTest.java | 88 +++++++++++++++++++ .../gelf/intern/sender/GelfUDPSenderTest.java | 65 ++++++++++++-- 5 files changed, 305 insertions(+), 83 deletions(-) create mode 100644 src/main/java/biz/paluch/logging/gelf/intern/sender/AbstractNioSender.java diff --git a/src/main/java/biz/paluch/logging/gelf/intern/sender/AbstractNioSender.java b/src/main/java/biz/paluch/logging/gelf/intern/sender/AbstractNioSender.java new file mode 100644 index 000000000..e513dc1ba --- /dev/null +++ b/src/main/java/biz/paluch/logging/gelf/intern/sender/AbstractNioSender.java @@ -0,0 +1,84 @@ +package biz.paluch.logging.gelf.intern.sender; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.spi.AbstractSelectableChannel; + +import biz.paluch.logging.gelf.intern.Closer; +import biz.paluch.logging.gelf.intern.ErrorReporter; + +/** + * @author Mark Paluch + */ +public abstract class AbstractNioSender implements ErrorReporter { + + private T channel; + private volatile boolean shutdown = false; + private final ErrorReporter errorReporter; + private final String host; + private final int port; + + private final ThreadLocal readBuffers = new ThreadLocal() { + @Override + protected ByteBuffer initialValue() { + return ByteBuffer.allocate(1); + } + }; + + protected AbstractNioSender(ErrorReporter errorReporter, String host, int port) throws UnknownHostException { + + // validate first address succeeds. + InetAddress.getByName(host); + this.errorReporter = errorReporter; + this.host = host; + this.port = port; + + } + + protected boolean isConnected() throws IOException { + + ByteBuffer byteBuffer = readBuffers.get(); + byteBuffer.clear(); + + if (channel() != null && channel().isOpen() && isConnected(channel()) && channel.read(byteBuffer) >= 0) { + return true; + } + + return false; + } + + protected abstract boolean isConnected(T channel); + + protected T channel() { + return channel; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public void close() { + shutdown = true; + Closer.close(channel()); + } + + public boolean isShutdown() { + return shutdown; + } + + @Override + public void reportError(String message, Exception e) { + errorReporter.reportError(message, e); + } + + public void setChannel(T channel) { + this.channel = channel; + } +} diff --git a/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSender.java b/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSender.java index f7e2d565b..c2f7e607e 100644 --- a/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSender.java +++ b/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSender.java @@ -1,38 +1,32 @@ package biz.paluch.logging.gelf.intern.sender; -import biz.paluch.logging.gelf.intern.Closer; -import biz.paluch.logging.gelf.intern.ErrorReporter; -import biz.paluch.logging.gelf.intern.GelfMessage; -import biz.paluch.logging.gelf.intern.GelfSender; - import java.io.IOException; import java.net.ConnectException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; import java.util.concurrent.TimeUnit; +import biz.paluch.logging.gelf.intern.Closer; +import biz.paluch.logging.gelf.intern.ErrorReporter; +import biz.paluch.logging.gelf.intern.GelfMessage; +import biz.paluch.logging.gelf.intern.GelfSender; + /** * @author https://github.com/t0xa/gelfj * @author Mark Paluch */ -public class GelfTCPSender implements GelfSender { +public class GelfTCPSender extends AbstractNioSender implements GelfSender { public final static String CONNECTION_TIMEOUT = "connectionTimeout"; public final static String READ_TIMEOUT = "readTimeout"; public final static String RETRIES = "deliveryAttempts"; public final static String KEEPALIVE = "keepAlive"; - private boolean shutdown = false; - private volatile SocketChannel socketChannel; - - private final String host; - private final int port; private final int readTimeoutMs; private final int connectTimeoutMs; private final boolean keepAlive; private final int deliveryAttempts; - private final ErrorReporter errorReporter; + private final Object connectLock = new Object(); /** @@ -63,17 +57,14 @@ public GelfTCPSender(String host, int port, int connectTimeoutMs, int readTimeou public GelfTCPSender(String host, int port, int connectTimeoutMs, int readTimeoutMs, int deliveryAttempts, boolean keepAlive, ErrorReporter errorReporter) throws IOException { - // validate first address succeeds. - InetAddress.getByName(host); - this.host = host; - this.port = port; - this.errorReporter = errorReporter; + super(errorReporter, host, port); + this.connectTimeoutMs = connectTimeoutMs; this.readTimeoutMs = readTimeoutMs; this.keepAlive = keepAlive; this.deliveryAttempts = deliveryAttempts < 1 ? Integer.MAX_VALUE : deliveryAttempts; - this.socketChannel = createSocketChannel(readTimeoutMs, keepAlive); + this.setChannel(createSocketChannel(readTimeoutMs, keepAlive)); } private SocketChannel createSocketChannel(int readTimeoutMs, boolean keepAlive) throws IOException { @@ -85,13 +76,12 @@ private SocketChannel createSocketChannel(int readTimeoutMs, boolean keepAlive) } /** - * * @param message the message * @return */ public boolean sendMessage(GelfMessage message) { - if (shutdown) { + if (isShutdown()) { return false; } @@ -101,30 +91,24 @@ public boolean sendMessage(GelfMessage message) { try { // (re)-connect if necessary - if (!socketChannel.isConnected()) { + if (!isConnected()) { synchronized (connectLock) { connect(); } } - socketChannel.write(message.toTCPBuffer()); + channel().write(message.toTCPBuffer()); return true; } catch (IOException e) { - if (socketChannel != null) { - try { - socketChannel.close(); - } catch (IOException o_O) { - // ignore - } - } + Closer.close(channel()); exception = e; } } if (exception != null) { - errorReporter.reportError(exception.getMessage(), - new IOException("Cannot send data to " + host + ":" + port, exception)); + reportError(exception.getMessage(), + new IOException("Cannot send data to " + getHost() + ":" + getPort(), exception)); } return false; @@ -132,18 +116,17 @@ public boolean sendMessage(GelfMessage message) { protected void connect() throws IOException { - - if (socketChannel.isConnected()) { + if (isConnected()) { return; } - if(!socketChannel.isOpen()){ - socketChannel.close(); - socketChannel = createSocketChannel(readTimeoutMs, keepAlive); + if (!channel().isOpen()) { + Closer.close(channel()); + setChannel(createSocketChannel(readTimeoutMs, keepAlive)); } - InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port); - if (socketChannel.connect(inetSocketAddress)) { + InetSocketAddress inetSocketAddress = new InetSocketAddress(getHost(), getPort()); + if (channel().connect(inetSocketAddress)) { return; } @@ -152,7 +135,7 @@ protected void connect() throws IOException { long waitTimeoutNs = TimeUnit.MILLISECONDS.toNanos(waitTimeoutMs); boolean connected; try { - while (!(connected = socketChannel.finishConnect())) { + while (!(connected = channel().finishConnect())) { Thread.sleep(waitTimeoutMs); connectTimeoutLeft -= waitTimeoutNs; @@ -169,11 +152,10 @@ protected void connect() throws IOException { Thread.currentThread().interrupt(); throw new IOException("Connection interrupted", e); } - } - public void close() { - shutdown = true; - Closer.close(socketChannel); + @Override + protected boolean isConnected(SocketChannel channel) { + return channel.isConnected(); } } diff --git a/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfUDPSender.java b/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfUDPSender.java index 9ea7408aa..b16a21f8a 100644 --- a/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfUDPSender.java +++ b/src/main/java/biz/paluch/logging/gelf/intern/sender/GelfUDPSender.java @@ -1,48 +1,27 @@ package biz.paluch.logging.gelf.intern.sender; -import biz.paluch.logging.gelf.intern.Closer; -import biz.paluch.logging.gelf.intern.ErrorReporter; -import biz.paluch.logging.gelf.intern.GelfMessage; -import biz.paluch.logging.gelf.intern.GelfSender; - import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketException; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; +import biz.paluch.logging.gelf.intern.Closer; +import biz.paluch.logging.gelf.intern.ErrorReporter; +import biz.paluch.logging.gelf.intern.GelfMessage; +import biz.paluch.logging.gelf.intern.GelfSender; + /** * @author https://github.com/t0xa/gelfj * @author Mark Paluch */ -public class GelfUDPSender implements GelfSender { +public class GelfUDPSender extends AbstractNioSender implements GelfSender { - private InetAddress host; - private int port; - private DatagramChannel channel; - private ErrorReporter errorReporter; + private final Object connectLock = new Object(); public GelfUDPSender(String host, int port, ErrorReporter errorReporter) throws IOException { - this.host = InetAddress.getByName(host); - this.port = port; - this.errorReporter = errorReporter; - this.channel = initiateChannel(); - } - - private DatagramChannel initiateChannel() throws IOException { - DatagramChannel resultingChannel = DatagramChannel.open(); - - try { - resultingChannel.socket().bind(new InetSocketAddress(0)); - } catch (SocketException e) { - errorReporter.reportError(e.getMessage(), e); - } - - resultingChannel.connect(new InetSocketAddress(this.host, this.port)); - resultingChannel.configureBlocking(false); - - return resultingChannel; + super(errorReporter, host, port); + connect(); } public boolean sendMessage(GelfMessage message) { @@ -52,18 +31,52 @@ public boolean sendMessage(GelfMessage message) { private boolean sendDatagrams(ByteBuffer[] bytesList) { try { + // (re)-connect if necessary + if (!isConnected()) { + synchronized (connectLock) { + connect(); + } + } + for (ByteBuffer buffer : bytesList) { - channel.write(buffer); + channel().write(buffer); } } catch (IOException e) { - errorReporter.reportError(e.getMessage(), new IOException("Cannot send data to " + host + ":" + port, e)); + reportError(e.getMessage(), new IOException("Cannot send data to " + getHost() + ":" + getPort(), e)); return false; } return true; } - public void close() { - Closer.close(channel); + protected void connect() throws IOException { + + if (isConnected()) { + return; + } + + if (channel() == null) { + setChannel(DatagramChannel.open()); + } else if (!channel().isOpen()) { + Closer.close(channel()); + setChannel(DatagramChannel.open()); + } + + channel().configureBlocking(false); + + InetSocketAddress inetSocketAddress = new InetSocketAddress(getHost(), getPort()); + + try { + DatagramChannel connect = channel().connect(inetSocketAddress); + setChannel(connect); + } catch (SocketException e) { + reportError(e.getMessage(), e); + } + + } + + @Override + protected boolean isConnected(DatagramChannel channel) { + return channel.isConnected(); } } diff --git a/src/test/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSenderTest.java b/src/test/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSenderTest.java index c4e8c7fc5..9f240717e 100644 --- a/src/test/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSenderTest.java +++ b/src/test/java/biz/paluch/logging/gelf/intern/sender/GelfTCPSenderTest.java @@ -3,11 +3,17 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.IOException; import java.net.ConnectException; +import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.nio.channels.ServerSocketChannel; +import java.util.Random; import org.junit.Test; import org.junit.runner.RunWith; @@ -74,4 +80,86 @@ public void connectionTimeoutShouldApply() throws Exception { public void unknownHostShouldThrowException() throws Exception { new GelfTCPSender("unknown.host.unknown", 65534, 100, 100, errorReporter); } + + @Test + public void shouldOpenConnection() throws Exception { + + int port = randomPort(); + + ServerSocketChannel listener = ServerSocketChannel.open(); + listener.socket().bind(new InetSocketAddress(port)); + + GelfTCPSender tcpSender = new GelfTCPSender("127.0.0.1", port, 1000, 1000, errorReporter); + + GelfMessage gelfMessage = new GelfMessage("short", "long", 1, "info"); + gelfMessage.setHost("host"); + + GelfTCPSender spy = spy(tcpSender); + + spy.sendMessage(gelfMessage); + + verify(spy, times(2)).isConnected(); + verify(spy).connect(); + + listener.close(); + spy.close(); + } + + @Test + public void shouldSendDataToOpenPort() throws Exception { + + int port = randomPort(); + + ServerSocketChannel listener = ServerSocketChannel.open(); + listener.socket().bind(new InetSocketAddress(port)); + + GelfTCPSender tcpSender = new GelfTCPSender("127.0.0.1", port, 1000, 1000, errorReporter); + + GelfMessage gelfMessage = new GelfMessage("short", "long", 1, "info"); + gelfMessage.setHost("host"); + + tcpSender.sendMessage(gelfMessage); + + GelfTCPSender spy = spy(tcpSender); + + spy.sendMessage(gelfMessage); + + verify(spy).isConnected(); + verify(spy, never()).connect(); + + listener.close(); + spy.close(); + } + + @Test + public void shouldSendDataToClosedPort() throws Exception { + + int port = randomPort(); + + ServerSocketChannel listener = ServerSocketChannel.open(); + listener.socket().bind(new InetSocketAddress(port)); + + GelfTCPSender tcpSender = new GelfTCPSender("127.0.0.1", port, 1000, 1000, errorReporter); + + listener.socket().close(); + listener.close(); + + GelfMessage gelfMessage = new GelfMessage("short", "long", 1, "info"); + gelfMessage.setHost("host"); + tcpSender.sendMessage(gelfMessage); + + GelfTCPSender spy = spy(tcpSender); + + spy.sendMessage(gelfMessage); + + verify(spy, times(2)).isConnected(); + verify(spy).connect(); + + spy.close(); + } + + protected int randomPort() { + Random random = new Random(); + return random.nextInt(50000) + 1024; + } } \ No newline at end of file diff --git a/src/test/java/biz/paluch/logging/gelf/intern/sender/GelfUDPSenderTest.java b/src/test/java/biz/paluch/logging/gelf/intern/sender/GelfUDPSenderTest.java index 1f7cb8929..39cdba761 100644 --- a/src/test/java/biz/paluch/logging/gelf/intern/sender/GelfUDPSenderTest.java +++ b/src/test/java/biz/paluch/logging/gelf/intern/sender/GelfUDPSenderTest.java @@ -1,23 +1,23 @@ package biz.paluch.logging.gelf.intern.sender; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; -import java.io.IOException; -import java.net.ConnectException; +import java.net.DatagramSocket; import java.net.UnknownHostException; +import java.util.Random; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; import biz.paluch.logging.gelf.intern.ErrorReporter; import biz.paluch.logging.gelf.intern.GelfMessage; -import org.mockito.runners.MockitoJUnitRunner; /** * @author Mark Paluch @@ -45,4 +45,59 @@ public void unreachablePacketsShouldBeDiscardedSilently() throws Exception { public void unknownHostShouldThrowException() throws Exception { new GelfUDPSender("unknown.host.unknown", 65534, errorReporter); } + + @Test + public void shouldSendDataToOpenPort() throws Exception { + + int port = randomPort(); + + DatagramSocket socket = new DatagramSocket(port); + + GelfUDPSender udpSender = new GelfUDPSender("127.0.0.1", port, errorReporter); + + GelfMessage gelfMessage = new GelfMessage("short", "long", 1, "info"); + gelfMessage.setHost("host"); + + udpSender.sendMessage(gelfMessage); + + GelfUDPSender spy = spy(udpSender); + + spy.sendMessage(gelfMessage); + + verify(spy).isConnected(); + verify(spy, never()).connect(); + + socket.close(); + spy.close(); + } + + @Test + public void shouldSendDataToClosedPort() throws Exception { + + int port = randomPort(); + + DatagramSocket socket = new DatagramSocket(port); + + GelfUDPSender udpSender = new GelfUDPSender("127.0.0.1", port, errorReporter); + socket.close(); + + GelfMessage gelfMessage = new GelfMessage("short", "long", 1, "info"); + gelfMessage.setHost("host"); + + udpSender.sendMessage(gelfMessage); + + GelfUDPSender spy = spy(udpSender); + + spy.sendMessage(gelfMessage); + + verify(spy).isConnected(); + verify(spy, never()).connect(); + + spy.close(); + } + + protected int randomPort() { + Random random = new Random(); + return random.nextInt(50000) + 1024; + } } \ No newline at end of file