Skip to content

Commit

Permalink
Add exceptionHandler and backoff on AsyncLoader
Browse files Browse the repository at this point in the history
  • Loading branch information
injae-kim committed Apr 17, 2024
1 parent 39dc307 commit e161fcc
Show file tree
Hide file tree
Showing 3 changed files with 335 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

import com.linecorp.armeria.client.retry.Backoff;
import com.linecorp.armeria.common.annotation.Nullable;

/**
Expand All @@ -35,6 +37,10 @@ public final class AsyncLoaderBuilder<T> {
private Duration expireAfterLoad;
@Nullable
private Predicate<@Nullable T> expireIf;
@Nullable
private BiFunction<Throwable, @Nullable T, @Nullable CompletableFuture<T>> exceptionHandler;
@Nullable
private Backoff backoff;

AsyncLoaderBuilder(Function<@Nullable T, CompletableFuture<T>> loader) {
requireNonNull(loader, "loader");
Expand All @@ -61,10 +67,32 @@ public AsyncLoaderBuilder<T> expireIf(Predicate<@Nullable T> predicate) {
return this;
}

/**
* Handles exception thrown by loader and caches value.
* If exceptionHandler returns {@code null}, complete {@link AsyncLoader#get()} exceptionally.
*/
public AsyncLoaderBuilder<T> exceptionHandler(BiFunction<
Throwable, @Nullable T, @Nullable CompletableFuture<T>> exceptionHandler) {
requireNonNull(exceptionHandler, "exceptionHandler");
this.exceptionHandler = exceptionHandler;
return this;
}

/**
* Applies backoff with load failure attempts that incremented
* when loader or exception handler completes exceptionally.
* If {@link AsyncLoader#get()} success, load failure attempts is reset to zero.
*/
public AsyncLoaderBuilder<T> backoff(Backoff backoff) {
requireNonNull(backoff, "backoff");
this.backoff = backoff;
return this;
}

/**
* Returns a newly created {@link AsyncLoader} with the entries in this builder.
*/
public AsyncLoader<T> build() {
return new DefaultAsyncLoader<>(loader, expireAfterLoad, expireIf);
return new DefaultAsyncLoader<>(loader, expireAfterLoad, expireIf, exceptionHandler, backoff);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.client.retry.Backoff;
import com.linecorp.armeria.common.annotation.Nullable;

final class DefaultAsyncLoader<T> implements AsyncLoader<T> {
Expand All @@ -44,16 +49,28 @@ final class DefaultAsyncLoader<T> implements AsyncLoader<T> {
private final Duration expireAfterLoad;
@Nullable
private final Predicate<@Nullable T> expireIf;
@Nullable
private final BiFunction<Throwable, @Nullable T, @Nullable CompletableFuture<T>> exceptionHandler;
@Nullable
private final Backoff backoff;
private int loadFailureAttempts;
@Nullable
private ScheduledExecutorService scheduledExecutor;

private volatile CompletableFuture<CacheEntry<T>> loadFuture = UnmodifiableFuture.completedFuture(null);

DefaultAsyncLoader(Function<@Nullable T, CompletableFuture<T>> loader,
@Nullable Duration expireAfterLoad,
@Nullable Predicate<@Nullable T> expireIf) {
@Nullable Predicate<@Nullable T> expireIf,
@Nullable BiFunction<
Throwable, @Nullable T, @Nullable CompletableFuture<T>> exceptionHandler,
@Nullable Backoff backoff) {
requireNonNull(loader, "loader");
this.loader = loader;
this.expireAfterLoad = expireAfterLoad;
this.expireIf = expireIf;
this.exceptionHandler = exceptionHandler;
this.backoff = backoff;
}

@Override
Expand Down Expand Up @@ -84,27 +101,76 @@ private CompletableFuture<CacheEntry<T>> get0() {
}

final CompletableFuture<CacheEntry<T>> newLoadfuture = future;
final T cache = cacheEntry != null ? cacheEntry.loadVal : null;
try {
final T cache = cacheEntry != null ? cacheEntry.loadVal : null;
requireNonNull(loader.apply(cache), "loader.apply() returned null")
.handle((val, cause) -> {
if (cause != null) {
logger.warn("Failed to load a new value from loader: {}. the previous value: {}",
loader, cache, cause);
newLoadfuture.completeExceptionally(cause);
handleException(cause, cache, newLoadfuture);
} else {
newLoadfuture.complete(new CacheEntry<>(val));
completeSuccessfully(val, newLoadfuture);
}
return null;
});
} catch (Exception e) {
logger.warn("Unexpected exception from loader.apply()", e);
newLoadfuture.completeExceptionally(e);
handleException(e, cache, newLoadfuture);
}

return newLoadfuture;
}

private void handleException(Throwable originCause, @Nullable T cache,
CompletableFuture<CacheEntry<T>> future) {
if (exceptionHandler == null) {
completeExceptionally(originCause, future);
return;
}

try {
final CompletableFuture<T> handleException = exceptionHandler.apply(originCause, cache);
if (handleException != null) {
handleException.handle((val, cause) -> {
if (cause != null) {
logger.warn("Failed to load a new value from exceptionHandler: {}." +
"the previous value: {}", exceptionHandler, cache, cause);
completeExceptionally(cause, future);
} else {
completeSuccessfully(val, future);
}
return null;
});
} else {
completeExceptionally(originCause, future);
}
} catch (Exception e) {
logger.warn("Unexpected exception from exceptionHandler.apply()", e);
completeExceptionally(e, future);
}
}

private void completeSuccessfully(T val, CompletableFuture<CacheEntry<T>> future) {
if (backoff != null) {
loadFailureAttempts = 0;
}
future.complete(new CacheEntry<>(val));
}

private void completeExceptionally(Throwable cause, CompletableFuture<CacheEntry<T>> future) {
if (backoff != null) {
final int attempts = ++loadFailureAttempts;
if (scheduledExecutor == null) {
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
}
scheduledExecutor.schedule(() -> future.completeExceptionally(cause),
backoff.nextDelayMillis(attempts), TimeUnit.MILLISECONDS);
} else {
future.completeExceptionally(cause);
}
}

private boolean isValid(@Nullable CacheEntry<T> cacheEntry) {
if (cacheEntry == null) {
return false;
Expand Down
Loading

0 comments on commit e161fcc

Please sign in to comment.