diff --git a/src/main/java/biz/paluch/logging/gelf/GelfMessageAssembler.java b/src/main/java/biz/paluch/logging/gelf/GelfMessageAssembler.java index 1d44557ce..3bd37bfe8 100644 --- a/src/main/java/biz/paluch/logging/gelf/GelfMessageAssembler.java +++ b/src/main/java/biz/paluch/logging/gelf/GelfMessageAssembler.java @@ -8,7 +8,6 @@ import java.util.*; import biz.paluch.logging.RuntimeContainer; -import biz.paluch.logging.RuntimeContainerProperties; import biz.paluch.logging.StackTraceFilter; import biz.paluch.logging.gelf.intern.GelfMessage; import biz.paluch.logging.gelf.intern.HostAndPortProvider; @@ -23,9 +22,11 @@ */ public class GelfMessageAssembler implements HostAndPortProvider { + /** + * @deprecated see {@link PoolingGelfMessageBuilder#PROPERTY_USE_POOLING}. + */ + @Deprecated public static final String PROPERTY_USE_POOLING = "logstash-gelf.message.pooling"; - private static final boolean USE_POOLING = Boolean - .valueOf(RuntimeContainerProperties.getProperty(PROPERTY_USE_POOLING, "true")); private static final int MAX_SHORT_MESSAGE_LENGTH = 250; private static final int MAX_PORT_NUMBER = 65535; @@ -48,17 +49,22 @@ public class GelfMessageAssembler implements HostAndPortProvider { private String timestampPattern = "yyyy-MM-dd HH:mm:ss,SSSS"; - private boolean usePooling = USE_POOLING; + private final ThreadLocal builders; - private ThreadLocal builders = new ThreadLocal() { + public GelfMessageAssembler() { - @Override - protected PoolingGelfMessageBuilder initialValue() { - return PoolingGelfMessageBuilder.newInstance(); - } - }; + if (PoolingGelfMessageBuilder.usePooling()) { - public GelfMessageAssembler() { + builders = new ThreadLocal() { + + @Override + protected PoolingGelfMessageBuilder initialValue() { + return PoolingGelfMessageBuilder.newInstance(); + } + }; + } else { + builders = null; + } } /** @@ -84,8 +90,7 @@ public void initialize(PropertyProvider propertyProvider) { originHost = propertyProvider.getProperty(PropertyProvider.PROPERTY_ORIGIN_HOST); setExtractStackTrace(propertyProvider.getProperty(PropertyProvider.PROPERTY_EXTRACT_STACKTRACE)); - setFilterStackTrace( - "true".equalsIgnoreCase(propertyProvider.getProperty(PropertyProvider.PROPERTY_FILTER_STACK_TRACE))); + setFilterStackTrace("true".equalsIgnoreCase(propertyProvider.getProperty(PropertyProvider.PROPERTY_FILTER_STACK_TRACE))); String includeLogMessageParameters = propertyProvider .getProperty(PropertyProvider.PROPERTY_INCLUDE_LOG_MESSAGE_PARAMETERS); @@ -116,7 +121,7 @@ public void initialize(PropertyProvider propertyProvider) { */ public GelfMessage createGelfMessage(LogEvent logEvent) { - GelfMessageBuilder builder = usePooling ? builders.get().recycle() : newInstance(); + GelfMessageBuilder builder = builders != null ? builders.get().recycle() : newInstance(); Throwable throwable = logEvent.getThrowable(); String message = logEvent.getMessage(); @@ -352,8 +357,8 @@ public int getMaximumMessageSize() { public void setMaximumMessageSize(int maximumMessageSize) { if (maximumMessageSize > MAX_MESSAGE_SIZE || maximumMessageSize < 1) { - throw new IllegalArgumentException( - "Invalid maximum message size: " + maximumMessageSize + ", supported range: 1-" + MAX_MESSAGE_SIZE); + throw new IllegalArgumentException("Invalid maximum message size: " + maximumMessageSize + ", supported range: 1-" + + MAX_MESSAGE_SIZE); } this.maximumMessageSize = maximumMessageSize; @@ -390,7 +395,7 @@ private StackTraceExtraction(boolean enabled, boolean filter, int ref) { /** * Parse the stack trace filtering value. - * + * * @param value * @return */ diff --git a/src/main/java/biz/paluch/logging/gelf/intern/OutputAccessor.java b/src/main/java/biz/paluch/logging/gelf/intern/OutputAccessor.java index 671543514..a2ecb2a89 100644 --- a/src/main/java/biz/paluch/logging/gelf/intern/OutputAccessor.java +++ b/src/main/java/biz/paluch/logging/gelf/intern/OutputAccessor.java @@ -9,20 +9,6 @@ */ abstract class OutputAccessor { - private static final ThreadLocal accessors = new ThreadLocal() { - @Override - protected ByteBufferOutputAccessor initialValue() { - return new ByteBufferOutputAccessor(); - } - }; - - private static final ThreadLocal streams = new ThreadLocal() { - @Override - protected ByteBufferOutputStream initialValue() { - return new ByteBufferOutputStream(null); - } - }; - public abstract void write(int b); public abstract void write(byte[] b); @@ -31,7 +17,7 @@ protected ByteBufferOutputStream initialValue() { /** * Create an {@link OutputAccessor} for the given {@link OutputStream}. - * + * * @param outputStream * @return */ @@ -47,7 +33,22 @@ public static OutputAccessor from(OutputStream outputStream) { */ public static OutputAccessor from(ByteBuffer byteBuffer) { - ByteBufferOutputAccessor accessor = accessors.get(); + ByteBufferOutputAccessor accessor = new ByteBufferOutputAccessor(); + accessor.byteBuffer = byteBuffer; + + return accessor; + } + + /** + * Create an {@link OutputAccessor} for the given {@link ByteBuffer}. Instances are pooled within the thread scope. + * + * @param poolHolder + * @param byteBuffer + * @return + */ + public static OutputAccessor from(OutputAccessorPoolHolder poolHolder, ByteBuffer byteBuffer) { + + ByteBufferOutputAccessor accessor = poolHolder.getByteBufferOutputAccessor(); accessor.byteBuffer = byteBuffer; return accessor; @@ -56,21 +57,24 @@ public static OutputAccessor from(ByteBuffer byteBuffer) { /** * Retrieve a pooled {@link OutputStream}. * + * @param poolHolder * @return */ - public static OutputStream pooledStream() { - return streams.get(); + public static OutputStream pooledStream(OutputAccessorPoolHolder poolHolder) { + return poolHolder.getByteBufferOutputStream(); } /** - * Retrieved a pooled an {@link OutputStream} for the given {@link ByteBuffer}. Instances are pooled within the thread scope. + * Retrieved a pooled an {@link OutputStream} for the given {@link ByteBuffer}. Instances are pooled within the thread + * scope. * + * @param poolHolder * @param byteBuffer * @return */ - public static OutputStream asStream(ByteBuffer byteBuffer) { + public static OutputStream asStream(OutputAccessorPoolHolder poolHolder, ByteBuffer byteBuffer) { - ByteBufferOutputStream accessor = streams.get(); + ByteBufferOutputStream accessor = poolHolder.getByteBufferOutputStream(); accessor.byteBuffer = byteBuffer; return accessor; @@ -152,4 +156,38 @@ public void write(int b) { } } } + + /** + * Holder for {@link ThreadLocal} pools. + */ + static class OutputAccessorPoolHolder { + + private final ThreadLocal accessorPool = new ThreadLocal() { + @Override + protected ByteBufferOutputAccessor initialValue() { + return new ByteBufferOutputAccessor(); + } + }; + + private final ThreadLocal streamPool = new ThreadLocal() { + @Override + protected ByteBufferOutputStream initialValue() { + return new ByteBufferOutputStream(null); + } + }; + + /** + * @return a pooled {@link ByteBufferOutputAccessor} instance. + */ + public ByteBufferOutputAccessor getByteBufferOutputAccessor() { + return accessorPool.get(); + } + + /** + * @return a pooled {@link ByteBufferOutputStream} instance. + */ + public ByteBufferOutputStream getByteBufferOutputStream() { + return streamPool.get(); + } + } } diff --git a/src/main/java/biz/paluch/logging/gelf/intern/PoolHolder.java b/src/main/java/biz/paluch/logging/gelf/intern/PoolHolder.java new file mode 100644 index 000000000..5f3c72be1 --- /dev/null +++ b/src/main/java/biz/paluch/logging/gelf/intern/PoolHolder.java @@ -0,0 +1,141 @@ +package biz.paluch.logging.gelf.intern; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Holder for {@link ThreadLocal} pools. + * + * @author Mark Paluch + */ +abstract class PoolHolder { + + public static PoolHolder noop() { + return NoOpPoolHolder.noop(); + } + + public static PoolHolder threadLocal() { + return new ThreadLocalPoolHolder(); + } + + /** + * @return the {@link OutputAccessor.OutputAccessorPoolHolder}. + */ + public abstract OutputAccessor.OutputAccessorPoolHolder getOutputAccessorPoolHolder(); + + /** + * @return a pooled {@link ReusableGzipOutputStream} instance. + */ + public abstract ReusableGzipOutputStream getReusableGzipOutputStream(); + + /** + * @return a pooled {@link ByteBuffer}-array instance. + */ + public abstract ByteBuffer[] getSingleBuffer(); + + /** + * @return a pooled byte-array instance. + */ + public abstract byte[] getByteArray(); + + private static class NoOpPoolHolder extends PoolHolder { + + private final static NoOpPoolHolder instance = new NoOpPoolHolder(); + + public static PoolHolder noop() { + return instance; + } + + @Override + public OutputAccessor.OutputAccessorPoolHolder getOutputAccessorPoolHolder() { + throw new UnsupportedOperationException(); + } + + @Override + public ReusableGzipOutputStream getReusableGzipOutputStream() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] getSingleBuffer() { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getByteArray() { + throw new UnsupportedOperationException(); + } + } + + /** + * {@link PoolHolder} backed by {@link ThreadLocal}. + */ + private static class ThreadLocalPoolHolder extends PoolHolder { + + private final OutputAccessor.OutputAccessorPoolHolder outputAccessorPoolHolder; + + private final ThreadLocal streamPool = new ThreadLocal() { + @Override + protected ReusableGzipOutputStream initialValue() { + try { + return new ReusableGzipOutputStream(OutputAccessor.pooledStream(outputAccessorPoolHolder)); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + }; + + private final ThreadLocal singleBufferPool = new ThreadLocal() { + @Override + protected ByteBuffer[] initialValue() { + return new ByteBuffer[1]; + } + }; + + private final ThreadLocal byteArrayPool = new ThreadLocal() { + @Override + protected byte[] initialValue() { + return new byte[8192 * 2]; + } + }; + + /** + * Create a new {@link PoolHolder} instance. + */ + public ThreadLocalPoolHolder() { + this(new OutputAccessor.OutputAccessorPoolHolder()); + } + + private ThreadLocalPoolHolder(OutputAccessor.OutputAccessorPoolHolder outputAccessorPoolHolder) { + this.outputAccessorPoolHolder = outputAccessorPoolHolder; + } + + /** + * @return the {@link OutputAccessor.OutputAccessorPoolHolder}. + */ + public OutputAccessor.OutputAccessorPoolHolder getOutputAccessorPoolHolder() { + return outputAccessorPoolHolder; + } + + /** + * @return a pooled {@link ReusableGzipOutputStream} instance. + */ + public ReusableGzipOutputStream getReusableGzipOutputStream() { + return streamPool.get(); + } + + /** + * @return a pooled {@link ByteBuffer}-array instance. + */ + public ByteBuffer[] getSingleBuffer() { + return singleBufferPool.get(); + } + + /** + * @return a pooled byte-array instance. + */ + public byte[] getByteArray() { + return byteArrayPool.get(); + } + } +} diff --git a/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessage.java b/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessage.java index 5874815da..2e042a62f 100644 --- a/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessage.java +++ b/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessage.java @@ -12,36 +12,19 @@ */ class PoolingGelfMessage extends GelfMessage { - private static final ThreadLocal BYTE_ARRAY_POOL = new ThreadLocal() { - @Override - protected byte[] initialValue() { - return new byte[8192 * 2]; - } - }; - - private static final ThreadLocal STREAM_POOL = new ThreadLocal() { - @Override - protected ReusableGzipOutputStream initialValue() { - try { - return new ReusableGzipOutputStream(OutputAccessor.pooledStream()); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } - }; + private final PoolHolder poolHolder; - private static final ThreadLocal SINGLE_BUFFER = new ThreadLocal() { - @Override - protected ByteBuffer[] initialValue() { - return new ByteBuffer[1]; - } - }; - - public PoolingGelfMessage() { + public PoolingGelfMessage(PoolHolder poolHolder) { + this.poolHolder = poolHolder; } - public PoolingGelfMessage(String shortMessage, String fullMessage, long timestamp, String level) { + public PoolingGelfMessage(String shortMessage, String fullMessage, long timestamp, String level, PoolHolder poolHolder) { super(shortMessage, fullMessage, timestamp, level); + this.poolHolder = poolHolder; + } + + public void toJson(ByteBuffer byteBuffer, String additionalFieldPrefix) { + toJson(OutputAccessor.from(poolHolder.getOutputAccessorPoolHolder(), byteBuffer), additionalFieldPrefix); } public ByteBuffer[] toUDPBuffers(ByteBuffer buffer, ByteBuffer tempBuffer) { @@ -49,9 +32,9 @@ public ByteBuffer[] toUDPBuffers(ByteBuffer buffer, ByteBuffer tempBuffer) { try { toJson(buffer, "_"); - OutputAccessor.asStream(tempBuffer); + OutputAccessor.asStream(poolHolder.getOutputAccessorPoolHolder(), tempBuffer); - ReusableGzipOutputStream gz = STREAM_POOL.get(); + ReusableGzipOutputStream gz = poolHolder.getReusableGzipOutputStream(); gz.reset(); gz.writeHeader(); @@ -78,7 +61,7 @@ public ByteBuffer[] toUDPBuffers(ByteBuffer buffer, ByteBuffer tempBuffer) { buffer.clear(); return sliceDatagrams((ByteBuffer) tempBuffer.flip(), diagrams_length, buffer); } else { - ByteBuffer[] byteBuffers = SINGLE_BUFFER.get(); + ByteBuffer[] byteBuffers = poolHolder.getSingleBuffer(); byteBuffers[0] = (ByteBuffer) tempBuffer.flip(); return byteBuffers; } @@ -92,7 +75,7 @@ private void gzip(ByteBuffer source, ReusableGzipOutputStream gz) throws IOExcep int read = 0; source.position(0); - byte[] bytes = BYTE_ARRAY_POOL.get(); + byte[] bytes = poolHolder.getByteArray(); while (size > read) { if ((size - read) > bytes.length) { diff --git a/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageBuilder.java b/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageBuilder.java index 578f3890a..24f5636cc 100644 --- a/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageBuilder.java +++ b/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageBuilder.java @@ -1,5 +1,6 @@ package biz.paluch.logging.gelf.intern; +import biz.paluch.logging.RuntimeContainerProperties; import biz.paluch.logging.gelf.GelfMessageBuilder; /** @@ -7,7 +8,24 @@ */ public class PoolingGelfMessageBuilder extends GelfMessageBuilder { - private PoolingGelfMessageBuilder() { + /** + * Can be + *
    + *
  • {@literal static} (default value) for static held pools
  • + *
  • {@literal true} for using instance-based held pools
  • + *
  • {@literal false} to disable pooling
  • + *
+ */ + public static final String PROPERTY_USE_POOLING = "logstash-gelf.message.pooling"; + + private static final String USE_POOLING_VAL = RuntimeContainerProperties.getProperty(PROPERTY_USE_POOLING, "static"); + private static final boolean STATIC_POOLING = USE_POOLING_VAL.equalsIgnoreCase("static"); + private static final PoolHolder STATIC_POOL_HOLDER = STATIC_POOLING ? PoolHolder.threadLocal() : PoolHolder.noop(); + + private final PoolHolder poolHolder; + + private PoolingGelfMessageBuilder(PoolHolder poolHolder) { + this.poolHolder = poolHolder; } /** @@ -16,7 +34,14 @@ private PoolingGelfMessageBuilder() { * @return GelfMessageBuilder */ public static PoolingGelfMessageBuilder newInstance() { - return new PoolingGelfMessageBuilder(); + return new PoolingGelfMessageBuilder(STATIC_POOLING ? STATIC_POOL_HOLDER : PoolHolder.threadLocal()); + } + + /** + * @return {@literal true} if pooling (static/instance-held pools) is enabled. + */ + public static boolean usePooling() { + return STATIC_POOLING || USE_POOLING_VAL.equalsIgnoreCase("true"); } /** @@ -48,7 +73,7 @@ public GelfMessageBuilder recycle() { */ public GelfMessage build() { - GelfMessage gelfMessage = new PoolingGelfMessage(shortMessage, fullMessage, javaTimestamp, level); + GelfMessage gelfMessage = new PoolingGelfMessage(shortMessage, fullMessage, javaTimestamp, level, poolHolder); gelfMessage.addFields(additionalFields); gelfMessage.setMaximumMessageSize(maximumMessageSize); gelfMessage.setVersion(version); diff --git a/src/test/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageIntegrationTests.java b/src/test/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageIntegrationTests.java index 569081003..314f3edf1 100644 --- a/src/test/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageIntegrationTests.java +++ b/src/test/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageIntegrationTests.java @@ -35,10 +35,9 @@ public class PoolingGelfMessageIntegrationTests { put("doubleWithDecimals", "2.1"); put("int", "2"); put("exception1", StackTraceFilter.getFilteredStackTrace(new IOException(new Exception(new Exception())))); - put("exception2", - StackTraceFilter.getFilteredStackTrace(new IllegalStateException(new Exception(new Exception())))); - put("exception3", StackTraceFilter - .getFilteredStackTrace(new IllegalArgumentException(new Exception(new IllegalArgumentException())))); + put("exception2", StackTraceFilter.getFilteredStackTrace(new IllegalStateException(new Exception(new Exception())))); + put("exception3", StackTraceFilter.getFilteredStackTrace(new IllegalArgumentException(new Exception( + new IllegalArgumentException())))); put("exception4", StackTraceFilter.getFilteredStackTrace(new Exception(new Exception(new Exception())))); put("exception5", StackTraceFilter.getFilteredStackTrace(new Exception(new Exception(new ConnectException())))); } @@ -137,7 +136,7 @@ public int getCurrentMillis() { private PoolingGelfMessage createPooledGelfMessage() { - PoolingGelfMessage gelfMessage = new PoolingGelfMessage() { + PoolingGelfMessage gelfMessage = new PoolingGelfMessage(PoolHolder.threadLocal()) { @Override public int getCurrentMillis() { return 1000;