Skip to content
This repository has been archived by the owner on Jun 29, 2023. It is now read-only.

Commit

Permalink
Detect disconnected channels #88
Browse files Browse the repository at this point in the history
NIO channels don't discover a disconnect without activity. logstash-gelf now performs a read operation before writing data. This way the socket can discover the connection state. Reading is non-blocking so the performance impact is minor.
  • Loading branch information
mp911de committed Jun 24, 2016
1 parent 87deeed commit 4f4d0fb
Show file tree
Hide file tree
Showing 5 changed files with 305 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package biz.paluch.logging.gelf.intern.sender;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.spi.AbstractSelectableChannel;

import biz.paluch.logging.gelf.intern.Closer;
import biz.paluch.logging.gelf.intern.ErrorReporter;

/**
* @author Mark Paluch
*/
public abstract class AbstractNioSender<T extends AbstractSelectableChannel & ByteChannel> implements ErrorReporter {

private T channel;
private volatile boolean shutdown = false;
private final ErrorReporter errorReporter;
private final String host;
private final int port;

private final ThreadLocal<ByteBuffer> readBuffers = new ThreadLocal<ByteBuffer>() {
@Override
protected ByteBuffer initialValue() {
return ByteBuffer.allocate(1);
}
};

protected AbstractNioSender(ErrorReporter errorReporter, String host, int port) throws UnknownHostException {

// validate first address succeeds.
InetAddress.getByName(host);
this.errorReporter = errorReporter;
this.host = host;
this.port = port;

}

protected boolean isConnected() throws IOException {

ByteBuffer byteBuffer = readBuffers.get();
byteBuffer.clear();

if (channel() != null && channel().isOpen() && isConnected(channel()) && channel.read(byteBuffer) >= 0) {
return true;
}

return false;
}

protected abstract boolean isConnected(T channel);

protected T channel() {
return channel;
}

public String getHost() {
return host;
}

public int getPort() {
return port;
}

public void close() {
shutdown = true;
Closer.close(channel());
}

public boolean isShutdown() {
return shutdown;
}

@Override
public void reportError(String message, Exception e) {
errorReporter.reportError(message, e);
}

public void setChannel(T channel) {
this.channel = channel;
}
}
Original file line number Diff line number Diff line change
@@ -1,38 +1,32 @@
package biz.paluch.logging.gelf.intern.sender;

import biz.paluch.logging.gelf.intern.Closer;
import biz.paluch.logging.gelf.intern.ErrorReporter;
import biz.paluch.logging.gelf.intern.GelfMessage;
import biz.paluch.logging.gelf.intern.GelfSender;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;

import biz.paluch.logging.gelf.intern.Closer;
import biz.paluch.logging.gelf.intern.ErrorReporter;
import biz.paluch.logging.gelf.intern.GelfMessage;
import biz.paluch.logging.gelf.intern.GelfSender;

