Skip to content

Commit

Permalink
FISH-659 Merge pull request payara#5066 from pdudits/fish-659
Browse files Browse the repository at this point in the history
FISH-659: Fault Tolerance 3.0 full pass
  • Loading branch information
MattGill98 authored and Pandrex247 committed Aug 5, 2021
1 parent 935470e commit 839d845
Show file tree
Hide file tree
Showing 13 changed files with 226 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,51 +55,59 @@
@Configured(name = "microprofile-fault-tolerance-configuration")
public interface FaultToleranceServiceConfiguration extends ConfigExtension {

@Attribute(defaultValue = "concurrent/__defaultManagedExecutorService", dataType = String.class)
@Attribute(defaultValue = "java:comp/DefaultManagedExecutorService", dataType = String.class)
public String getManagedExecutorService();
public void setManagedExecutorService(String managedExecutorServiceName);

@Attribute(defaultValue = "concurrent/__defaultManagedScheduledExecutorService", dataType = String.class)
@Attribute(defaultValue = "java:comp/DefaultManagedScheduledExecutorService", dataType = String.class)
public String getManagedScheduledExecutorService();
public void setManagedScheduledExecutorService(String managedScheduledExecutorServiceName);

/**
* @deprecated Managed thread pools are used since 5.2020.8 (again)
* @return The maximum number of threads used to run asynchronous methods concurrently. This is the upper limit. The
* executor will vary the actual pool size depending on demand up to this upper limit. If no demand exist
* the actual pool size is zero.
*/
@Deprecated
@Attribute(defaultValue = "2000", dataType = Integer.class)
@Min(value = 20)
String getAsyncMaxPoolSize();
void setAsyncMaxPoolSize(String asyncMaxPoolSize);

/**
* @deprecated Managed thread pools are used since 5.2020.8 (again)
* @return The maximum number of threads used to schedule delayed execution and detect timeouts processing FT
* semantics. This should be understood as upper limit. The implementation might choose to keep up to this
* number of threads alive or vary the actual pool size according to demands.
*/
@Deprecated
@Attribute(defaultValue = "20", dataType = Integer.class)
@Min(value = 1)
String getDelayMaxPoolSize();
void setDelayMaxPoolSize(String delayMaxPoolSize);

/**
* @deprecated Managed thread pools are used since 5.2020.8 (again)
* @return The number of seconds an idle worker in the async pool has to be out of work before it is disposed and
* the pool scales down in size. Changes to this setting are dynamically applied and do not need a restart.
*/
@Deprecated
@Attribute(defaultValue = "60", dataType = Integer.class)
@Min(value = 20)
@Max(value = 60 * 60)
String getAsyncPoolKeepAliveInSeconds();
void setAsyncPoolKeepAliveInSeconds(String asyncPoolKeepAliveInSeconds);

/**
* @deprecated cleaning of FT state has been removed in 5.2020.8
* @return The interval duration in minutes for the background job that cleans expired FT state. Changes do need a
* restart of the server.
*/
@Attribute(defaultValue = "1", dataType = Integer.class)
@Min(value = 1)
@Max(value = 60 * 24)
@Deprecated
String getCleanupIntervalInMinutes();
void setCleanupIntervalInMinutes(String cleanupIntervalInMinutes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,13 @@ public Object intercept(InvocationContext context) throws Exception {
}
context.getContextData().put(PAYARA_FAULT_TOLERANCE_INTERCEPTOR_EXECUTED, Boolean.TRUE);
try {
FaultToleranceService env =
Globals.getDefaultBaseServiceLocator().getService(FaultToleranceService.class);
initialize();
AtomicReference<FaultToleranceConfig> lazyConfig = new AtomicReference<>();
Supplier<FaultToleranceConfig> configSupplier = () -> //
lazyConfig.updateAndGet(value -> value != null ? value : env.getConfig(context, this));
lazyConfig.updateAndGet(value -> value != null ? value : faultToleranceService.getConfig(context, this));
FaultTolerancePolicy policy = FaultTolerancePolicy.get(context, configSupplier);
if (policy.isPresent) {
return policy.proceed(context, () -> env.getMethodContext(context, policy, getRequestContextController()));
return policy.proceed(context, () -> faultToleranceService.getMethodContext(context, policy, getRequestContextController()));
}
} catch (FaultToleranceDefinitionException e) {
logger.log(Level.SEVERE, "Effective FT policy contains illegal values, fault tolerance cannot be applied,"
Expand All @@ -100,6 +99,13 @@ public Object intercept(InvocationContext context) throws Exception {
return context.proceed();
}

private void initialize() {
if (this.faultToleranceService != null) {
return;
}
this.faultToleranceService = Globals.getDefaultBaseServiceLocator().getService(FaultToleranceService.class);
}

private RequestContextController getRequestContextController() {
return requestContextControllerInstance.isResolvable() ? requestContextControllerInstance.get() : null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ public static void clean() {
map -> map.entrySet().removeIf(entry -> now > entry.getValue().expiresMillis));
}


/**
* Removes all expired policies from the cache and all policies related to this classloader
*/
public static void clean(ClassLoader appClassLoader) {
long now = System.currentTimeMillis();
POLICY_BY_METHOD.entrySet().removeIf(entry -> entry.getKey().getClassLoader().equals(appClassLoader));
clean();
}

public static FaultTolerancePolicy asAnnotated(Class<?> target, Method annotated) {
return create(new StaticAnalysisContext(target, annotated),
FaultToleranceConfig.asAnnotated(target, annotated));
Expand All @@ -128,6 +138,7 @@ public static FaultTolerancePolicy get(InvocationContext context, Supplier<Fault
policy != null && !policy.isExpired() ? policy : create(context, configSpplier.get()));
}


private static FaultTolerancePolicy create(InvocationContext context, FaultToleranceConfig config) {
return new FaultTolerancePolicy(
config.isNonFallbackEnabled(),
Expand Down Expand Up @@ -233,6 +244,12 @@ void trace(String method) {
void endTrace() {
context.endTrace();
}

@Override
public String toString() {
return "FaultToleranceInvocation[context=" + context.toString() + ", isDone=" +
(asyncResult == null ? "(sync)" : asyncResult.isDone()) + "]";
}
}

/**
Expand Down Expand Up @@ -404,12 +421,12 @@ private AsyncFuture processRetryAsync(FaultToleranceInvocation invocation) throw
return asyncAttempt;
} catch (ExecutionException ex) { // this ExecutionException is from calling get() above in case completed exceptionally
if (!asyncAttempt.isExceptionThrown() && asynchronous.isSuccessWhenCompletedExceptionally()) {
invocation.timeoutIfConcludedConcurrently();
invocation.timeoutIfConcludedConcurrently();
return asyncAttempt;
}
rethrow(ex.getCause());
return null; // not reachable
}
}
}

private static void rethrow(Throwable t) throws Exception {
Expand Down Expand Up @@ -551,18 +568,18 @@ private Object processBulkheadStage(FaultToleranceInvocation invocation) throws
if (!isBulkheadPresent()) {
return proceed(invocation);
}
logger.log(Level.FINER, "Proceeding invocation with bulkhead semantics");
logger.log(Level.FINER, () -> "Proceeding invocation with bulkhead semantics in "+invocation);
final boolean isAsync = isAsynchronous();
final boolean exitIsOnCompletion = isAsync && bulkhead.exitIsOnCompletion;
boolean directExit = false; // Whether or not we semantically leave the bulkhead when leaving this method
final int runCapacity = bulkhead.value;
final int queueCapacity = isAsync ? bulkhead.waitingTaskQueue : 0;
AtomicInteger queuingOrRunning = invocation.context.getQueuingOrRunningPopulation();
final int maxAttemps = 5;
for (int i = 0; i < maxAttemps; i++) {
while (true) {
final int currentlyIn = queuingOrRunning.get();
if (currentlyIn >= runCapacity + queueCapacity) {
invocation.metrics.incrementBulkheadCallsRejectedTotal();
logger.log(Level.FINER, "No free work or queue space.");
throw new BulkheadException("No free work or queue space.");
}
logger.log(Level.FINER, "Attempting to enter bulkhead.");
Expand Down Expand Up @@ -592,11 +609,13 @@ private Object processBulkheadStage(FaultToleranceInvocation invocation) throws
// ok, lets run
Object res = proceed(invocation);
if (!exitIsOnCompletion) {
logger.log(Level.FINER, () -> "Exiting synchronously with "+res);
return res;
}
directExit = false; // if we make if here exit is going to happen on completion
CompletionStage<?> asyncResult = ((CompletionStage<?>) res);
asyncResult.whenComplete((value, exception) -> {
logger.log(Level.FINER, () -> "Bulkhead invocation "+invocation+ " finished " + (exception != null ? "with exception "+exception.getMessage() : "sucessfully"));
invocation.metrics.addBulkheadExecutionDuration(Math.max(1, System.nanoTime() - executionSince));
// successful or not, we are out...
running.remove(currentThread);
Expand All @@ -619,8 +638,6 @@ private Object processBulkheadStage(FaultToleranceInvocation invocation) throws
}
}
}
invocation.metrics.incrementBulkheadCallsRejectedTotal();
throw new BulkheadException("No free work or queue space.");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@

import static java.lang.System.currentTimeMillis;

import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.ArrayBlockingQueue;
Expand Down Expand Up @@ -71,7 +70,6 @@
import fish.payara.microprofile.faulttolerance.policy.AsynchronousPolicy;
import fish.payara.microprofile.faulttolerance.policy.FaultTolerancePolicy;
import fish.payara.microprofile.faulttolerance.state.CircuitBreakerState;
import fish.payara.microprofile.faulttolerance.state.CircuitBreakerState.CircuitState;
import fish.payara.notification.requesttracing.RequestTraceSpan;

/**
Expand All @@ -92,44 +90,28 @@ public final class FaultToleranceMethodContextImpl implements FaultToleranceMeth

private static final Logger logger = Logger.getLogger(FaultToleranceMethodContextImpl.class.getName());


static final class FaultToleranceMethodState {

final RequestContextController requestContext;
final FaultToleranceRequestTracing requestTracing;
final FaultToleranceMetrics metrics;
final ExecutorService asyncExecution;
final ScheduledExecutorService delayedExecution;
final WeakReference<Object> target;
final AtomicReference<CircuitBreakerState> circuitBreakerState = new AtomicReference<>();
final AtomicReference<BlockingQueue<Thread>> concurrentExecutions = new AtomicReference<>();
final AtomicInteger queuingOrRunningPopulation = new AtomicInteger();
final AtomicInteger executingThreadCount = new AtomicInteger();
final AtomicLong lastUsed = new AtomicLong(currentTimeMillis());

FaultToleranceMethodState(RequestContextController requestContext, FaultToleranceRequestTracing requestTracing,
FaultToleranceMetrics metrics, ExecutorService asyncExecution,
ScheduledExecutorService delayedExecution, WeakReference<Object> target) {
FaultToleranceMetrics metrics, ExecutorService asyncExecution,
ScheduledExecutorService delayedExecution) {
this.requestContext = requestContext;
this.requestTracing = requestTracing;
this.metrics = metrics;
this.asyncExecution = asyncExecution;
this.delayedExecution = delayedExecution;
this.target = target;
}

public boolean isExpired(long ttl) {
if (target.get() == null) {
return true; // target got GC'd - this is not useful any longer
}
return executingThreadCount.get() == 0 //
&& queuingOrRunningPopulation.get() == 0 //
&& lastUsed.get() + ttl < currentTimeMillis() //
&& isStabilyClosedCuicuit();
}

private boolean isStabilyClosedCuicuit() {
CircuitBreakerState state = circuitBreakerState.get();
return state == null || state.getCircuitState() == CircuitState.CLOSED && state.isClosedOutcomeSuccessOnly();
}
}

Expand All @@ -140,28 +122,30 @@ private boolean isStabilyClosedCuicuit() {
private final FaultToleranceMethodState shared;
private final InvocationContext context;
private final FaultTolerancePolicy policy;
private final String appName;

public String getAppName() {
return appName;
}

public FaultToleranceMethodContextImpl(RequestContextController requestContext, FaultToleranceRequestTracing requestTracing, FaultToleranceMetrics metrics,
ExecutorService asyncExecution, ScheduledExecutorService delayedExecution, Object target) {
this(new FaultToleranceMethodState(requestContext, requestTracing, metrics, asyncExecution, delayedExecution,
new WeakReference<>(target)), null, null);
ExecutorService asyncExecution, ScheduledExecutorService delayedExecution, String appName) {
this(new FaultToleranceMethodState(requestContext, requestTracing, metrics, asyncExecution, delayedExecution
), appName, null, null);
}

private FaultToleranceMethodContextImpl(FaultToleranceMethodState shared, InvocationContext context,
private FaultToleranceMethodContextImpl(FaultToleranceMethodState shared, String appName, InvocationContext context,
FaultTolerancePolicy policy) {
this.shared = shared;
this.context = context;
this.policy = policy;
this.appName = appName;
shared.lastUsed.accumulateAndGet(currentTimeMillis(), Long::max);
}

public boolean isExpired(long ttl) {
return shared.isExpired(ttl);
}

@Override
public FaultToleranceMethodContext boundTo(InvocationContext context, FaultTolerancePolicy policy) {
return new FaultToleranceMethodContextImpl(shared, context, policy);
return new FaultToleranceMethodContextImpl(shared, appName, context, policy);
}

@Override
Expand Down Expand Up @@ -243,7 +227,7 @@ public void runAsynchronous(AsyncFuture asyncResult, Callable<Object> task)
Future<?> futureResult = AsynchronousPolicy.toFuture(res);
if (!asyncResult.isCancelled()) { // could be cancelled in the meanwhile
if (!asyncResult.isDone()) {
asyncResult.complete(futureResult.get());
asyncResult.complete(futureResult.get());
}
} else {
futureResult.cancel(true);
Expand Down Expand Up @@ -297,4 +281,8 @@ public void endTrace() {
shared.requestTracing.endSpan();
}

@Override
public String toString() {
return super.toString()+"[method="+context.getMethod()+", target="+ context.getTarget()+", sharedState=" + shared + "]";
}
}
Loading

0 comments on commit 839d845

Please sign in to comment.