From 499df595f037bf5f47fdb6c7547b441ad9b588cd Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 15 Mar 2017 21:56:36 +0100 Subject: [PATCH] Introduce PoolHolder for ThreadLocal-pooled objects #104 logstash-gelf now uses a PoolHolder instead of static final ThreadLocal instances to hold references to ThreadLocal's and the associated objects. Using an instance to hold ThreadLocal instances breaks the cycle so objects can be collected by the GC if the ClassLoader is no longer in use (i.e. because of a hot-redeploy without restarting the JVM). The object root and the PoolHolder are referenced from PoolingGelfMessageBuilder which is created per log appender/handler instance or as static instance (default setting). Holding pools per instance can increase memory usage. Pooling mode can be controlled via logstash-gelf.message.pooling (Environment variable, System property). --- .../logging/gelf/GelfMessageAssembler.java | 39 ++--- .../logging/gelf/intern/OutputAccessor.java | 80 +++++++--- .../logging/gelf/intern/PoolHolder.java | 141 ++++++++++++++++++ .../gelf/intern/PoolingGelfMessage.java | 43 ++---- .../intern/PoolingGelfMessageBuilder.java | 31 +++- .../PoolingGelfMessageIntegrationTests.java | 9 +- 6 files changed, 267 insertions(+), 76 deletions(-) create mode 100644 src/main/java/biz/paluch/logging/gelf/intern/PoolHolder.java diff --git a/src/main/java/biz/paluch/logging/gelf/GelfMessageAssembler.java b/src/main/java/biz/paluch/logging/gelf/GelfMessageAssembler.java index 1d44557ce..3bd37bfe8 100644 --- a/src/main/java/biz/paluch/logging/gelf/GelfMessageAssembler.java +++ b/src/main/java/biz/paluch/logging/gelf/GelfMessageAssembler.java @@ -8,7 +8,6 @@ import java.util.*; import biz.paluch.logging.RuntimeContainer; -import biz.paluch.logging.RuntimeContainerProperties; import biz.paluch.logging.StackTraceFilter; import biz.paluch.logging.gelf.intern.GelfMessage; import biz.paluch.logging.gelf.intern.HostAndPortProvider; @@ -23,9 +22,11 @@ */ public class GelfMessageAssembler implements HostAndPortProvider { + /** + * @deprecated see {@link PoolingGelfMessageBuilder#PROPERTY_USE_POOLING}. + */ + @Deprecated public static final String PROPERTY_USE_POOLING = "logstash-gelf.message.pooling"; - private static final boolean USE_POOLING = Boolean - .valueOf(RuntimeContainerProperties.getProperty(PROPERTY_USE_POOLING, "true")); private static final int MAX_SHORT_MESSAGE_LENGTH = 250; private static final int MAX_PORT_NUMBER = 65535; @@ -48,17 +49,22 @@ public class GelfMessageAssembler implements HostAndPortProvider { private String timestampPattern = "yyyy-MM-dd HH:mm:ss,SSSS"; - private boolean usePooling = USE_POOLING; + private final ThreadLocal builders; - private ThreadLocal builders = new ThreadLocal() { + public GelfMessageAssembler() { - @Override - protected PoolingGelfMessageBuilder initialValue() { - return PoolingGelfMessageBuilder.newInstance(); - } - }; + if (PoolingGelfMessageBuilder.usePooling()) { - public GelfMessageAssembler() { + builders = new ThreadLocal() { + + @Override + protected PoolingGelfMessageBuilder initialValue() { + return PoolingGelfMessageBuilder.newInstance(); + } + }; + } else { + builders = null; + } } /** @@ -84,8 +90,7 @@ public void initialize(PropertyProvider propertyProvider) { originHost = propertyProvider.getProperty(PropertyProvider.PROPERTY_ORIGIN_HOST); setExtractStackTrace(propertyProvider.getProperty(PropertyProvider.PROPERTY_EXTRACT_STACKTRACE)); - setFilterStackTrace( - "true".equalsIgnoreCase(propertyProvider.getProperty(PropertyProvider.PROPERTY_FILTER_STACK_TRACE))); + setFilterStackTrace("true".equalsIgnoreCase(propertyProvider.getProperty(PropertyProvider.PROPERTY_FILTER_STACK_TRACE))); String includeLogMessageParameters = propertyProvider .getProperty(PropertyProvider.PROPERTY_INCLUDE_LOG_MESSAGE_PARAMETERS); @@ -116,7 +121,7 @@ public void initialize(PropertyProvider propertyProvider) { */ public GelfMessage createGelfMessage(LogEvent logEvent) { - GelfMessageBuilder builder = usePooling ? builders.get().recycle() : newInstance(); + GelfMessageBuilder builder = builders != null ? builders.get().recycle() : newInstance(); Throwable throwable = logEvent.getThrowable(); String message = logEvent.getMessage(); @@ -352,8 +357,8 @@ public int getMaximumMessageSize() { public void setMaximumMessageSize(int maximumMessageSize) { if (maximumMessageSize > MAX_MESSAGE_SIZE || maximumMessageSize < 1) { - throw new IllegalArgumentException( - "Invalid maximum message size: " + maximumMessageSize + ", supported range: 1-" + MAX_MESSAGE_SIZE); + throw new IllegalArgumentException("Invalid maximum message size: " + maximumMessageSize + ", supported range: 1-" + + MAX_MESSAGE_SIZE); } this.maximumMessageSize = maximumMessageSize; @@ -390,7 +395,7 @@ private StackTraceExtraction(boolean enabled, boolean filter, int ref) { /** * Parse the stack trace filtering value. - * + * * @param value * @return */ diff --git a/src/main/java/biz/paluch/logging/gelf/intern/OutputAccessor.java b/src/main/java/biz/paluch/logging/gelf/intern/OutputAccessor.java index 671543514..a2ecb2a89 100644 --- a/src/main/java/biz/paluch/logging/gelf/intern/OutputAccessor.java +++ b/src/main/java/biz/paluch/logging/gelf/intern/OutputAccessor.java @@ -9,20 +9,6 @@ */ abstract class OutputAccessor { - private static final ThreadLocal accessors = new ThreadLocal() { - @Override - protected ByteBufferOutputAccessor initialValue() { - return new ByteBufferOutputAccessor(); - } - }; - - private static final ThreadLocal streams = new ThreadLocal() { - @Override - protected ByteBufferOutputStream initialValue() { - return new ByteBufferOutputStream(null); - } - }; - public abstract void write(int b); public abstract void write(byte[] b); @@ -31,7 +17,7 @@ protected ByteBufferOutputStream initialValue() { /** * Create an {@link OutputAccessor} for the given {@link OutputStream}. - * + * * @param outputStream * @return */ @@ -47,7 +33,22 @@ public static OutputAccessor from(OutputStream outputStream) { */ public static OutputAccessor from(ByteBuffer byteBuffer) { - ByteBufferOutputAccessor accessor = accessors.get(); + ByteBufferOutputAccessor accessor = new ByteBufferOutputAccessor(); + accessor.byteBuffer = byteBuffer; + + return accessor; + } + + /** + * Create an {@link OutputAccessor} for the given {@link ByteBuffer}. Instances are pooled within the thread scope. + * + * @param poolHolder + * @param byteBuffer + * @return + */ + public static OutputAccessor from(OutputAccessorPoolHolder poolHolder, ByteBuffer byteBuffer) { + + ByteBufferOutputAccessor accessor = poolHolder.getByteBufferOutputAccessor(); accessor.byteBuffer = byteBuffer; return accessor; @@ -56,21 +57,24 @@ public static OutputAccessor from(ByteBuffer byteBuffer) { /** * Retrieve a pooled {@link OutputStream}. * + * @param poolHolder * @return */ - public static OutputStream pooledStream() { - return streams.get(); + public static OutputStream pooledStream(OutputAccessorPoolHolder poolHolder) { + return poolHolder.getByteBufferOutputStream(); } /** - * Retrieved a pooled an {@link OutputStream} for the given {@link ByteBuffer}. Instances are pooled within the thread scope. + * Retrieved a pooled an {@link OutputStream} for the given {@link ByteBuffer}. Instances are pooled within the thread + * scope. * + * @param poolHolder * @param byteBuffer * @return */ - public static OutputStream asStream(ByteBuffer byteBuffer) { + public static OutputStream asStream(OutputAccessorPoolHolder poolHolder, ByteBuffer byteBuffer) { - ByteBufferOutputStream accessor = streams.get(); + ByteBufferOutputStream accessor = poolHolder.getByteBufferOutputStream(); accessor.byteBuffer = byteBuffer; return accessor; @@ -152,4 +156,38 @@ public void write(int b) { } } } + + /** + * Holder for {@link ThreadLocal} pools. + */ + static class OutputAccessorPoolHolder { + + private final ThreadLocal accessorPool = new ThreadLocal() { + @Override + protected ByteBufferOutputAccessor initialValue() { + return new ByteBufferOutputAccessor(); + } + }; + + private final ThreadLocal streamPool = new ThreadLocal() { + @Override + protected ByteBufferOutputStream initialValue() { + return new ByteBufferOutputStream(null); + } + }; + + /** + * @return a pooled {@link ByteBufferOutputAccessor} instance. + */ + public ByteBufferOutputAccessor getByteBufferOutputAccessor() { + return accessorPool.get(); + } + + /** + * @return a pooled {@link ByteBufferOutputStream} instance. + */ + public ByteBufferOutputStream getByteBufferOutputStream() { + return streamPool.get(); + } + } } diff --git a/src/main/java/biz/paluch/logging/gelf/intern/PoolHolder.java b/src/main/java/biz/paluch/logging/gelf/intern/PoolHolder.java new file mode 100644 index 000000000..5f3c72be1 --- /dev/null +++ b/src/main/java/biz/paluch/logging/gelf/intern/PoolHolder.java @@ -0,0 +1,141 @@ +package biz.paluch.logging.gelf.intern; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Holder for {@link ThreadLocal} pools. + * + * @author Mark Paluch + */ +abstract class PoolHolder { + + public static PoolHolder noop() { + return NoOpPoolHolder.noop(); + } + + public static PoolHolder threadLocal() { + return new ThreadLocalPoolHolder(); + } + + /** + * @return the {@link OutputAccessor.OutputAccessorPoolHolder}. + */ + public abstract OutputAccessor.OutputAccessorPoolHolder getOutputAccessorPoolHolder(); + + /** + * @return a pooled {@link ReusableGzipOutputStream} instance. + */ + public abstract ReusableGzipOutputStream getReusableGzipOutputStream(); + + /** + * @return a pooled {@link ByteBuffer}-array instance. + */ + public abstract ByteBuffer[] getSingleBuffer(); + + /** + * @return a pooled byte-array instance. + */ + public abstract byte[] getByteArray(); + + private static class NoOpPoolHolder extends PoolHolder { + + private final static NoOpPoolHolder instance = new NoOpPoolHolder(); + + public static PoolHolder noop() { + return instance; + } + + @Override + public OutputAccessor.OutputAccessorPoolHolder getOutputAccessorPoolHolder() { + throw new UnsupportedOperationException(); + } + + @Override + public ReusableGzipOutputStream getReusableGzipOutputStream() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] getSingleBuffer() { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getByteArray() { + throw new UnsupportedOperationException(); + } + } + + /** + * {@link PoolHolder} backed by {@link ThreadLocal}. + */ + private static class ThreadLocalPoolHolder extends PoolHolder { + + private final OutputAccessor.OutputAccessorPoolHolder outputAccessorPoolHolder; + + private final ThreadLocal streamPool = new ThreadLocal() { + @Override + protected ReusableGzipOutputStream initialValue() { + try { + return new ReusableGzipOutputStream(OutputAccessor.pooledStream(outputAccessorPoolHolder)); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + }; + + private final ThreadLocal singleBufferPool = new ThreadLocal() { + @Override + protected ByteBuffer[] initialValue() { + return new ByteBuffer[1]; + } + }; + + private final ThreadLocal byteArrayPool = new ThreadLocal() { + @Override + protected byte[] initialValue() { + return new byte[8192 * 2]; + } + }; + + /** + * Create a new {@link PoolHolder} instance. + */ + public ThreadLocalPoolHolder() { + this(new OutputAccessor.OutputAccessorPoolHolder()); + } + + private ThreadLocalPoolHolder(OutputAccessor.OutputAccessorPoolHolder outputAccessorPoolHolder) { + this.outputAccessorPoolHolder = outputAccessorPoolHolder; + } + + /** + * @return the {@link OutputAccessor.OutputAccessorPoolHolder}. + */ + public OutputAccessor.OutputAccessorPoolHolder getOutputAccessorPoolHolder() { + return outputAccessorPoolHolder; + } + + /** + * @return a pooled {@link ReusableGzipOutputStream} instance. + */ + public ReusableGzipOutputStream getReusableGzipOutputStream() { + return streamPool.get(); + } + + /** + * @return a pooled {@link ByteBuffer}-array instance. + */ + public ByteBuffer[] getSingleBuffer() { + return singleBufferPool.get(); + } + + /** + * @return a pooled byte-array instance. + */ + public byte[] getByteArray() { + return byteArrayPool.get(); + } + } +} diff --git a/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessage.java b/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessage.java index 5874815da..2e042a62f 100644 --- a/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessage.java +++ b/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessage.java @@ -12,36 +12,19 @@ */ class PoolingGelfMessage extends GelfMessage { - private static final ThreadLocal BYTE_ARRAY_POOL = new ThreadLocal() { - @Override - protected byte[] initialValue() { - return new byte[8192 * 2]; - } - }; - - private static final ThreadLocal STREAM_POOL = new ThreadLocal() { - @Override - protected ReusableGzipOutputStream initialValue() { - try { - return new ReusableGzipOutputStream(OutputAccessor.pooledStream()); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } - }; + private final PoolHolder poolHolder; - private static final ThreadLocal SINGLE_BUFFER = new ThreadLocal() { - @Override - protected ByteBuffer[] initialValue() { - return new ByteBuffer[1]; - } - }; - - public PoolingGelfMessage() { + public PoolingGelfMessage(PoolHolder poolHolder) { + this.poolHolder = poolHolder; } - public PoolingGelfMessage(String shortMessage, String fullMessage, long timestamp, String level) { + public PoolingGelfMessage(String shortMessage, String fullMessage, long timestamp, String level, PoolHolder poolHolder) { super(shortMessage, fullMessage, timestamp, level); + this.poolHolder = poolHolder; + } + + public void toJson(ByteBuffer byteBuffer, String additionalFieldPrefix) { + toJson(OutputAccessor.from(poolHolder.getOutputAccessorPoolHolder(), byteBuffer), additionalFieldPrefix); } public ByteBuffer[] toUDPBuffers(ByteBuffer buffer, ByteBuffer tempBuffer) { @@ -49,9 +32,9 @@ public ByteBuffer[] toUDPBuffers(ByteBuffer buffer, ByteBuffer tempBuffer) { try { toJson(buffer, "_"); - OutputAccessor.asStream(tempBuffer); + OutputAccessor.asStream(poolHolder.getOutputAccessorPoolHolder(), tempBuffer); - ReusableGzipOutputStream gz = STREAM_POOL.get(); + ReusableGzipOutputStream gz = poolHolder.getReusableGzipOutputStream(); gz.reset(); gz.writeHeader(); @@ -78,7 +61,7 @@ public ByteBuffer[] toUDPBuffers(ByteBuffer buffer, ByteBuffer tempBuffer) { buffer.clear(); return sliceDatagrams((ByteBuffer) tempBuffer.flip(), diagrams_length, buffer); } else { - ByteBuffer[] byteBuffers = SINGLE_BUFFER.get(); + ByteBuffer[] byteBuffers = poolHolder.getSingleBuffer(); byteBuffers[0] = (ByteBuffer) tempBuffer.flip(); return byteBuffers; } @@ -92,7 +75,7 @@ private void gzip(ByteBuffer source, ReusableGzipOutputStream gz) throws IOExcep int read = 0; source.position(0); - byte[] bytes = BYTE_ARRAY_POOL.get(); + byte[] bytes = poolHolder.getByteArray(); while (size > read) { if ((size - read) > bytes.length) { diff --git a/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageBuilder.java b/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageBuilder.java index 578f3890a..24f5636cc 100644 --- a/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageBuilder.java +++ b/src/main/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageBuilder.java @@ -1,5 +1,6 @@ package biz.paluch.logging.gelf.intern; +import biz.paluch.logging.RuntimeContainerProperties; import biz.paluch.logging.gelf.GelfMessageBuilder; /** @@ -7,7 +8,24 @@ */ public class PoolingGelfMessageBuilder extends GelfMessageBuilder { - private PoolingGelfMessageBuilder() { + /** + * Can be + *
    + *
  • {@literal static} (default value) for static held pools
  • + *
  • {@literal true} for using instance-based held pools
  • + *
  • {@literal false} to disable pooling
  • + *
