Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix direct memory leak #421

Merged
merged 7 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.micrometer.prometheus.PrometheusRenameFilter;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.http.HttpMethod;
import io.prometheus.client.exporter.MetricsServlet;
import java.io.File;
Expand Down Expand Up @@ -78,6 +85,7 @@
import org.carapaceproxy.configstore.HerdDBConfigurationStore;
import org.carapaceproxy.configstore.PropertiesConfigurationStore;
import org.carapaceproxy.server.backends.BackendHealthManager;
import org.carapaceproxy.server.cache.CacheByteBufMemoryUsageMetric;
import org.carapaceproxy.server.cache.ContentsCache;
import org.carapaceproxy.server.certificates.DynamicCertificatesManager;
import org.carapaceproxy.server.certificates.ocsp.OcspStaplingManager;
Expand Down Expand Up @@ -178,6 +186,12 @@ public class HttpProxyServer implements AutoCloseable {
private List<RequestFilter> filters;
private volatile boolean started;

@Getter
private ByteBufAllocator cachePoolAllocator;
private CacheByteBufMemoryUsageMetric cacheByteBufMemoryUsageMetric;
@Getter
private EventLoopGroup eventLoopGroup;

/**
* Guards concurrent configuration changes
*/
Expand All @@ -194,7 +208,8 @@ public class HttpProxyServer implements AutoCloseable {
private int adminServerHttpsPort = -1;
private String adminServerCertFile;
private String adminServerCertFilePwd = "";

@Getter
private boolean usePooledByteBufAllocator;
@Getter
private String metricsUrl;
private String userRealmClassname;
Expand Down Expand Up @@ -241,6 +256,14 @@ public HttpProxyServer(EndpointMapper mapper, File basePath) throws Exception {
mapper.setParent(this);
this.proxyRequestsManager.reloadConfiguration(currentConfiguration, mapper.getBackends().values());
}

this.usePooledByteBufAllocator = Boolean.getBoolean("cache.allocator.usepooledbytebufallocator");
this.cachePoolAllocator = usePooledByteBufAllocator ?
new PooledByteBufAllocator(true): new UnpooledByteBufAllocator(true);
this.cacheByteBufMemoryUsageMetric = new CacheByteBufMemoryUsageMetric(this);
//Best practice is to reuse EventLoopGroup
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#25.0
this.eventLoopGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
}

public int getLocalPort() {
Expand Down Expand Up @@ -411,6 +434,7 @@ public void start() throws InterruptedException, ConfigurationNotValidException
dynamicCertificatesManager.attachGroupMembershipHandler(groupMembershipHandler);
dynamicCertificatesManager.start();
ocspStaplingManager.start();
cacheByteBufMemoryUsageMetric.start();
groupMembershipHandler.watchEvent("configurationChange", new ConfigurationChangeCallback());
} catch (RuntimeException err) {
close();
Expand All @@ -433,6 +457,7 @@ public void close() {
backendHealthManager.stop();
dynamicCertificatesManager.stop();
ocspStaplingManager.stop();
cacheByteBufMemoryUsageMetric.stop();

if (adminserver != null) {
try {
Expand Down Expand Up @@ -550,7 +575,6 @@ private void applyStaticConfiguration(ConfigurationStore properties) throws Numb
adminAccessLogPath = properties.getString("admin.accesslog.path", adminAccessLogPath);
adminAccessLogTimezone = properties.getString("admin.accesslog.format.timezone", adminAccessLogTimezone);
adminLogRetentionDays = properties.getInt("admin.accesslog.retention.days", adminLogRetentionDays);

userRealmClassname = properties.getClassname("userrealm.class", SimpleUserRealm.class.getName());

LOG.log(Level.INFO, "http.admin.enabled={0}", adminServerEnabled);
Expand All @@ -561,6 +585,7 @@ private void applyStaticConfiguration(ConfigurationStore properties) throws Numb
LOG.log(Level.INFO, "admin.advertised.host={0}", adminAdvertisedServerHost);
LOG.log(Level.INFO, "listener.offset.port={0}", listenersOffsetPort);
LOG.log(Level.INFO, "userrealm.class={0}", userRealmClassname);
LOG.log(Level.INFO, "cache.allocator.usepooledbytebufallocator={0}", this.usePooledByteBufAllocator);

String awsAccessKey = properties.getString("aws.accesskey", null);
LOG.log(Level.INFO, "aws.accesskey={0}", awsAccessKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private void bootListener(NetworkListenerConfiguration config) throws Interrupte
.metrics(true, Function.identity())
.option(ChannelOption.SO_BACKLOG, config.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, config.isKeepAlive())
.runOn(Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup())
.runOn(parent.getEventLoopGroup())
.childOption(Epoll.isAvailable()
? EpollChannelOption.TCP_KEEPIDLE
: NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPIDLE), config.getKeepAliveIdle())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.handler.codec.http.*;
import io.prometheus.client.Counter;
Expand Down Expand Up @@ -344,49 +342,47 @@ public Publisher<Void> forward(ProxyRequest request, boolean cache) {
CarapaceLogger.debug("Max connections for {0}: {1}", connectionConfig.getId(), connectionProvider.maxConnectionsPerHost());
}

HttpClient forwarder = forwardersPool.computeIfAbsent(key.getHostPort() + "_" + connectionConfig.getId(), hostname -> {
return HttpClient.create(connectionProvider)
.host(endpointHost)
.port(endpointPort)
.followRedirect(false) // client has to request the redirect, not the proxy
.runOn(Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup())
.compress(parent.getCurrentConfiguration().isRequestCompressionEnabled())
.responseTimeout(Duration.ofMillis(connectionConfig.getStuckRequestTimeout()))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionConfig.getConnectTimeout())
.option(ChannelOption.SO_KEEPALIVE, connectionConfig.isKeepAlive()) // Enables TCP keepalive: TCP starts sending keepalive probes when a connection is idle for some time.
.option(Epoll.isAvailable()
? EpollChannelOption.TCP_KEEPIDLE
: NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPIDLE), connectionConfig.getKeepaliveIdle())
.option(Epoll.isAvailable()
? EpollChannelOption.TCP_KEEPINTVL
: NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPINTERVAL), connectionConfig.getKeepaliveInterval())
.option(Epoll.isAvailable()
? EpollChannelOption.TCP_KEEPCNT
: NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPCOUNT), connectionConfig.getKeepaliveCount())
.httpResponseDecoder(option -> option.maxHeaderSize(parent.getCurrentConfiguration().getMaxHeaderSize()))
.doOnRequest((req, conn) -> {
if (CarapaceLogger.isLoggingDebugEnabled()) {
CarapaceLogger.debug("Start sending request for "
+ " Using client id " + key.getHostPort() + "_" + connectionConfig.getId()
+ " Uri " + req.resourceUrl()
+ " Timestamp " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss.SSS"))
+ " Backend " + endpointHost + ":" + endpointPort);
}
endpointStats.getTotalRequests().incrementAndGet();
endpointStats.getLastActivity().set(System.currentTimeMillis());
}).doAfterRequest((req, conn) -> {
if (CarapaceLogger.isLoggingDebugEnabled()) {
CarapaceLogger.debug("Finished sending request for "
+ " Using client id " + key.getHostPort() + "_" + connectionConfig.getId()
+ " Uri " + request.getUri()
+ " Timestamp " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss.SSS"))
+ " Backend " + endpointHost + ":" + endpointPort);
}
}).doAfterResponseSuccess((resp, conn) -> {
PENDING_REQUESTS_GAUGE.dec();
endpointStats.getLastActivity().set(System.currentTimeMillis());
});
});
HttpClient forwarder = forwardersPool.computeIfAbsent(key.getHostPort() + "_" + connectionConfig.getId(), hostname -> HttpClient.create(connectionProvider)
.host(endpointHost)
.port(endpointPort)
.followRedirect(false) // client has to request the redirect, not the proxy
.runOn(parent.getEventLoopGroup())
.compress(parent.getCurrentConfiguration().isRequestCompressionEnabled())
.responseTimeout(Duration.ofMillis(connectionConfig.getStuckRequestTimeout()))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionConfig.getConnectTimeout())
.option(ChannelOption.SO_KEEPALIVE, connectionConfig.isKeepAlive()) // Enables TCP keepalive: TCP starts sending keepalive probes when a connection is idle for some time.
.option(Epoll.isAvailable()
? EpollChannelOption.TCP_KEEPIDLE
: NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPIDLE), connectionConfig.getKeepaliveIdle())
.option(Epoll.isAvailable()
? EpollChannelOption.TCP_KEEPINTVL
: NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPINTERVAL), connectionConfig.getKeepaliveInterval())
.option(Epoll.isAvailable()
? EpollChannelOption.TCP_KEEPCNT
: NioChannelOption.of(ExtendedSocketOptions.TCP_KEEPCOUNT), connectionConfig.getKeepaliveCount())
.httpResponseDecoder(option -> option.maxHeaderSize(parent.getCurrentConfiguration().getMaxHeaderSize()))
.doOnRequest((req, conn) -> {
if (CarapaceLogger.isLoggingDebugEnabled()) {
CarapaceLogger.debug("Start sending request for "
+ " Using client id " + key.getHostPort() + "_" + connectionConfig.getId()
+ " Uri " + req.resourceUrl()
+ " Timestamp " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss.SSS"))
+ " Backend " + endpointHost + ":" + endpointPort);
}
endpointStats.getTotalRequests().incrementAndGet();
endpointStats.getLastActivity().set(System.currentTimeMillis());
}).doAfterRequest((req, conn) -> {
if (CarapaceLogger.isLoggingDebugEnabled()) {
CarapaceLogger.debug("Finished sending request for "
+ " Using client id " + key.getHostPort() + "_" + connectionConfig.getId()
+ " Uri " + request.getUri()
+ " Timestamp " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss.SSS"))
+ " Backend " + endpointHost + ":" + endpointPort);
}
}).doAfterResponseSuccess((resp, conn) -> {
PENDING_REQUESTS_GAUGE.dec();
endpointStats.getLastActivity().set(System.currentTimeMillis());
}));