/**
* @author https://github.com/t0xa/gelfj
* @author Mark Paluch
*/
public class GelfTCPSender implements GelfSender {
public class GelfTCPSender extends AbstractNioSender<SocketChannel> implements GelfSender {

public final static String CONNECTION_TIMEOUT = "connectionTimeout";
public final static String READ_TIMEOUT = "readTimeout";
public final static String RETRIES = "deliveryAttempts";
public final static String KEEPALIVE = "keepAlive";

private boolean shutdown = false;
private volatile SocketChannel socketChannel;

private final String host;
private final int port;
private final int readTimeoutMs;
private final int connectTimeoutMs;
private final boolean keepAlive;
private final int deliveryAttempts;
private final ErrorReporter errorReporter;

private final Object connectLock = new Object();

/**
Expand Down Expand Up @@ -63,17 +57,14 @@ public GelfTCPSender(String host, int port, int connectTimeoutMs, int readTimeou
public GelfTCPSender(String host, int port, int connectTimeoutMs, int readTimeoutMs, int deliveryAttempts,
boolean keepAlive, ErrorReporter errorReporter) throws IOException {

// validate first address succeeds.
InetAddress.getByName(host);
this.host = host;
this.port = port;
this.errorReporter = errorReporter;
super(errorReporter, host, port);

this.connectTimeoutMs = connectTimeoutMs;
this.readTimeoutMs = readTimeoutMs;
this.keepAlive = keepAlive;
this.deliveryAttempts = deliveryAttempts < 1 ? Integer.MAX_VALUE : deliveryAttempts;

this.socketChannel = createSocketChannel(readTimeoutMs, keepAlive);
this.setChannel(createSocketChannel(readTimeoutMs, keepAlive));
}

private SocketChannel createSocketChannel(int readTimeoutMs, boolean keepAlive) throws IOException {
Expand All @@ -85,13 +76,12 @@ private SocketChannel createSocketChannel(int readTimeoutMs, boolean keepAlive)
}

/**
*
* @param message the message
* @return
*/
public boolean sendMessage(GelfMessage message) {

if (shutdown) {
if (isShutdown()) {
return false;
}

Expand All @@ -101,49 +91,42 @@ public boolean sendMessage(GelfMessage message) {
try {

// (re)-connect if necessary
if (!socketChannel.isConnected()) {
if (!isConnected()) {
synchronized (connectLock) {
connect();
}
}

socketChannel.write(message.toTCPBuffer());
channel().write(message.toTCPBuffer());

return true;
} catch (IOException e) {
if (socketChannel != null) {
try {
socketChannel.close();
} catch (IOException o_O) {
// ignore
}
}
Closer.close(channel());
exception = e;
}
}

if (exception != null) {
errorReporter.reportError(exception.getMessage(),
new IOException("Cannot send data to " + host + ":" + port, exception));
reportError(exception.getMessage(),
new IOException("Cannot send data to " + getHost() + ":" + getPort(), exception));
}

return false;
}

protected void connect() throws IOException {


if (socketChannel.isConnected()) {
if (isConnected()) {
return;
}

if(!socketChannel.isOpen()){
socketChannel.close();
socketChannel = createSocketChannel(readTimeoutMs, keepAlive);
if (!channel().isOpen()) {
Closer.close(channel());
setChannel(createSocketChannel(readTimeoutMs, keepAlive));
}

InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port);
if (socketChannel.connect(inetSocketAddress)) {
InetSocketAddress inetSocketAddress = new InetSocketAddress(getHost(), getPort());
if (channel().connect(inetSocketAddress)) {
return;
}

Expand All @@ -152,7 +135,7 @@ protected void connect() throws IOException {
long waitTimeoutNs = TimeUnit.MILLISECONDS.toNanos(waitTimeoutMs);
boolean connected;
try {
while (!(connected = socketChannel.finishConnect())) {
while (!(connected = channel().finishConnect())) {
Thread.sleep(waitTimeoutMs);
connectTimeoutLeft -= waitTimeoutNs;

Expand All @@ -169,11 +152,10 @@ protected void connect() throws IOException {
Thread.currentThread().interrupt();
throw new IOException("Connection interrupted", e);
}

}

public void close() {
shutdown = true;
Closer.close(socketChannel);
@Override
protected boolean isConnected(SocketChannel channel) {
return channel.isConnected();
}
}
Original file line number Diff line number Diff line change
@@ -1,48 +1,27 @@
package biz.paluch.logging.gelf.intern.sender;

import biz.paluch.logging.gelf.intern.Closer;
import biz.paluch.logging.gelf.intern.ErrorReporter;
import biz.paluch.logging.gelf.intern.GelfMessage;
import biz.paluch.logging.gelf.intern.GelfSender;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

import biz.paluch.logging.gelf.intern.Closer;
import biz.paluch.logging.gelf.intern.ErrorReporter;
import biz.paluch.logging.gelf.intern.GelfMessage;
import biz.paluch.logging.gelf.intern.GelfSender;

/**
* @author https://github.com/t0xa/gelfj
* @author Mark Paluch
*/
public class GelfUDPSender implements GelfSender {
public class GelfUDPSender extends AbstractNioSender<DatagramChannel> implements GelfSender {

private InetAddress host;
private int port;
private DatagramChannel channel;
private ErrorReporter errorReporter;
private final Object connectLock = new Object();

public GelfUDPSender(String host, int port, ErrorReporter errorReporter) throws IOException {
this.host = InetAddress.getByName(host);
this.port = port;
this.errorReporter = errorReporter;
this.channel = initiateChannel();
}

private DatagramChannel initiateChannel() throws IOException {
DatagramChannel resultingChannel = DatagramChannel.open();

try {
resultingChannel.socket().bind(new InetSocketAddress(0));
} catch (SocketException e) {
errorReporter.reportError(e.getMessage(), e);
}

resultingChannel.connect(new InetSocketAddress(this.host, this.port));
resultingChannel.configureBlocking(false);

return resultingChannel;
super(errorReporter, host, port);
connect();
}

public boolean sendMessage(GelfMessage message) {
Expand All @@ -52,18 +31,52 @@ public boolean sendMessage(GelfMessage message) {
private boolean sendDatagrams(ByteBuffer[] bytesList) {

try {
// (re)-connect if necessary
if (!isConnected()) {
synchronized (connectLock) {
connect();
}
}

for (ByteBuffer buffer : bytesList) {
channel.write(buffer);
channel().write(buffer);
}
} catch (IOException e) {
errorReporter.reportError(e.getMessage(), new IOException("Cannot send data to " + host + ":" + port, e));
reportError(e.getMessage(), new IOException("Cannot send data to " + getHost() + ":" + getPort(), e));
return false;
}

return true;
}

public void close() {
Closer.close(channel);
protected void connect() throws IOException {

if (isConnected()) {
return;
}

if (channel() == null) {
setChannel(DatagramChannel.open());
} else if (!channel().isOpen()) {
Closer.close(channel());
setChannel(DatagramChannel.open());
}

channel().configureBlocking(false);

InetSocketAddress inetSocketAddress = new InetSocketAddress(getHost(), getPort());

try {
DatagramChannel connect = channel().connect(inetSocketAddress);
setChannel(connect);
} catch (SocketException e) {
reportError(e.getMessage(), e);
}

}

@Override
protected boolean isConnected(DatagramChannel channel) {
return channel.isConnected();
}
}
Loading

0 comments on commit 4f4d0fb

Please sign in to comment.