Skip to content
This repository has been archived by the owner on Jun 29, 2023. It is now read-only.

Commit

Permalink
Introduce PoolHolder for ThreadLocal-pooled objects #104
Browse files Browse the repository at this point in the history
logstash-gelf now uses a PoolHolder instead of static final ThreadLocal instances to hold references to ThreadLocal's and the associated objects. Using an instance to hold ThreadLocal instances breaks the cycle so objects can be collected by the GC if the ClassLoader is no longer in use (i.e. because of a hot-redeploy without restarting the JVM).

The object root and the PoolHolder are referenced from PoolingGelfMessageBuilder which is created per log appender/handler instance or as static instance (default setting). Holding pools per instance can increase memory usage. Pooling mode can be controlled via logstash-gelf.message.pooling (Environment variable, System property).
  • Loading branch information
mp911de committed Mar 15, 2017
1 parent 0372e4e commit 499df59
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 76 deletions.
39 changes: 22 additions & 17 deletions src/main/java/biz/paluch/logging/gelf/GelfMessageAssembler.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.util.*;

import biz.paluch.logging.RuntimeContainer;
import biz.paluch.logging.RuntimeContainerProperties;
import biz.paluch.logging.StackTraceFilter;
import biz.paluch.logging.gelf.intern.GelfMessage;
import biz.paluch.logging.gelf.intern.HostAndPortProvider;
Expand All @@ -23,9 +22,11 @@
*/
public class GelfMessageAssembler implements HostAndPortProvider {

/**
* @deprecated see {@link PoolingGelfMessageBuilder#PROPERTY_USE_POOLING}.
*/
@Deprecated
public static final String PROPERTY_USE_POOLING = "logstash-gelf.message.pooling";
private static final boolean USE_POOLING = Boolean
.valueOf(RuntimeContainerProperties.getProperty(PROPERTY_USE_POOLING, "true"));

private static final int MAX_SHORT_MESSAGE_LENGTH = 250;
private static final int MAX_PORT_NUMBER = 65535;
Expand All @@ -48,17 +49,22 @@ public class GelfMessageAssembler implements HostAndPortProvider {

private String timestampPattern = "yyyy-MM-dd HH:mm:ss,SSSS";

private boolean usePooling = USE_POOLING;
private final ThreadLocal<PoolingGelfMessageBuilder> builders;

private ThreadLocal<PoolingGelfMessageBuilder> builders = new ThreadLocal<PoolingGelfMessageBuilder>() {
public GelfMessageAssembler() {

@Override
protected PoolingGelfMessageBuilder initialValue() {
return PoolingGelfMessageBuilder.newInstance();
}
};
if (PoolingGelfMessageBuilder.usePooling()) {

public GelfMessageAssembler() {
builders = new ThreadLocal<PoolingGelfMessageBuilder>() {

@Override
protected PoolingGelfMessageBuilder initialValue() {
return PoolingGelfMessageBuilder.newInstance();
}
};
} else {
builders = null;
}
}

/**
Expand All @@ -84,8 +90,7 @@ public void initialize(PropertyProvider propertyProvider) {

originHost = propertyProvider.getProperty(PropertyProvider.PROPERTY_ORIGIN_HOST);
setExtractStackTrace(propertyProvider.getProperty(PropertyProvider.PROPERTY_EXTRACT_STACKTRACE));
setFilterStackTrace(
"true".equalsIgnoreCase(propertyProvider.getProperty(PropertyProvider.PROPERTY_FILTER_STACK_TRACE)));
setFilterStackTrace("true".equalsIgnoreCase(propertyProvider.getProperty(PropertyProvider.PROPERTY_FILTER_STACK_TRACE)));

String includeLogMessageParameters = propertyProvider
.getProperty(PropertyProvider.PROPERTY_INCLUDE_LOG_MESSAGE_PARAMETERS);
Expand Down Expand Up @@ -116,7 +121,7 @@ public void initialize(PropertyProvider propertyProvider) {
*/
public GelfMessage createGelfMessage(LogEvent logEvent) {

GelfMessageBuilder builder = usePooling ? builders.get().recycle() : newInstance();
GelfMessageBuilder builder = builders != null ? builders.get().recycle() : newInstance();

Throwable throwable = logEvent.getThrowable();
String message = logEvent.getMessage();
Expand Down Expand Up @@ -352,8 +357,8 @@ public int getMaximumMessageSize() {
public void setMaximumMessageSize(int maximumMessageSize) {

if (maximumMessageSize > MAX_MESSAGE_SIZE || maximumMessageSize < 1) {
throw new IllegalArgumentException(
"Invalid maximum message size: " + maximumMessageSize + ", supported range: 1-" + MAX_MESSAGE_SIZE);
throw new IllegalArgumentException("Invalid maximum message size: " + maximumMessageSize + ", supported range: 1-"
+ MAX_MESSAGE_SIZE);
}

this.maximumMessageSize = maximumMessageSize;
Expand Down Expand Up @@ -390,7 +395,7 @@ private StackTraceExtraction(boolean enabled, boolean filter, int ref) {

/**
* Parse the stack trace filtering value.
*
*
* @param value
* @return
*/
Expand Down
80 changes: 59 additions & 21 deletions src/main/java/biz/paluch/logging/gelf/intern/OutputAccessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,6 @@
*/
abstract class OutputAccessor {

private static final ThreadLocal<ByteBufferOutputAccessor> accessors = new ThreadLocal<ByteBufferOutputAccessor>() {
@Override
protected ByteBufferOutputAccessor initialValue() {
return new ByteBufferOutputAccessor();
}
};

private static final ThreadLocal<ByteBufferOutputStream> streams = new ThreadLocal<ByteBufferOutputStream>() {
@Override
protected ByteBufferOutputStream initialValue() {
return new ByteBufferOutputStream(null);
}
};

public abstract void write(int b);

public abstract void write(byte[] b);
Expand All @@ -31,7 +17,7 @@ protected ByteBufferOutputStream initialValue() {

/**
* Create an {@link OutputAccessor} for the given {@link OutputStream}.
*
*
* @param outputStream
* @return
*/
Expand All @@ -47,7 +33,22 @@ public static OutputAccessor from(OutputStream outputStream) {
*/
public static OutputAccessor from(ByteBuffer byteBuffer) {

ByteBufferOutputAccessor accessor = accessors.get();
ByteBufferOutputAccessor accessor = new ByteBufferOutputAccessor();
accessor.byteBuffer = byteBuffer;

return accessor;
}

/**
* Create an {@link OutputAccessor} for the given {@link ByteBuffer}. Instances are pooled within the thread scope.
*
* @param poolHolder
* @param byteBuffer
* @return
*/
public static OutputAccessor from(OutputAccessorPoolHolder poolHolder, ByteBuffer byteBuffer) {

ByteBufferOutputAccessor accessor = poolHolder.getByteBufferOutputAccessor();
accessor.byteBuffer = byteBuffer;

return accessor;
Expand All @@ -56,21 +57,24 @@ public static OutputAccessor from(ByteBuffer byteBuffer) {
/**
* Retrieve a pooled {@link OutputStream}.
*
* @param poolHolder
* @return
*/
public static OutputStream pooledStream() {
return streams.get();
public static OutputStream pooledStream(OutputAccessorPoolHolder poolHolder) {
return poolHolder.getByteBufferOutputStream();
}

/**
* Retrieved a pooled an {@link OutputStream} for the given {@link ByteBuffer}. Instances are pooled within the thread scope.
* Retrieved a pooled an {@link OutputStream} for the given {@link ByteBuffer}. Instances are pooled within the thread
* scope.
*
* @param poolHolder
* @param byteBuffer
* @return
*/
public static OutputStream asStream(ByteBuffer byteBuffer) {
public static OutputStream asStream(OutputAccessorPoolHolder poolHolder, ByteBuffer byteBuffer) {

ByteBufferOutputStream accessor = streams.get();
ByteBufferOutputStream accessor = poolHolder.getByteBufferOutputStream();
accessor.byteBuffer = byteBuffer;

return accessor;
Expand Down Expand Up @@ -152,4 +156,38 @@ public void write(int b) {
}
}
}

/**
* Holder for {@link ThreadLocal} pools.
*/
static class OutputAccessorPoolHolder {

private final ThreadLocal<ByteBufferOutputAccessor> accessorPool = new ThreadLocal<ByteBufferOutputAccessor>() {
@Override
protected ByteBufferOutputAccessor initialValue() {
return new ByteBufferOutputAccessor();
}
};

private final ThreadLocal<ByteBufferOutputStream> streamPool = new ThreadLocal<ByteBufferOutputStream>() {
@Override
protected ByteBufferOutputStream initialValue() {
return new ByteBufferOutputStream(null);
}
};

/**
* @return a pooled {@link ByteBufferOutputAccessor} instance.
*/
public ByteBufferOutputAccessor getByteBufferOutputAccessor() {
return accessorPool.get();
}

/**
* @return a pooled {@link ByteBufferOutputStream} instance.
*/
public ByteBufferOutputStream getByteBufferOutputStream() {
return streamPool.get();
}
}
}
141 changes: 141 additions & 0 deletions src/main/java/biz/paluch/logging/gelf/intern/PoolHolder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package biz.paluch.logging.gelf.intern;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* Holder for {@link ThreadLocal} pools.
*
* @author Mark Paluch
*/
abstract class PoolHolder {

public static PoolHolder noop() {
return NoOpPoolHolder.noop();
}

public static PoolHolder threadLocal() {
return new ThreadLocalPoolHolder();
}

/**
* @return the {@link OutputAccessor.OutputAccessorPoolHolder}.
*/
public abstract OutputAccessor.OutputAccessorPoolHolder getOutputAccessorPoolHolder();

/**
* @return a pooled {@link ReusableGzipOutputStream} instance.
*/
public abstract ReusableGzipOutputStream getReusableGzipOutputStream();

/**
* @return a pooled {@link ByteBuffer}-array instance.
*/
public abstract ByteBuffer[] getSingleBuffer();

/**
* @return a pooled byte-array instance.
*/
public abstract byte[] getByteArray();

private static class NoOpPoolHolder extends PoolHolder {

private final static NoOpPoolHolder instance = new NoOpPoolHolder();

public static PoolHolder noop() {
return instance;
}

@Override
public OutputAccessor.OutputAccessorPoolHolder getOutputAccessorPoolHolder() {
throw new UnsupportedOperationException();
}

@Override
public ReusableGzipOutputStream getReusableGzipOutputStream() {
throw new UnsupportedOperationException();
}

@Override
public ByteBuffer[] getSingleBuffer() {
throw new UnsupportedOperationException();
}

@Override
public byte[] getByteArray() {
throw new UnsupportedOperationException();
}
}

/**
* {@link PoolHolder} backed by {@link ThreadLocal}.
*/
private static class ThreadLocalPoolHolder extends PoolHolder {

private final OutputAccessor.OutputAccessorPoolHolder outputAccessorPoolHolder;

private final ThreadLocal<ReusableGzipOutputStream> streamPool = new ThreadLocal<ReusableGzipOutputStream>() {
@Override
protected ReusableGzipOutputStream initialValue() {
try {
return new ReusableGzipOutputStream(OutputAccessor.pooledStream(outputAccessorPoolHolder));
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
};

private final ThreadLocal<ByteBuffer[]> singleBufferPool = new ThreadLocal<ByteBuffer[]>() {
@Override
protected ByteBuffer[] initialValue() {
return new ByteBuffer[1];
}
};

private final ThreadLocal<byte[]> byteArrayPool = new ThreadLocal<byte[]>() {
@Override
protected byte[] initialValue() {
return new byte[8192 * 2];
}
};

/**
* Create a new {@link PoolHolder} instance.
*/
public ThreadLocalPoolHolder() {
this(new OutputAccessor.OutputAccessorPoolHolder());
}

private ThreadLocalPoolHolder(OutputAccessor.OutputAccessorPoolHolder outputAccessorPoolHolder) {
this.outputAccessorPoolHolder = outputAccessorPoolHolder;
}

/**
* @return the {@link OutputAccessor.OutputAccessorPoolHolder}.
*/
public OutputAccessor.OutputAccessorPoolHolder getOutputAccessorPoolHolder() {
return outputAccessorPoolHolder;
}

/**
* @return a pooled {@link ReusableGzipOutputStream} instance.
*/
public ReusableGzipOutputStream getReusableGzipOutputStream() {
return streamPool.get();
}

/**
* @return a pooled {@link ByteBuffer}-array instance.
*/
public ByteBuffer[] getSingleBuffer() {
return singleBufferPool.get();
}

/**
* @return a pooled byte-array instance.
*/
public byte[] getByteArray() {
return byteArrayPool.get();
}
}
}
Loading

0 comments on commit 499df59

Please sign in to comment.