AtomicBoolean cacheable = new AtomicBoolean(cache);
final ContentsCache.ContentReceiver cacheReceiver = cacheable.get() ? parent.getCache().createCacheReceiver(request) : null;
Expand Down Expand Up @@ -429,7 +425,7 @@ public Publisher<Void> forward(ProxyRequest request, boolean cache) {
request.setLastActivity(System.currentTimeMillis());
endpointStats.getLastActivity().set(System.currentTimeMillis());
if (cacheable.get()) {
cacheReceiver.receivedFromRemote(data);
cacheReceiver.receivedFromRemote(data, parent.getCachePoolAllocator());
}
}).doOnSuccess(data -> {
if (cacheable.get()) {
Expand All @@ -442,7 +438,7 @@ public Publisher<Void> forward(ProxyRequest request, boolean cache) {
request.setLastActivity(System.currentTimeMillis());
endpointStats.getLastActivity().set(System.currentTimeMillis());
if (cacheable.get()) {
cacheReceiver.receivedFromRemote(data);
cacheReceiver.receivedFromRemote(data, parent.getCachePoolAllocator());
}
}).doOnComplete(() -> {
if (CarapaceLogger.isLoggingDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.carapaceproxy.server.cache;

import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.prometheus.client.Gauge;
import org.carapaceproxy.core.HttpProxyServer;
import org.carapaceproxy.utils.PrometheusUtils;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
 * Periodically samples the direct memory used by the cache {@code ByteBufAllocator}
 * of the owning {@link HttpProxyServer} and publishes it on a Prometheus gauge.
 *
 * <p>One gauge is updated when the allocator is a {@link PooledByteBufAllocator},
 * another one when it is an {@link UnpooledByteBufAllocator}.
 *
 * <p>{@link #start()} and {@link #stop()} are synchronized on this instance and may be
 * called repeatedly; the sampling itself runs on a dedicated single-thread scheduler.
 */
public class CacheByteBufMemoryUsageMetric implements Runnable {

    public static final int DEFAULT_PERIOD = 5; // seconds
    private static final Logger LOG = Logger.getLogger(CacheByteBufMemoryUsageMetric.class.getName());
    private static final Gauge CACHE_POOLED_BYTEBUF_ALLOCATOR = PrometheusUtils.createGauge("cacheAllocator",
            "cache_pooled_allocator_direct_memory_usage",
            "Amount of direct memory usage by cache allocator").register();
    private static final Gauge CACHE_UNPOOLED_BYTEBUF_ALLOCATOR = PrometheusUtils.createGauge("cacheAllocator",
            "cache_unpooled_allocator_direct_memory_usage",
            "Amount of direct memory usage by cache allocator").register();

    // Both guarded by the monitor of this instance (see start()/stop()).
    private ScheduledExecutorService timer;
    private ScheduledFuture<?> scheduledFuture;
    private final HttpProxyServer parent;

    // can change at runtime; a new value is picked up by the next start()
    private volatile int period;
    private volatile boolean started; // keep track of start() calling

    /**
     * Creates a metric collector using {@link #DEFAULT_PERIOD} as sampling period.
     *
     * @param parent server whose cache allocator is sampled
     */
    public CacheByteBufMemoryUsageMetric(HttpProxyServer parent) {
        this.period = DEFAULT_PERIOD;
        this.parent = parent;
    }

    /**
     * @return the sampling period, in seconds
     */
    public int getPeriod() {
        return period;
    }

    /**
     * Sets the sampling period. A value {@code <= 0} makes the next {@link #start()}
     * a no-op; an already running schedule is not affected until it is restarted.
     *
     * @param period the sampling period, in seconds
     */
    public void setPeriod(int period) {
        this.period = period;
    }

    /**
     * Starts the periodic sampling. Does nothing (apart from recording the call)
     * when the configured period is not positive.
     *
     * <p>If a schedule is already active it is cancelled and replaced, so calling
     * this method twice never leaves two sampling tasks running.
     */
    public synchronized void start() {
        started = true;
        if (period <= 0) {
            return;
        }
        if (scheduledFuture != null) {
            // Avoid piling up a second periodic task on the same executor.
            scheduledFuture.cancel(false);
            scheduledFuture = null;
        }
        if (timer == null) {
            timer = Executors.newSingleThreadScheduledExecutor();
        }
        LOG.info("Starting cache ByteBufAllocator usage, period: " + period + " seconds");
        scheduledFuture = timer.scheduleAtFixedRate(this, period, period, TimeUnit.SECONDS);
    }

    /**
     * Stops the periodic sampling and waits up to 10 seconds for the scheduler to
     * terminate. If the wait is interrupted the interrupt flag is restored.
     */
    public synchronized void stop() {
        started = false;
        if (timer == null) {
            return;
        }
        final ScheduledExecutorService stopping = timer;
        stopping.shutdown();
        try {
            stopping.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException err) {
            Thread.currentThread().interrupt();
        } finally {
            // Always forget the executor: it has been shut down and would reject
            // any task submitted by a later start().
            timer = null;
            scheduledFuture = null;
        }
    }

    /**
     * Reads the direct memory currently used by the cache allocator and stores it
     * in the gauge matching the allocator implementation.
     */
    @Override
    public void run() {
        // Read the allocator once so both the type check and the cast see the same object.
        final Object allocator = parent.getCachePoolAllocator();
        if (allocator instanceof PooledByteBufAllocator) {
            CACHE_POOLED_BYTEBUF_ALLOCATOR.set(((PooledByteBufAllocator) allocator).metric().usedDirectMemory());
        } else if (allocator instanceof UnpooledByteBufAllocator) {
            CACHE_UNPOOLED_BYTEBUF_ALLOCATOR.set(((UnpooledByteBufAllocator) allocator).metric().usedDirectMemory());
        }
        // Any other allocator type is ignored: throwing here would silently cancel
        // every subsequent execution of this periodic task.
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.RemovalCause;
import static com.github.benmanes.caffeine.cache.RemovalCause.COLLECTED;
import static com.github.benmanes.caffeine.cache.RemovalCause.EXPIRED;
import static com.github.benmanes.caffeine.cache.RemovalCause.EXPLICIT;
import static com.github.benmanes.caffeine.cache.RemovalCause.REPLACED;
import static com.github.benmanes.caffeine.cache.RemovalCause.SIZE;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
Expand Down Expand Up @@ -299,19 +302,23 @@ public static class CachedContent {
long directSize;
int hits;

private void addChunk(ByteBuf chunk) {
chunks.add(chunk.retainedDuplicate());
if (chunk.isDirect()) {
directSize += chunk.capacity();
private synchronized void addChunk(ByteBuf chunk, ByteBufAllocator allocator) {
ByteBuf originalChunk = chunk.retainedDuplicate();
ByteBuf directBuffer = allocator.directBuffer(originalChunk.readableBytes());
directBuffer.writeBytes(originalChunk);
chunks.add(directBuffer);
if (directBuffer.isDirect()) {
directSize += directBuffer.capacity();
} else {
heapSize += chunk.capacity();
heapSize += directBuffer.capacity();
}
originalChunk.release();
}

void clear() {
synchronized void clear() {
chunks.forEach(ByteBuf::release);
hamadodene marked this conversation as resolved.
Show resolved Hide resolved
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "ConcentsCache refCnt after release");
LOG.log(Level.FINE, "ContentsCache refCnt after release");
chunks.forEach(buff -> LOG.log(Level.FINE, "refCnt: {0}", buff.refCnt()));
}
chunks.clear();
Expand Down Expand Up @@ -523,13 +530,13 @@ public boolean receivedFromRemote(HttpClientResponse response) {
return true;
}

public void receivedFromRemote(ByteBuf chunk) {
public void receivedFromRemote(ByteBuf chunk, ByteBufAllocator allocator) {
if (notReallyCacheable) {
LOG.log(Level.FINEST, "{0} rejecting non-cacheable response", key);
abort();
return;
}
content.addChunk(chunk);
content.addChunk(chunk, allocator);
}
}

Expand Down