Skip to content

Commit

Permalink
feat: introduce SafeBackendSelector
Browse files Browse the repository at this point in the history
  • Loading branch information
NiccoMlt committed Nov 29, 2024
1 parent 48b5bf3 commit 7e348bb
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,11 @@ public BackendHealthStatus getBackendStatus(final String backendId) {
public boolean exceedsCapacity(final String backendId) {
final BackendConfiguration backendConfiguration = this.mapper.getBackends().get(backendId);
Objects.requireNonNull(backendConfiguration);
if (backendConfiguration.coldCapacity() <= 0) {
if (backendConfiguration.safeCapacity() <= 0) {
return false;
}
final BackendHealthStatus backendStatus = getBackendStatus(backendConfiguration.hostPort());
return backendConfiguration.coldCapacity() > backendStatus.getConnections() && !this.tolerant;
return backendConfiguration.safeCapacity() > backendStatus.getConnections() && !this.tolerant;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,29 @@
package org.carapaceproxy.server.config;

import org.carapaceproxy.core.EndpointKey;
import org.carapaceproxy.server.backends.BackendHealthStatus.Status;

/**
* Configuration of a single backend server
* Configuration of a single backend server.
*
* @param id an arbitrary ID of the backend
* @param hostPort the host:port tuple for the backend
* @param probePath a path to use to probe the backend for reachability
* @param safeCapacity a capacity that is considered safe even when {@link Status#COLD cold}
*/
public record BackendConfiguration(String id, EndpointKey hostPort, String probePath, int coldCapacity) {

public BackendConfiguration(final String id, final String host, final int port, final String probePath, final int coldCapacity) {
this(id, new EndpointKey(host, port), probePath, coldCapacity);
public record BackendConfiguration(String id, EndpointKey hostPort, String probePath, int safeCapacity) {

/**
* Configuration of a single backend server.
*
* @param id an arbitrary ID of the backend
* @param host the host name
* @param port the port to use
* @param probePath a path to use to probe the backend for reachability
* @param safeCapacity a capacity that is considered safe even when {@link Status#COLD cold}, or 0 for an infinite capacity
*/
public BackendConfiguration(final String id, final String host, final int port, final String probePath, final int safeCapacity) {
this(id, new EndpointKey(host, port), probePath, safeCapacity);
}

public String host() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,25 @@
package org.carapaceproxy.server.config;

import java.util.List;
import java.util.function.Function;
import org.carapaceproxy.server.mapper.EndpointMapper;

/**
* Chooses the backend which can serve the request
*/
public interface BackendSelector {

List<String> selectBackends(String userId, String sessionId, String director);

/**
* Functional interface that models a factory for selectors given the mapper they should be applied to.
*
* @see SafeBackendSelector#forMapper(EndpointMapper)
*/
@FunctionalInterface
interface SelectorFactory extends Function<EndpointMapper, BackendSelector> {

@Override
BackendSelector apply(EndpointMapper endpointMapper);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.carapaceproxy.server.config;

import static org.carapaceproxy.server.config.DirectorConfiguration.ALL_BACKENDS;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SequencedCollection;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.carapaceproxy.server.backends.BackendHealthManager;
import org.carapaceproxy.server.backends.BackendHealthStatus;
import org.carapaceproxy.server.mapper.EndpointMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SafeBackendSelector implements BackendSelector {
private static final Logger LOGGER = LoggerFactory.getLogger(SafeBackendSelector.class);
private final SequencedCollection<String> backends;
private final Map<String, DirectorConfiguration> directors;
private final BackendHealthManager healthManager;

public static BackendSelector forMapper(final EndpointMapper mapper) {
final var directors = mapper.getDirectors()
.stream()
.collect(Collectors.toUnmodifiableMap(DirectorConfiguration::getId, Function.identity()));
return new SafeBackendSelector(mapper.getBackends().sequencedKeySet(), directors, mapper.getBackendHealthManager());
}

public SafeBackendSelector(final SequencedCollection<String> allBackendIds, final Map<String, DirectorConfiguration> directors, final BackendHealthManager healthManager) {
this.backends = allBackendIds;
this.directors = directors;
this.healthManager = healthManager;
}

@Override
public List<String> selectBackends(final String userId, final String sessionId, final String director) {
if (!directors.containsKey(director)) {
LOGGER.error("Director \"{}\" not configured, while handling request userId={} sessionId={}", director, userId, sessionId);
return List.of();
}
final DirectorConfiguration directorConfig = directors.get(director);
if (directorConfig.getBackends().contains(ALL_BACKENDS)) {
return sortByConnections(backends);
}
return sortByConnections(directorConfig.getBackends());
}

public List<String> sortByConnections(final SequencedCollection<String> backendIds) {
return backendIds.stream().sorted(Comparator.comparingLong(this::connections)).toList();
}

private long connections(final String backendId) {
final BackendHealthStatus backendStatus = healthManager.getBackendStatus(backendId);
return switch (backendStatus.getStatus()) {
case DOWN -> Long.MAX_VALUE; // backends that are down are put last, but not dropped
case COLD -> healthManager.exceedsCapacity(backendId)
? Long.MAX_VALUE - 1 // cold backends that exceed safe capacity are put last, just before down ones
: backendStatus.getConnections();
case STABLE -> backendStatus.getConnections();
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ protected final DynamicCertificatesManager getDynamicCertificatesManager() {
return parent.getDynamicCertificatesManager();
}

protected final BackendHealthManager getBackendHealthManager() {
public final BackendHealthManager getBackendHealthManager() {
Objects.requireNonNull(parent);
return parent.getBackendHealthManager();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SequencedCollection;
import java.util.function.Function;
import java.util.random.RandomGenerator;
import java.util.stream.Collectors;
import org.carapaceproxy.server.config.BackendSelector;
import org.carapaceproxy.server.config.DirectorConfiguration;
import org.slf4j.Logger;
Expand All @@ -19,14 +22,21 @@
* @see Collections#shuffle(List, RandomGenerator)
* @see SecureRandom
*/
class RandomBackendSelector implements BackendSelector {
public class RandomBackendSelector implements BackendSelector {
private static final Logger LOG = LoggerFactory.getLogger(RandomBackendSelector.class);
private static final RandomGenerator RANDOM = new SecureRandom();

private final List<String> allBackendIds;
private final SequencedCollection<String> allBackendIds;
private final Map<String, DirectorConfiguration> directors;

public RandomBackendSelector(final List<String> allBackendIds, final Map<String, DirectorConfiguration> directors) {
public static BackendSelector forMapper(final EndpointMapper mapper) {
final var directors = mapper.getDirectors()
.stream()
.collect(Collectors.toUnmodifiableMap(DirectorConfiguration::getId, Function.identity()));
return new RandomBackendSelector(mapper.getBackends().sequencedKeySet(), directors);
}

private RandomBackendSelector(final SequencedCollection<String> allBackendIds, final Map<String, DirectorConfiguration> directors) {
this.allBackendIds = allBackendIds;
this.directors = directors;
}
Expand All @@ -44,7 +54,7 @@ public List<String> selectBackends(final String userId, final String sessionId,
return shuffleCopy(directorConfig.getBackends());
}

public List<String> shuffleCopy(final List<String> ids) {
public List<String> shuffleCopy(final SequencedCollection<String> ids) {
if (ids.isEmpty()) {
return List.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,8 @@ public class StandardEndpointMapper extends EndpointMapper {
private String debuggingHeaderName = DEBUGGING_HEADER_DEFAULT_NAME;
private boolean debuggingHeaderEnabled = false;

public StandardEndpointMapper(final BackendSelector backendSelector) {
this.backendSelector = backendSelector;
}

public StandardEndpointMapper() {
this.backendSelector = new RandomBackendSelector(allBackendIds, directors);
public StandardEndpointMapper(final BackendSelector.SelectorFactory backendSelector) {
this.backendSelector = backendSelector.apply(this);
}

@Override
Expand Down Expand Up @@ -219,10 +215,15 @@ public MapResult map(final ProxyRequest request) {
case DOWN:
continue;
case COLD:
final int capacity = backend.coldCapacity();
if (capacity > 0 && backendStatus.getConnections() > capacity) {
// can't use this
continue;
final int capacity = backend.safeCapacity();
if (backendHealthManager.exceedsCapacity(backendId)) {
/*
* backends are returned by the mapper sorted
* from the most desirable to the less desirable;
* if the execution reaches this point,
* we should use a cold backend even if over the recommended capacity anyway...
*/
LOG.warn("Backend {} exceeds cold capacity of {}, but will use it anyway", backendId, capacity);
}
// falls through
case STABLE: {
Expand Down Expand Up @@ -467,10 +468,10 @@ public void configure(ConfigurationStore properties) throws ConfigurationNotVali
final String host = properties.getString(prefix + "host", "localhost");
final int port = properties.getInt(prefix + "port", 8086);
final String probePath = properties.getString(prefix + "probePath", "");
final int coldCapacity = properties.getInt(prefix + "coldCapacity", DEFAULT_CAPACITY);
LOG.info("configured backend {} {}:{} enabled={} capacity={}", id, host, port, enabled, coldCapacity);
final int safeCapacity = properties.getInt(prefix + "safeCapacity", DEFAULT_CAPACITY);
LOG.info("configured backend {} {}:{} enabled={} capacity={}", id, host, port, enabled, safeCapacity);
if (enabled) {
addBackend(new BackendConfiguration(id, host, port, probePath, coldCapacity));
addBackend(new BackendConfiguration(id, host, port, probePath, safeCapacity));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import java.util.Properties;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.carapaceproxy.core.EndpointKey;
import org.carapaceproxy.configstore.PropertiesConfigurationStore;
import org.carapaceproxy.core.EndpointKey;
import org.carapaceproxy.core.HttpProxyServer;
import org.carapaceproxy.core.ProxyRequestsManager;
import org.carapaceproxy.server.backends.BackendHealthStatus;
Expand All @@ -43,6 +43,7 @@
import org.carapaceproxy.server.config.DirectorConfiguration;
import org.carapaceproxy.server.config.NetworkListenerConfiguration;
import org.carapaceproxy.server.config.RouteConfiguration;
import org.carapaceproxy.server.config.SafeBackendSelector;
import org.carapaceproxy.server.mapper.StandardEndpointMapper;
import org.carapaceproxy.server.mapper.requestmatcher.RegexpRequestMatcher;
import org.carapaceproxy.utils.RawHttpClient;
Expand Down Expand Up @@ -83,7 +84,7 @@ public void testBackendUnreachableOnStuckRequest(boolean backendsUnreachableOnSt
final int theport = wireMockRule.port();
EndpointKey key = new EndpointKey("localhost", theport);

StandardEndpointMapper mapper = new StandardEndpointMapper();
StandardEndpointMapper mapper = new StandardEndpointMapper(SafeBackendSelector::forMapper);
mapper.addBackend(new BackendConfiguration("backend-a", "localhost", theport, "/", -1));
mapper.addDirector(new DirectorConfiguration("director-1").addBackend("backend-a"));
mapper.addAction(new ActionConfiguration("proxy-1", ActionConfiguration.TYPE_PROXY, "director-1", null, -1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.carapaceproxy.server.config.BackendConfiguration;
import org.carapaceproxy.server.config.DirectorConfiguration;
import org.carapaceproxy.server.config.RouteConfiguration;
import org.carapaceproxy.server.config.SafeBackendSelector;
import org.carapaceproxy.server.mapper.requestmatcher.RegexpRequestMatcher;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -93,7 +94,7 @@ public void test() throws Exception {
.withBody("it <b>works</b> !!")));

int backendPort = backend.port();
StandardEndpointMapper mapper = new StandardEndpointMapper();
StandardEndpointMapper mapper = new StandardEndpointMapper(SafeBackendSelector::forMapper);

mapper.addBackend(new BackendConfiguration("backend-a", "localhost", backendPort, "/", -1));
mapper.addBackend(new BackendConfiguration("backend-b", "localhost", backendPort, "/", -1));
Expand Down Expand Up @@ -316,7 +317,7 @@ public void testDefaultRoute() throws Exception {

int backendPort = backend.port();

StandardEndpointMapper mapper = new StandardEndpointMapper();
StandardEndpointMapper mapper = new StandardEndpointMapper(SafeBackendSelector::forMapper);
mapper.addBackend(new BackendConfiguration("backend", "localhost", backendPort, "/", -1));
mapper.addBackend(new BackendConfiguration("backend-down", "localhost-down", backendPort, "/", -1));
mapper.addDirector(new DirectorConfiguration("director").addBackend("backend"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.carapaceproxy.server.config.BackendConfiguration;
import org.carapaceproxy.server.config.DirectorConfiguration;
import org.carapaceproxy.server.config.RouteConfiguration;
import org.carapaceproxy.server.config.SafeBackendSelector;
import org.carapaceproxy.server.mapper.requestmatcher.RegexpRequestMatcher;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -69,7 +70,7 @@ public void test() throws Exception {
.withBody("it <b>works</b> !!")));

int backendPort = backend1.port();
StandardEndpointMapper mapper = new StandardEndpointMapper();
StandardEndpointMapper mapper = new StandardEndpointMapper(SafeBackendSelector::forMapper);
Properties properties = new Properties();
properties.put("mapper.forcedirector.parameter", "thedirector");
properties.put("mapper.forcebackend.parameter", "thebackend");
Expand Down

0 comments on commit 7e348bb

Please sign in to comment.