Skip to content

Commit

Permalink
Added cache trim interval scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
hamadodene committed Apr 28, 2023
1 parent 51975db commit 4baae49
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.carapaceproxy.configstore.HerdDBConfigurationStore;
import org.carapaceproxy.configstore.PropertiesConfigurationStore;
import org.carapaceproxy.server.backends.BackendHealthManager;
import org.carapaceproxy.server.cache.CachePoolTrimmer;
import org.carapaceproxy.server.cache.CachePooledByteBufDirectUsage;
import org.carapaceproxy.server.cache.ContentsCache;
import org.carapaceproxy.server.certificates.DynamicCertificatesManager;
Expand Down Expand Up @@ -181,8 +182,9 @@ public class HttpProxyServer implements AutoCloseable {
private volatile boolean started;

@Getter
private PooledByteBufAllocator cacheAllocator;
private ThreadLocal<PooledByteBufAllocator> cacheThreadLocalAllocator;
private CachePooledByteBufDirectUsage cachePooledByteBufDirectUsage;
private CachePoolTrimmer cachePoolTrimmer;

/**
* Guards concurrent configuration changes
Expand Down Expand Up @@ -247,8 +249,10 @@ public HttpProxyServer(EndpointMapper mapper, File basePath) throws Exception {
mapper.setParent(this);
this.proxyRequestsManager.reloadConfiguration(currentConfiguration, mapper.getBackends().values());
}
this.cacheAllocator = new PooledByteBufAllocator(true);

this.cacheThreadLocalAllocator = ThreadLocal.withInitial(() -> new PooledByteBufAllocator(true));
this.cachePooledByteBufDirectUsage = new CachePooledByteBufDirectUsage(this);
this.cachePoolTrimmer = new CachePoolTrimmer(this);
}

public int getLocalPort() {
Expand Down Expand Up @@ -420,6 +424,7 @@ public void start() throws InterruptedException, ConfigurationNotValidException
dynamicCertificatesManager.start();
ocspStaplingManager.start();
cachePooledByteBufDirectUsage.start();
cachePoolTrimmer.start();
groupMembershipHandler.watchEvent("configurationChange", new ConfigurationChangeCallback());
} catch (RuntimeException err) {
close();
Expand All @@ -443,6 +448,7 @@ public void close() {
dynamicCertificatesManager.stop();
ocspStaplingManager.stop();
cachePooledByteBufDirectUsage.stop();
cachePoolTrimmer.stop();

if (adminserver != null) {
try {
Expand Down Expand Up @@ -655,6 +661,14 @@ public void updateDynamicCertificateForDomain(CertificateData cert) throws Excep
applyDynamicConfigurationFromAPI(new PropertiesConfigurationStore(props));
}

public boolean trimCachePool() {
PooledByteBufAllocator allocator = cacheThreadLocalAllocator.get();
if (allocator != null) {
return allocator.trimCurrentThreadCache();
}
return false;
}

public void UpdateMaintenanceMode(boolean value) throws ConfigurationChangeInProgressException, InterruptedException {
Properties props = dynamicConfigurationStore.asProperties(null);
props.setProperty("carapace.maintenancemode.enabled", String.valueOf(value));
Expand Down Expand Up @@ -733,6 +747,7 @@ private void applyDynamicConfiguration(ConfigurationStore newConfigurationStore,
this.listeners.reloadConfiguration(newConfiguration);
this.cache.reloadConfiguration(newConfiguration);
this.requestsLogger.reloadConfiguration(newConfiguration);
this.cachePoolTrimmer.reloadConfiguration(newConfiguration);
this.realm = newRealm;
Map<String, BackendConfiguration> currentBackends = mapper != null ? mapper.getBackends() : Collections.emptyMap();
Map<String, BackendConfiguration> newBackends = newMapper.getBackends();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ public Publisher<Void> forward(ProxyRequest request, boolean cache) {
request.setLastActivity(System.currentTimeMillis());
endpointStats.getLastActivity().set(System.currentTimeMillis());
if (cacheable.get()) {
cacheReceiver.receivedFromRemote(data, parent.getCacheAllocator());
cacheReceiver.receivedFromRemote(data, parent.getCacheThreadLocalAllocator().get());
}
}).doOnSuccess(data -> {
if (cacheable.get()) {
Expand All @@ -442,7 +442,7 @@ public Publisher<Void> forward(ProxyRequest request, boolean cache) {
request.setLastActivity(System.currentTimeMillis());
endpointStats.getLastActivity().set(System.currentTimeMillis());
if (cacheable.get()) {
cacheReceiver.receivedFromRemote(data, parent.getCacheAllocator());
cacheReceiver.receivedFromRemote(data, parent.getCacheThreadLocalAllocator().get());
}
}).doOnComplete(() -> {
if (CarapaceLogger.isLoggingDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class RuntimeServerConfiguration {
private int accessLogAdvancedBodySize = 1_000; // bytes
private String userRealmClassname;
private int healthProbePeriod = 0;
private int cachePoolTrimInterval = 3600;
private int dynamicCertificatesManagerPeriod = 0;
private int keyPairsSize = DEFAULT_KEYPAIRS_SIZE;
private Set<String> domainsCheckerIPAddresses;
Expand Down Expand Up @@ -202,6 +203,12 @@ public void configure(ConfigurationStore properties) throws ConfigurationNotVali
LOG.warning("BACKEND-HEALTH-MANAGER DISABLED");
}

cachePoolTrimInterval = properties.getInt("cache.pool.triminterval", 3600);
LOG.log(Level.INFO, "cache.pool.triminterval={0}", cachePoolTrimInterval);
if (cachePoolTrimInterval <= 0) {
LOG.warning("CACHE TRIM DISABLED");
}

dynamicCertificatesManagerPeriod = properties.getInt("dynamiccertificatesmanager.period", 0);
LOG.log(Level.INFO, "dynamiccertificatesmanager.period={0}", dynamicCertificatesManagerPeriod);
keyPairsSize = properties.getInt("dynamiccertificatesmanager.keypairssize", DEFAULT_KEYPAIRS_SIZE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package org.carapaceproxy.server.cache;

import org.carapaceproxy.core.HttpProxyServer;
import org.carapaceproxy.core.RuntimeServerConfiguration;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CachePoolTrimmer implements Runnable {

public static final int DEFAULT_CACHE_POOL_TRIM_INTERVAL = 3600; // seconds
private static final Logger LOG = Logger.getLogger(CachePoolTrimmer.class.getName());

private ScheduledExecutorService timer;
private ScheduledFuture<?> scheduledFuture;
private HttpProxyServer parent;

// can change at runtime
private volatile int period;
private volatile boolean started; // keep track of start() calling

public CachePoolTrimmer(HttpProxyServer parent) {
this.period = DEFAULT_CACHE_POOL_TRIM_INTERVAL;
this.parent = parent;
}

public int getPeriod() {
return period;
}

public void setPeriod(int period) {
this.period = period;
}

public synchronized void start() {
started = true;
if (period <= 0) {
return;
}
if (timer == null) {
timer = Executors.newSingleThreadScheduledExecutor();
}
LOG.info("Starting cache trim scheduler, period: " + period + " seconds");
scheduledFuture = timer.scheduleAtFixedRate(this, period, period, TimeUnit.SECONDS);
}

public synchronized void stop() {
started = false;
if (timer != null) {
timer.shutdown();
try {
timer.awaitTermination(10, TimeUnit.SECONDS);
timer = null;
scheduledFuture = null;
} catch (InterruptedException err) {
Thread.currentThread().interrupt();
}
}
}


public synchronized void reloadConfiguration(RuntimeServerConfiguration newConfiguration) {
int newPeriod = newConfiguration.getCachePoolTrimInterval();
boolean changePeriod = period != newPeriod;
boolean restart = scheduledFuture != null && changePeriod;

if (restart) {
scheduledFuture.cancel(true);
}

if (changePeriod) {
period = newPeriod;
LOG.info("Applying new cache trim interval " + period + " s");
}

if (restart || started) {
start();
}
}

@Override
public void run() {
if (parent.trimCachePool()) {
LOG.log(Level.INFO, "Cache PooledByteBufAllocator: trim successful");
} else {
LOG.log(Level.INFO, "Cache PooledByteBufAllocator: No memory released by cache trim");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CachePooledByteBufDirectUsage implements Runnable {
Expand Down Expand Up @@ -75,8 +74,8 @@ public synchronized void stop() {
@Override
public void run() {
if(CarapaceLogger.isLoggingDebugEnabled()) {
CarapaceLogger.debug("cache allocator status: " + parent.getCacheAllocator().metric().toString());
CarapaceLogger.debug("cache allocator status: " + parent.getCacheThreadLocalAllocator().get().metric().toString());
}
CACHE_ALLOCATOR_DIRECT_MEMORY_USAGE.set(parent.getCacheAllocator().metric().usedDirectMemory());
CACHE_ALLOCATOR_DIRECT_MEMORY_USAGE.set(parent.getCacheThreadLocalAllocator().get().metric().usedDirectMemory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public static class CachedContent {
long directSize;
int hits;

private synchronized void addChunk(ByteBuf chunk, ByteBufAllocator allocator) {
private synchronized void addChunk(ByteBuf chunk, PooledByteBufAllocator allocator) {
ByteBuf originalChunk = chunk.retainedDuplicate();
ByteBuf directBuffer = allocator.directBuffer(originalChunk.readableBytes());
directBuffer.writeBytes(originalChunk);
Expand Down Expand Up @@ -531,7 +531,7 @@ public boolean receivedFromRemote(HttpClientResponse response) {
return true;
}

public void receivedFromRemote(ByteBuf chunk, ByteBufAllocator allocator) {
public void receivedFromRemote(ByteBuf chunk, PooledByteBufAllocator allocator) {
if (notReallyCacheable) {
LOG.log(Level.FINEST, "{0} rejecting non-cacheable response", key);
abort();
Expand Down

0 comments on commit 4baae49

Please sign in to comment.