Skip to content

Commit

Permalink
feat: invert HttpProxyServer dependency for mapper
Browse files Browse the repository at this point in the history
� Conflicts:
�	carapace-server/src/test/java/org/carapaceproxy/listeners/ListenerConfigurationTest.java
  • Loading branch information
NiccoMlt committed Nov 29, 2024
1 parent 7e348bb commit 4016086
Show file tree
Hide file tree
Showing 19 changed files with 249 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,10 @@ public List<String> getBackends() {

@GET
public List<DirectorBean> getAll() {
final List<DirectorBean> directors = new ArrayList();
final List<DirectorBean> directors = new ArrayList<>();
HttpProxyServer server = (HttpProxyServer) context.getAttribute("server");
server.getMapper().getDirectors().forEach(director -> {
directors.add(new DirectorBean(director.getId(), director.getBackends()));
});
server.getMapper().getDirectors()
.forEach((directorId, director) -> directors.add(new DirectorBean(directorId, director.getBackends())));

return directors;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public class HttpProxyServer implements AutoCloseable {
*/
private final ReentrantLock configurationLock = new ReentrantLock();

private Server adminserver;
private Server adminServer;
private String adminAccessLogPath = "admin.access.log";
private String adminAccessLogTimezone = "GMT";
private int adminLogRetentionDays = 90;
Expand All @@ -226,48 +226,52 @@ public class HttpProxyServer implements AutoCloseable {
@Getter
private int listenersOffsetPort = 0;

public static HttpProxyServer buildForTests(String host, int port, EndpointMapper mapper, File baseDir) throws ConfigurationNotValidException {
HttpProxyServer res = new HttpProxyServer(mapper, baseDir.getAbsoluteFile());
res.currentConfiguration.addListener(new NetworkListenerConfiguration(host, port));
res.proxyRequestsManager.reloadConfiguration(res.currentConfiguration, mapper.getBackends().values());

return res;
}

public HttpProxyServer(EndpointMapper mapper, File basePath) {
@VisibleForTesting
public static HttpProxyServer buildForTests(
final String host,
final int port,
final EndpointMapper.Factory mapperFactory,
final File baseDir
) throws ConfigurationNotValidException {
final HttpProxyServer server = new HttpProxyServer(mapperFactory, baseDir.getAbsoluteFile());
final EndpointMapper mapper = server.getMapper();
server.currentConfiguration.addListener(new NetworkListenerConfiguration(host, port));
server.proxyRequestsManager.reloadConfiguration(server.currentConfiguration, mapper.getBackends().values());
return server;
}

public HttpProxyServer(final EndpointMapper.Factory mapperFactory, File basePath) throws ConfigurationNotValidException {
// metrics
statsProvider = new PrometheusMetricsProvider();
mainLogger = statsProvider.getStatsLogger("");
prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
prometheusRegistry.config().meterFilter(new PrometheusRenameFilter());
Metrics.globalRegistry.add(prometheusRegistry);
this.statsProvider = new PrometheusMetricsProvider();
this.mainLogger = this.statsProvider.getStatsLogger("");
this.prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
this.prometheusRegistry.config().meterFilter(new PrometheusRenameFilter());
Metrics.globalRegistry.add(this.prometheusRegistry);
Metrics.globalRegistry.config()
.meterFilter(MeterFilter.denyNameStartsWith(("reactor.netty.http.server.data"))) // spam
.meterFilter(MeterFilter.denyNameStartsWith(("reactor.netty.http.server.response"))) // spam
.meterFilter(MeterFilter.denyNameStartsWith(("reactor.netty.http.server.errors"))); // spam

this.mapper = mapper;
this.basePath = basePath;
this.filters = new ArrayList<>();
this.currentConfiguration = new RuntimeServerConfiguration();
this.backendHealthManager = new BackendHealthManager(currentConfiguration, mapper);
this.listeners = new Listeners(this);
this.cache = new ContentsCache(currentConfiguration);
this.requestsLogger = new RequestsLogger(currentConfiguration);
this.dynamicCertificatesManager = new DynamicCertificatesManager(this);
this.trustStoreManager = new TrustStoreManager(currentConfiguration, this);
this.ocspStaplingManager = new OcspStaplingManager(trustStoreManager);
this.proxyRequestsManager = new ProxyRequestsManager(this);
if (mapper != null) {
mapper.setParent(this);
this.proxyRequestsManager.reloadConfiguration(currentConfiguration, mapper.getBackends().values());
}
this.mapper = mapperFactory.build(this);
this.backendHealthManager = new BackendHealthManager(currentConfiguration, this.mapper);
this.proxyRequestsManager.reloadConfiguration(currentConfiguration, this.mapper.getBackends().values());

this.usePooledByteBufAllocator = Boolean.getBoolean("cache.allocator.usepooledbytebufallocator");
this.cachePoolAllocator = usePooledByteBufAllocator
? new PooledByteBufAllocator(true) : new UnpooledByteBufAllocator(true);
? new PooledByteBufAllocator(true)
: new UnpooledByteBufAllocator(true);
this.cacheByteBufMemoryUsageMetric = new CacheByteBufMemoryUsageMetric(this);
//Best practice is to reuse EventLoopGroup
// Best practice is to reuse EventLoopGroup
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#25.0
this.eventLoopGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
}
Expand All @@ -283,13 +287,6 @@ public int getLocalPort() {
return listeners.getLocalPort();
}

@VisibleForTesting
public void setMapper(EndpointMapper mapper) {
Objects.requireNonNull(mapper);
mapper.setParent(this);
this.mapper = mapper;
}

public void startAdminInterface() throws Exception {
if (!adminServerEnabled) {
return;
Expand All @@ -299,17 +296,17 @@ public void startAdminInterface() throws Exception {
throw new RuntimeException("To enable admin interface at least one between http and https port must be set");
}

adminserver = new Server();
adminServer = new Server();

ServerConnector httpConnector = null;
if (adminServerHttpPort >= 0) {
LOG.info("Starting Admin UI over HTTP");

httpConnector = new ServerConnector(adminserver);
httpConnector = new ServerConnector(adminServer);
httpConnector.setPort(adminServerHttpPort);
httpConnector.setHost(adminServerHost);

adminserver.addConnector(httpConnector);
adminServer.addConnector(httpConnector);
}

ServerConnector httpsConnector = null;
Expand All @@ -333,17 +330,17 @@ public void startAdminInterface() throws Exception {
https.setSecurePort(adminServerHttpsPort);
https.addCustomizer(new SecureRequestCustomizer());

httpsConnector = new ServerConnector(adminserver,
httpsConnector = new ServerConnector(adminServer,
new SslConnectionFactory(sslContextFactory, "http/1.1"),
new HttpConnectionFactory(https));
httpsConnector.setPort(adminServerHttpsPort);
httpsConnector.setHost(adminServerHost);

adminserver.addConnector(httpsConnector);
adminServer.addConnector(httpsConnector);
}

ContextHandlerCollection contexts = new ContextHandlerCollection();
adminserver.setHandler(constrainTraceMethod(contexts));
adminServer.setHandler(constrainTraceMethod(contexts));

File webUi = new File(basePath, "web/ui");
if (webUi.isDirectory()) {
Expand Down Expand Up @@ -379,7 +376,7 @@ public void startAdminInterface() throws Exception {

contexts.addHandler(requestLogHandler);

adminserver.start();
adminServer.start();

LOG.info("Admin UI started");

Expand Down Expand Up @@ -472,13 +469,13 @@ public void close() {
ocspStaplingManager.stop();
cacheByteBufMemoryUsageMetric.stop();

if (adminserver != null) {
if (adminServer != null) {
try {
adminserver.stop();
adminServer.stop();
} catch (Exception err) {
LOG.error("Error while stopping admin server", err);
} finally {
adminserver = null;
adminServer = null;
}
}
statsProvider.stop();
Expand All @@ -496,21 +493,20 @@ public void close() {
staticContentsManager.close();

if (dynamicConfigurationStore != null) {
// this will also shutdown embedded database
// this will also shut down embedded database
dynamicConfigurationStore.close();
}
}

private static EndpointMapper buildMapper(String className, ConfigurationStore properties) throws ConfigurationNotValidException {
private static EndpointMapper buildMapper(final String className, final HttpProxyServer parent, final ConfigurationStore properties) throws ConfigurationNotValidException {
try {
EndpointMapper res = (EndpointMapper) Class.forName(className).getConstructor().newInstance();
final Class<? extends EndpointMapper> mapperClass = Class.forName(className).asSubclass(EndpointMapper.class);
final EndpointMapper res = mapperClass.getConstructor(HttpProxyServer.class).newInstance(parent);
res.configure(properties);
return res;
} catch (ClassNotFoundException err) {
} catch (final ClassNotFoundException | ClassCastException | NoSuchMethodException err) {
throw new ConfigurationNotValidException(err);
} catch (IllegalAccessException | IllegalArgumentException
| InstantiationException | NoSuchMethodException
| SecurityException | InvocationTargetException err) {
} catch (final IllegalAccessException | IllegalArgumentException | InstantiationException | SecurityException | InvocationTargetException err) {
throw new RuntimeException(err);
}
}
Expand Down Expand Up @@ -654,7 +650,7 @@ public RuntimeServerConfiguration buildValidConfiguration(ConfigurationStore sim

// Try to perform a service configuration from the passed store.
newConfiguration.configure(simpleStore);
buildMapper(newConfiguration.getMapperClassname(), simpleStore);
buildMapper(newConfiguration.getMapperClassname(), this, simpleStore);
buildRealm(userRealmClassname, simpleStore);

return newConfiguration;
Expand Down Expand Up @@ -747,8 +743,7 @@ private void applyDynamicConfiguration(ConfigurationStore newConfigurationStore,
}
try {
RuntimeServerConfiguration newConfiguration = buildValidConfiguration(storeWithConfig);
EndpointMapper newMapper = buildMapper(newConfiguration.getMapperClassname(), storeWithConfig);
newMapper.setParent(this);
EndpointMapper newMapper = buildMapper(newConfiguration.getMapperClassname(), this, storeWithConfig);
UserRealm newRealm = buildRealm(userRealmClassname, storeWithConfig);

this.filters = buildFilters(newConfiguration);
Expand Down Expand Up @@ -876,7 +871,8 @@ public Map<EndpointKey, Map<String, ConnectionPoolStats>> getConnectionPoolsStat
if (!metric.getName().startsWith(CONNECTION_PROVIDER_PREFIX)) {
return;
}
EndpointKey key = EndpointKey.make(metric.getTag(REMOTE_ADDRESS));
final String remoteAddress = Objects.requireNonNull(metric.getTag(REMOTE_ADDRESS));
EndpointKey key = EndpointKey.make(remoteAddress);
Map<String, ConnectionPoolStats> pools = res.computeIfAbsent(key, k -> new HashMap<>());
String poolName = metric.getTag(NAME);
ConnectionPoolStats stats = pools.computeIfAbsent(poolName, k -> new ConnectionPoolStats());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.carapaceproxy.configstore.PropertiesConfigurationStore;
import org.carapaceproxy.core.HttpProxyServer;
import org.carapaceproxy.server.config.ConfigurationNotValidException;
import org.carapaceproxy.server.mapper.StandardEndpointMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -174,8 +175,7 @@ public void join() {
*/
public void start() throws Exception {
pidFileLocker.lock();

server = new HttpProxyServer(null, basePath);
server = new HttpProxyServer(StandardEndpointMapper::new, basePath);
server.configureAtBoot(configuration);
server.start();
server.startMetrics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public BackendHealthManager(final RuntimeServerConfiguration conf, final Endpoin
this.period = DEFAULT_PERIOD;
}

public boolean isTolerant() {
return tolerant;
}

public int getPeriod() {
return period;
}
Expand Down Expand Up @@ -217,7 +221,7 @@ public boolean exceedsCapacity(final String backendId) {
return false;
}
final BackendHealthStatus backendStatus = getBackendStatus(backendConfiguration.hostPort());
return backendConfiguration.safeCapacity() > backendStatus.getConnections() && !this.tolerant;
return backendConfiguration.safeCapacity() > backendStatus.getConnections();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.carapaceproxy.server.config;

import java.util.List;
import java.util.function.Function;
import org.carapaceproxy.server.mapper.EndpointMapper;

/**
Expand All @@ -32,13 +31,10 @@ public interface BackendSelector {

/**
* Functional interface that models a factory for selectors given the mapper they should be applied to.
*
* @see SafeBackendSelector#forMapper(EndpointMapper)
*/
@FunctionalInterface
interface SelectorFactory extends Function<EndpointMapper, BackendSelector> {
interface SelectorFactory {

@Override
BackendSelector apply(EndpointMapper endpointMapper);
BackendSelector build(EndpointMapper endpointMapper);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,29 @@
import java.util.List;
import java.util.Map;
import java.util.SequencedCollection;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.carapaceproxy.server.backends.BackendHealthManager;
import org.carapaceproxy.server.backends.BackendHealthStatus;
import org.carapaceproxy.server.mapper.EndpointMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SafeBackendSelector implements BackendSelector {
private static final Logger LOGGER = LoggerFactory.getLogger(SafeBackendSelector.class);
private final SequencedCollection<String> backends;
private final Map<String, DirectorConfiguration> directors;
private final BackendHealthManager healthManager;
private final EndpointMapper mapper;

public static BackendSelector forMapper(final EndpointMapper mapper) {
final var directors = mapper.getDirectors()
.stream()
.collect(Collectors.toUnmodifiableMap(DirectorConfiguration::getId, Function.identity()));
return new SafeBackendSelector(mapper.getBackends().sequencedKeySet(), directors, mapper.getBackendHealthManager());
}

public SafeBackendSelector(final SequencedCollection<String> allBackendIds, final Map<String, DirectorConfiguration> directors, final BackendHealthManager healthManager) {
this.backends = allBackendIds;
this.directors = directors;
this.healthManager = healthManager;
public SafeBackendSelector(final EndpointMapper mapper) {
this.mapper = mapper;
}

@Override
public List<String> selectBackends(final String userId, final String sessionId, final String director) {
final Map<String, DirectorConfiguration> directors = mapper.getDirectors();
if (!directors.containsKey(director)) {
LOGGER.error("Director \"{}\" not configured, while handling request userId={} sessionId={}", director, userId, sessionId);
return List.of();
}
final DirectorConfiguration directorConfig = directors.get(director);
if (directorConfig.getBackends().contains(ALL_BACKENDS)) {
return sortByConnections(backends);
return sortByConnections(mapper.getBackends().sequencedKeySet());
}
return sortByConnections(directorConfig.getBackends());
}
Expand All @@ -50,10 +37,10 @@ public List<String> sortByConnections(final SequencedCollection<String> backendI
}

private long connections(final String backendId) {
final BackendHealthStatus backendStatus = healthManager.getBackendStatus(backendId);
final BackendHealthStatus backendStatus = mapper.getBackendHealthManager().getBackendStatus(backendId);
return switch (backendStatus.getStatus()) {
case DOWN -> Long.MAX_VALUE; // backends that are down are put last, but not dropped
case COLD -> healthManager.exceedsCapacity(backendId)
case COLD -> mapper.getBackendHealthManager().exceedsCapacity(backendId)
? Long.MAX_VALUE - 1 // cold backends that exceed safe capacity are put last, just before down ones
: backendStatus.getConnections();
case STABLE -> backendStatus.getConnections();
Expand Down
Loading

0 comments on commit 4016086

Please sign in to comment.