+ */ + public static final String PROPERTY_USE_POOLING = "logstash-gelf.message.pooling"; + + private static final String USE_POOLING_VAL = RuntimeContainerProperties.getProperty(PROPERTY_USE_POOLING, "static"); + private static final boolean STATIC_POOLING = USE_POOLING_VAL.equalsIgnoreCase("static"); + private static final PoolHolder STATIC_POOL_HOLDER = STATIC_POOLING ? PoolHolder.threadLocal() : PoolHolder.noop(); + + private final PoolHolder poolHolder; + + private PoolingGelfMessageBuilder(PoolHolder poolHolder) { + this.poolHolder = poolHolder; } /** @@ -16,7 +34,14 @@ private PoolingGelfMessageBuilder() { * @return GelfMessageBuilder */ public static PoolingGelfMessageBuilder newInstance() { - return new PoolingGelfMessageBuilder(); + return new PoolingGelfMessageBuilder(STATIC_POOLING ? STATIC_POOL_HOLDER : PoolHolder.threadLocal()); + } + + /** + * @return {@literal true} if pooling (static/instance-held pools) is enabled. + */ + public static boolean usePooling() { + return STATIC_POOLING || USE_POOLING_VAL.equalsIgnoreCase("true"); } /** @@ -48,7 +73,7 @@ public GelfMessageBuilder recycle() { */ public GelfMessage build() { - GelfMessage gelfMessage = new PoolingGelfMessage(shortMessage, fullMessage, javaTimestamp, level); + GelfMessage gelfMessage = new PoolingGelfMessage(shortMessage, fullMessage, javaTimestamp, level, poolHolder); gelfMessage.addFields(additionalFields); gelfMessage.setMaximumMessageSize(maximumMessageSize); gelfMessage.setVersion(version); diff --git a/src/test/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageIntegrationTests.java b/src/test/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageIntegrationTests.java index 569081003..314f3edf1 100644 --- a/src/test/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageIntegrationTests.java +++ b/src/test/java/biz/paluch/logging/gelf/intern/PoolingGelfMessageIntegrationTests.java @@ -35,10 +35,9 @@ public class PoolingGelfMessageIntegrationTests { put("doubleWithDecimals", "2.1"); put("int", "2"); put("exception1", StackTraceFilter.getFilteredStackTrace(new IOException(new Exception(new Exception())))); - put("exception2", - StackTraceFilter.getFilteredStackTrace(new IllegalStateException(new Exception(new Exception())))); - put("exception3", StackTraceFilter - .getFilteredStackTrace(new IllegalArgumentException(new Exception(new IllegalArgumentException())))); + put("exception2", StackTraceFilter.getFilteredStackTrace(new IllegalStateException(new Exception(new Exception())))); + put("exception3", StackTraceFilter.getFilteredStackTrace(new IllegalArgumentException(new Exception( + new IllegalArgumentException())))); put("exception4", StackTraceFilter.getFilteredStackTrace(new Exception(new Exception(new Exception())))); put("exception5", StackTraceFilter.getFilteredStackTrace(new Exception(new Exception(new ConnectException())))); } @@ -137,7 +136,7 @@ public int getCurrentMillis() { private PoolingGelfMessage createPooledGelfMessage() { - PoolingGelfMessage gelfMessage = new PoolingGelfMessage() { + PoolingGelfMessage gelfMessage = new PoolingGelfMessage(PoolHolder.threadLocal()) { @Override public int getCurrentMillis() { return 1000;