This repository has been archived by the owner on Jun 29, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 102
/
GelfUDPSender.java
104 lines (82 loc) · 3.01 KB
/
GelfUDPSender.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package biz.paluch.logging.gelf.intern.sender;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.UnresolvedAddressException;
import biz.paluch.logging.gelf.intern.Closer;
import biz.paluch.logging.gelf.intern.ErrorReporter;
import biz.paluch.logging.gelf.intern.GelfMessage;
import biz.paluch.logging.gelf.intern.GelfSender;
/**
 * {@link GelfSender} that transmits {@link GelfMessage}s as datagrams through a non-blocking
 * {@link DatagramChannel}.
 * <p>
 * The channel is opened and connected lazily on the first send and re-established when it is found to be
 * disconnected. Connecting and writing are serialized through an internal lock.
 *
 * @author https://github.com/t0xa/gelfj
 * @author Mark Paluch
 */
public class GelfUDPSender extends AbstractNioSender<DatagramChannel> implements GelfSender {

    /** Guards (re)connecting and writing to the channel. */
    private final Object ioLock = new Object();

    /**
     * Per-thread direct buffer handed to {@code GelfBuffers.toUDPBuffers} as write buffer. Sized by
     * {@code INITIAL_BUFFER_SIZE}, which is declared outside this class.
     */
    private final ThreadLocal<ByteBuffer> writeBuffers = new ThreadLocal<ByteBuffer>() {
        @Override
        protected ByteBuffer initialValue() {
            return ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE);
        }
    };

    /**
     * Per-thread direct buffer handed to {@code GelfBuffers.toUDPBuffers} as temporary buffer. Sized by
     * {@code INITIAL_BUFFER_SIZE}, which is declared outside this class.
     */
    private final ThreadLocal<ByteBuffer> tempBuffers = new ThreadLocal<ByteBuffer>() {
        @Override
        protected ByteBuffer initialValue() {
            return ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE);
        }
    };

    /**
     * Creates a sender for the given target. The channel itself is not opened here; that happens on the first
     * send.
     *
     * @param host target host, passed on to the superclass
     * @param port target port, passed on to the superclass
     * @param errorReporter receiver for errors, passed on to the superclass
     * @throws IOException if the superclass constructor fails
     */
    public GelfUDPSender(String host, int port, ErrorReporter errorReporter) throws IOException {
        super(errorReporter, host, port);
    }

    /**
     * Converts the message into one or more datagram buffers and sends them.
     * <p>
     * When {@code INITIAL_BUFFER_SIZE} is {@code 0} the message creates its own buffers; otherwise the
     * per-thread buffers of this sender are used for the conversion.
     *
     * @param message the message to send
     * @return {@code true} if all datagrams were written, {@code false} if connecting or writing failed
     */
    public boolean sendMessage(GelfMessage message) {

        if (INITIAL_BUFFER_SIZE == 0) {
            return sendDatagrams(message.toUDPBuffers());
        }

        return sendDatagrams(GelfBuffers.toUDPBuffers(message, writeBuffers, tempBuffers));
    }

    /**
     * Writes every buffer to the channel, (re)connecting first if necessary.
     *
     * @param bytesList the datagram payloads, one buffer per datagram
     * @return {@code true} if every buffer was fully written; {@code false} if the channel could not be
     *         connected or an {@link IOException} occurred (the error is reported in both cases)
     */
    private boolean sendDatagrams(ByteBuffer[] bytesList) {

        try {
            // (re)-connect if necessary
            if (!isConnected()) {
                synchronized (ioLock) {
                    connect();

                    // connect() reports connection failures instead of throwing them, so the channel can still
                    // be unconnected at this point. Writing to it would raise an unchecked
                    // NotYetConnectedException that bypasses the IOException handler below and escapes to the
                    // caller. The failure has already been reported, so just signal it.
                    DatagramChannel current = channel();
                    if (current == null || !isConnected(current)) {
                        return false;
                    }
                }
            }

            for (ByteBuffer buffer : bytesList) {

                synchronized (ioLock) {
                    // The channel is non-blocking, so a write may transfer nothing; retry until drained.
                    while (buffer.hasRemaining()) {
                        channel().write(buffer);
                    }
                }
            }
        } catch (IOException e) {
            reportError(e.getMessage(), new IOException("Cannot send data to " + getHost() + ":" + getPort(), e));
            return false;
        }

        return true;
    }

    /**
     * Opens the channel if there is none (or the current one is closed), switches it to non-blocking mode and
     * connects it to the configured host and port. Does nothing if the sender is already connected.
     * <p>
     * Failures of the actual connect call (including an unresolvable address) are passed to
     * {@code reportError} and <em>not</em> rethrown, so callers must check the connection state afterwards.
     *
     * @throws IOException if the connection check, opening the channel or configuring it fails
     */
    protected void connect() throws IOException {

        if (isConnected()) {
            return;
        }

        if (channel() == null) {
            setChannel(DatagramChannel.open());
        } else if (!channel().isOpen()) {
            Closer.close(channel());
            setChannel(DatagramChannel.open());
        }

        channel().configureBlocking(false);

        InetSocketAddress inetSocketAddress = new InetSocketAddress(getHost(), getPort());

        try {
            DatagramChannel connect = channel().connect(inetSocketAddress);
            setChannel(connect);
        } catch (UnresolvedAddressException | IOException e) {
            reportError(e.getMessage(), e);
        }
    }

    /**
     * Reports the connection state of the given channel.
     *
     * @param channel the channel to inspect, must not be {@code null}
     * @return {@code true} if the channel's socket is connected
     */
    @Override
    protected boolean isConnected(DatagramChannel channel) {
        return channel.isConnected();
    }
}