Skip to content

Commit

Permalink
Add AsyncLoader to load and update value periodically
Browse files Browse the repository at this point in the history
Fixes #5506.
  • Loading branch information
injae-kim committed Apr 10, 2024
1 parent 4ca7e89 commit cf8056a
Show file tree
Hide file tree
Showing 5 changed files with 617 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.util;

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import com.linecorp.armeria.common.annotation.Nullable;

/**
* An {@link AsyncLoader} to atomically load, cache and update value.
*/
@FunctionalInterface
public interface AsyncLoader<T> {

/**
* Returns a newly created {@link AsyncLoaderBuilder} with the specified loader.
*/
static <T> AsyncLoaderBuilder<T> builder(Function<@Nullable T, CompletableFuture<T>> loader) {
return new AsyncLoaderBuilder<>(loader);
}

/**
* Returns a {@link CompletableFuture} which emits loaded value.
* Loads new value by loader only if nothing is cached or loaded value is expired.
*/
CompletableFuture<T> get();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.util;

import static java.util.Objects.requireNonNull;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

import com.linecorp.armeria.common.annotation.Nullable;

/**
* A builder for creating a new {@link AsyncLoader}.
*/
public final class AsyncLoaderBuilder<T> {

private final Function<@Nullable T, CompletableFuture<T>> loader;
@Nullable
private Duration expireAfterLoad;
@Nullable
private Predicate<@Nullable T> expireIf;

AsyncLoaderBuilder(Function<@Nullable T, CompletableFuture<T>> loader) {
requireNonNull(loader, "loader");
this.loader = loader;
}

/**
* Expires loaded value after duration since it was loaded.
* New value will be loaded by loader on next {@link AsyncLoader#get()}.
*/
public AsyncLoaderBuilder<T> expireAfterLoad(Duration duration) {
requireNonNull(duration, "duration");
this.expireAfterLoad = duration;
return this;
}

/**
* Expires loaded value if predicate matches.
* New value will be loaded by loader on next {@link AsyncLoader#get()}.
*/
public AsyncLoaderBuilder<T> expireIf(Predicate<@Nullable T> predicate) {
requireNonNull(predicate, "predicate");
this.expireIf = predicate;
return this;
}

/**
* Returns a newly created {@link AsyncLoader} with the entries in this builder.
*/
public AsyncLoader<T> build() {
return new DefaultAsyncLoader<>(loader, expireAfterLoad, expireIf);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.util;

import static java.util.Objects.requireNonNull;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.common.annotation.Nullable;

final class DefaultAsyncLoader<T> implements AsyncLoader<T> {

private static final Logger logger = LoggerFactory.getLogger(DefaultAsyncLoader.class);

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<
DefaultAsyncLoader, CompletableFuture> loadFutureUpdater = AtomicReferenceFieldUpdater
.newUpdater(DefaultAsyncLoader.class, CompletableFuture.class, "loadFuture");

private final Function<@Nullable T, CompletableFuture<T>> loader;
@Nullable
private final Duration expireAfterLoad;
@Nullable
private final Predicate<@Nullable T> expireIf;

private volatile CompletableFuture<LoadEntry<T>> loadFuture = UnmodifiableFuture.completedFuture(null);

DefaultAsyncLoader(Function<@Nullable T, CompletableFuture<T>> loader,
@Nullable Duration expireAfterLoad,
@Nullable Predicate<@Nullable T> expireIf) {
requireNonNull(loader, "loader");
this.loader = loader;
this.expireAfterLoad = expireAfterLoad;
this.expireIf = expireIf;
}

@Override
public CompletableFuture<T> get() {
return get0().thenApply(f -> f.loadVal);
}

private CompletableFuture<LoadEntry<T>> get0() {
CompletableFuture<LoadEntry<T>> future;
LoadEntry<T> loadEntry = null;
for (;;) {
final CompletableFuture<LoadEntry<T>> loadFuture = this.loadFuture;
if (!loadFuture.isDone()) {
return loadFuture;
}

if (!loadFuture.isCompletedExceptionally()) {
loadEntry = loadFuture.join();
if (isValid(loadEntry)) {
return loadFuture;
}
}

future = new CompletableFuture<>();
if (loadFutureUpdater.compareAndSet(this, loadFuture, future)) {
break;
}
}

final CompletableFuture<LoadEntry<T>> newLoadfuture = future;
try {
requireNonNull(loader.apply(loadEntry != null ? loadEntry.loadVal : null),
"loader.apply() returned null")
.handle((val, cause) -> {
if (cause != null) {
newLoadfuture.completeExceptionally(cause);
} else {
newLoadfuture.complete(new LoadEntry<>(val));
}
return null;
});
} catch (Exception e) {
logger.warn("Unexpected exception from loader.apply()", e);
newLoadfuture.completeExceptionally(e);
}

return newLoadfuture;
}

private boolean isValid(@Nullable LoadEntry<T> loadEntry) {
if (loadEntry == null) {
return false;
}

if (expireAfterLoad != null) {
final Instant expiration = loadEntry.loadWhen.plusMillis(expireAfterLoad.toMillis());
if (Instant.now().isAfter(expiration)) {
return false;
}
}

if (expireIf != null && expireIf.test(loadEntry.loadVal)) {
return false;
}

return true;
}

private static class LoadEntry<T> {

private final T loadVal;
private final Instant loadWhen;

LoadEntry(T loadVal) {
requireNonNull(loadVal, "loadVal");
this.loadVal = loadVal;
this.loadWhen = Instant.now();
}
}
}
Loading

0 comments on commit cf8056a

Please sign in to comment.