Skip to content

Commit

Permalink
feat: implement the feature by spreading the backend status
Browse files Browse the repository at this point in the history
  • Loading branch information
NiccoMlt committed Nov 29, 2024
1 parent dc6f8bd commit 7c4768c
Show file tree
Hide file tree
Showing 13 changed files with 218 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public Map<String, BackendBean> getAll() {
if (bhs != null) {
bean.available = bhs.getStatus() != BackendHealthStatus.Status.DOWN;
bean.reportedAsUnreachable = bhs.getStatus() == BackendHealthStatus.Status.DOWN;
bean.reportedAsUnreachableTs = bhs.getLastUnreachableTs();
bean.reportedAsUnreachableTs = bhs.getUnreachableSince();
BackendHealthCheck lastProbe = bhs.getLastProbe();
if (lastProbe != null) {
bean.lastProbeTs = lastProbe.endTs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ public boolean isValidHostAndPort(final String hostAndPort) {
if (hostAndPort == null) {
return false;
}
HostAndPort parsed = HostAndPort.fromString(hostAndPort);
String host = parsed.getHost();
final HostAndPort parsed = HostAndPort.fromString(hostAndPort);
final String host = parsed.getHost();
if (parsed.hasPort()) {
return !host.isBlank()
&& (InternetDomainName.isValid(host) || InetAddresses.isInetAddress(host))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.http.HttpStatus;
import org.carapaceproxy.EndpointStats;
import org.carapaceproxy.SimpleHTTPResponse;
import org.carapaceproxy.server.backends.BackendHealthStatus;
import org.carapaceproxy.server.cache.ContentsCache;
import org.carapaceproxy.server.config.BackendConfiguration;
import org.carapaceproxy.server.config.ConnectionPoolConfiguration;
Expand Down Expand Up @@ -106,7 +107,7 @@ public ProxyRequestsManager(HttpProxyServer parent) {
this.parent = parent;
}

public EndpointStats getEndpointStats(EndpointKey key) {
public EndpointStats getEndpointStats(final EndpointKey key) {
return endpointsStats.get(key);
}

Expand Down Expand Up @@ -148,8 +149,8 @@ public Publisher<Void> processRequest(ProxyRequest request) {
case BAD_REQUEST -> serveBadRequestMessage(request);
case STATIC, ACME_CHALLENGE -> serveStaticMessage(request);
case REDIRECT -> serveRedirect(request);
case PROXY -> forward(request, false);
case CACHE -> serveFromCache(request); // cached content
case PROXY -> forward(request, false, action.getHealthStatus());
case CACHE -> serveFromCache(request, action.getHealthStatus()); // cached content
default -> throw new IllegalStateException("Action " + action.getAction() + " not supported");
};
} finally {
Expand Down Expand Up @@ -347,20 +348,22 @@ private static void addCustomResponseHeaders(final HttpHeaders responseHeaders,
*
* @param request the unpacked incoming request to forward to the corresponding backend endpoint
* @param cache whether the request is cacheable or not
* @param healthStatus the health status of the chosen backend; it should be notified when connection starts and ends
* @return a {@link Flux} forwarding the returned {@link Publisher} sequence
*/
public Publisher<Void> forward(ProxyRequest request, boolean cache) {
public Publisher<Void> forward(final ProxyRequest request, final boolean cache, final BackendHealthStatus healthStatus) {
Objects.requireNonNull(request.getAction());
final String endpointHost = request.getAction().getHost();
Objects.requireNonNull(endpointHost);
final int endpointPort = request.getAction().getPort();
EndpointKey key = EndpointKey.make(endpointHost, endpointPort);
EndpointStats endpointStats = endpointsStats.computeIfAbsent(key, EndpointStats::new);
final EndpointKey key = EndpointKey.make(endpointHost, endpointPort);
final EndpointStats endpointStats = endpointsStats.computeIfAbsent(key, EndpointStats::new);

var connectionToEndpoint = connectionsManager.apply(request);
ConnectionPoolConfiguration connectionConfig = connectionToEndpoint.getKey();
ConnectionProvider connectionProvider = connectionToEndpoint.getValue();
final var connectionToEndpoint = connectionsManager.apply(request);
final ConnectionPoolConfiguration connectionConfig = connectionToEndpoint.getKey();
final ConnectionProvider connectionProvider = connectionToEndpoint.getValue();
if (LOGGER.isDebugEnabled()) {
Map<String, HttpProxyServer.ConnectionPoolStats> stats = parent.getConnectionPoolsStats().get(key);
final Map<String, HttpProxyServer.ConnectionPoolStats> stats = parent.getConnectionPoolsStats().get(key);
if (stats != null) {
LOGGER.debug(
"Connection {} stats: {}",
Expand Down Expand Up @@ -416,6 +419,7 @@ public Publisher<Void> forward(ProxyRequest request, boolean cache) {
}
}).doAfterResponseSuccess((resp, conn) -> {
PENDING_REQUESTS_GAUGE.dec();
healthStatus.decrementConnections();
endpointStats.getLastActivity().set(System.currentTimeMillis());
}));

Expand All @@ -429,6 +433,7 @@ public Publisher<Void> forward(ProxyRequest request, boolean cache) {
}

PENDING_REQUESTS_GAUGE.inc();
healthStatus.incrementConnections();
return forwarder.request(request.getMethod())
.uri(request.getUri())
.send((req, out) -> {
Expand Down Expand Up @@ -489,6 +494,7 @@ public Publisher<Void> forward(ProxyRequest request, boolean cache) {
}));
}).onErrorResume(err -> { // custom endpoint request/response error handling
PENDING_REQUESTS_GAUGE.dec();
healthStatus.decrementConnections();

EndpointKey endpoint = EndpointKey.make(request.getAction().getHost(), request.getAction().getPort());
if (err instanceof ReadTimeoutException) {
Expand Down Expand Up @@ -557,11 +563,11 @@ private void addCachedResponseHeaders(ProxyRequest request) {
}
}

private Publisher<Void> serveFromCache(ProxyRequest request) {
private Publisher<Void> serveFromCache(ProxyRequest request, final BackendHealthStatus healthStatus) {
ContentsCache.ContentSender cacheSender = parent.getCache().getCacheSender(request);
if (cacheSender == null) {
// content non cached, forwarding and caching...
return forward(request, true);
return forward(request, true, healthStatus);
}
request.setServedFromCache(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void run() {
}
for (final BackendConfiguration backend : mapper.getBackends().values()) {
final EndpointKey endpoint = backend.hostPort();
final BackendHealthStatus status = findStatus(endpoint);
final BackendHealthStatus status = getBackendStatus(endpoint);
final BackendHealthCheck checkResult = BackendHealthCheck.check(backend, connectTimeout);

if (checkResult.ok()) {
Expand Down Expand Up @@ -173,23 +173,19 @@ private void cleanup() {
}

public void reportBackendReachable(final EndpointKey hostPort, final long timestamp) {
findStatus(hostPort).reportAsReachable(timestamp);
getBackendStatus(hostPort).reportAsReachable(timestamp);
}

public void reportBackendUnreachable(final EndpointKey hostPort, final long timestamp, final String cause) {
findStatus(hostPort).reportAsUnreachable(timestamp, cause);
}

private BackendHealthStatus findStatus(final EndpointKey hostPort) {
return backends.computeIfAbsent(hostPort, BackendHealthStatus::new);
getBackendStatus(hostPort).reportAsUnreachable(timestamp, cause);
}

public Map<EndpointKey, BackendHealthStatus> getBackendsSnapshot() {
return Map.copyOf(backends);
}

public BackendHealthStatus.Status getBackendStatus(final EndpointKey hostPort) {
return findStatus(hostPort).getStatus();
public BackendHealthStatus getBackendStatus(final EndpointKey hostPort) {
return backends.computeIfAbsent(hostPort, BackendHealthStatus::new);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.sql.Timestamp;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.carapaceproxy.core.EndpointKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,22 +33,46 @@
*/
public class BackendHealthStatus {

// todo replace this with a property of some kind
// todo replace this with a configurable property of some kind
public static final long WARMUP_MILLIS = Duration.ofMinutes(1).toMillis();

private static final Logger LOG = LoggerFactory.getLogger(BackendHealthStatus.class);

private final EndpointKey hostPort;
private final AtomicInteger connections;

private volatile Status status;
private volatile long lastUnreachableTs;
private volatile long lastReachableTs;
private volatile long unreachableSince;
private volatile long lastUnreachable;
private volatile long lastReachable;
private volatile BackendHealthCheck lastProbe;

public BackendHealthStatus(final EndpointKey hostPort) {
this.hostPort = hostPort;
// todo using DOWN would break BasicStandardEndpointMapperTest, UnreachableBackendTest, and StuckRequestsTest
// todo cannot start with a DOWN backend (+ current time) as it would break:
// - BasicStandardEndpointMapperTest,
// - UnreachableBackendTest,
// - StuckRequestsTest,
// - HealthCheckTest
// we assume that the backend is reachable when status is created
this.status = Status.COLD;
this.lastUnreachableTs = System.currentTimeMillis();
this.unreachableSince = 0L;
final long created = System.currentTimeMillis();
this.lastUnreachable = created;
this.lastReachable = created;
this.connections = new AtomicInteger();
}

public long getUnreachableSince() {
return unreachableSince;
}

public long getLastUnreachable() {
return lastUnreachable;
}

public long getLastReachable() {
return lastReachable;
}

public EndpointKey getHostPort() {
Expand All @@ -66,36 +91,63 @@ public Status getStatus() {
return status;
}

public long getLastUnreachableTs() {
return lastUnreachableTs;
}

public void reportAsUnreachable(final long timestamp, final String cause) {
LOG.info("{}: reportAsUnreachable {}, cause {}", hostPort, new Timestamp(timestamp), cause);
this.lastUnreachableTs = timestamp;
this.status = Status.DOWN;
if (this.status != Status.DOWN) {
this.status = Status.DOWN;
this.unreachableSince = timestamp;
}
this.lastUnreachable = timestamp;
this.connections.set(0);
}

public void reportAsReachable(final long timestamp) {
this.lastReachableTs = timestamp;
if (this.lastReachableTs - this.lastUnreachableTs >= WARMUP_MILLIS) {
this.status = Status.STABLE;
} else {
this.status = Status.COLD;
LOG.info("{}: reportAsUnreachable {}", hostPort, new Timestamp(timestamp));
switch (this.status) {
case DOWN:
this.status = Status.COLD;
this.unreachableSince = 0;
this.lastReachable = timestamp;
break;
case COLD:
this.lastReachable = timestamp;
if (this.lastReachable - this.lastUnreachable > WARMUP_MILLIS) {
this.status = Status.STABLE;
}
break;
case STABLE:
this.lastReachable = timestamp;
break;
}
}

@Override
public String toString() {
return "BackendHealthStatus{"
+ " hostPort=" + this.hostPort
+ ", connections=" + this.connections
+ ", status=" + this.status
+ ", lastUnreachableTs=" + this.lastUnreachableTs
+ ", lastReachableTs=" + this.lastReachableTs
+ ", unreachableSince=" + this.unreachableSince
+ ", unreachableUntil=" + this.lastUnreachable
+ ", lastReachable=" + this.lastReachable
+ ", lastProbe=" + this.lastProbe
+ '}';
}

public int getConnections() {
return this.connections.get();
}

public void incrementConnections() {
this.connections.incrementAndGet();
}

public void decrementConnections() {
if (this.connections.getAcquire() > 0) {
this.connections.decrementAndGet();
}
}

/**
* The enum models a simple status of the backend.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,25 @@
@Data
public class ActionConfiguration {

/**
* @see org.carapaceproxy.server.mapper.MapResult.Action#PROXY
*/
public static final String TYPE_PROXY = "proxy";
/**
* @see org.carapaceproxy.server.mapper.MapResult.Action#CACHE
*/
public static final String TYPE_CACHE = "cache";
/**
* @see org.carapaceproxy.server.mapper.MapResult.Action#STATIC
*/
public static final String TYPE_STATIC = "static";
/**
* @see org.carapaceproxy.server.mapper.MapResult.Action#ACME_CHALLENGE
*/
public static final String TYPE_ACME_CHALLENGE = "acme-challenge";
/**
* @see org.carapaceproxy.server.mapper.MapResult.Action#REDIRECT
*/
public static final String TYPE_REDIRECT = "redirect";

private final String id;
Expand All @@ -61,7 +76,7 @@ public ActionConfiguration(String id, String type, String director, String file,
}

public ActionConfiguration setCustomHeaders(List<CustomHeader> customHeaders) {
this.customHeaders = Collections.unmodifiableList(customHeaders == null ? new ArrayList() : customHeaders);
this.customHeaders = Collections.unmodifiableList(customHeaders == null ? new ArrayList<>() : customHeaders);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
package org.carapaceproxy.server.mapper;

import java.util.List;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Setter;
import lombok.ToString;
import org.carapaceproxy.server.backends.BackendHealthStatus;

@Data
@Builder
Expand Down Expand Up @@ -78,7 +83,6 @@ public enum Action {
public static final String REDIRECT_PROTO_HTTPS = "https";
public static final String REDIRECT_PROTO_HTTP = "http";

// todo we don't actually want to have these nullable: probably we should have different classes for each case
private String host;
private int port;
private Action action;
Expand All @@ -90,6 +94,11 @@ public enum Action {
private String redirectProto;
private String redirectPath;

@Setter(AccessLevel.NONE)
@EqualsAndHashCode.Exclude
@ToString.Exclude
private BackendHealthStatus healthStatus;

public static MapResult notFound(String routeId) {
return MapResult.builder()
.action(Action.NOTFOUND)
Expand Down
Loading

0 comments on commit 7c4768c

Please sign in to comment.