Skip to content

Commit

Permalink
Add AsyncLoader to load and update value periodically (#5590)
Browse files Browse the repository at this point in the history
Fixes #5506.

### Motivation:
`AsyncLoader`  can be useful in the following situations.

- When it is necessary to periodically read and update information from
a file such as resolv.conf .
- When data is not valid after a certain period of time, such as an
OAuth 2.0 access token.

We already have an implementation for that on
[AbstractOAuth2AuthorizationGrant.java](https://github.com/line/armeria/compare/main...injae-kim:async-loader?expand=1#diff-866e837e1f45041da819b2fed992548e97c9b2a8a72071e68c374f9d4678e53a).
However, I hope to generalize it and add new features to use it in
various cases.

### Modifications:

- Add `AsyncLoader` to load and update value periodically

### Result:

- Closes #5506.
- Now user can load, cache, update value periodically by using
`AsyncLoader`

---------

Co-authored-by: Ikhun Um <[email protected]>
  • Loading branch information
injae-kim and ikhoon authored Aug 2, 2024
1 parent d7d5e0b commit 1145d45
Show file tree
Hide file tree
Showing 8 changed files with 1,269 additions and 73 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.util;

import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;

/**
* A loader which atomically loads, caches and updates value.
*
* <p>Example usage:
* <pre>{@code
* WebClient client = WebClient.of("https://example.com");
* Function<String, CompletableFuture<String>> loader = cache -> {
* // Fetch new data from the remote server.
* ResponseEntity<String> response =
* client.prepare().get("/api/v1/items").asString().execute();
* return response.thenApply(res -> res.content());
* };
*
* AsyncLoader<String> asyncLoader =
* AsyncLoader
* .builder(loader)
* // Expire the loaded value after 60 seconds.
* .expireAfterLoad(Duration.ofSeconds(60))
* .build();
*
* // Fetch the value. This will call the loader function because the cache is empty.
* String value1 = asyncLoader.load().join();
* System.out.println("Loaded value: " + value1);
*
* // This will return the cached value because it's not expired yet.
* String value2 = asyncLoader.load().join();
* assert value1 == value2;
*
* // Wait for more than 60 seconds so that the cache is expired.
* Thread.sleep(61000);
*
* // Fetch the value again. This will call the loader function because the cache has expired.
* String value3 = asyncLoader.load().join();
* assert value1 != value3;
* }</pre>
*/
@FunctionalInterface
@UnstableApi
public interface AsyncLoader<T> {

/**
* Returns a newly created {@link AsyncLoaderBuilder} with the specified loader.
*
* @param loader function to load value. {@code T} is the previously cached value.
*/
static <T> AsyncLoaderBuilder<T> builder(
Function<@Nullable ? super T, ? extends CompletableFuture<? extends T>> loader) {
//noinspection unchecked
return new AsyncLoaderBuilder<>((Function<T, CompletableFuture<T>>) loader);
}

/**
* Returns a {@link CompletableFuture} which will be completed with the loaded value.
* A new value is fetched by the loader only if nothing is cached or the cache value has expired.
*/
CompletableFuture<T> load();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.util;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;

/**
* A builder for creating a new {@link AsyncLoader}.
*/
@UnstableApi
public final class AsyncLoaderBuilder<T> {

private final Function<@Nullable T, CompletableFuture<T>> loader;
@Nullable
private Duration expireAfterLoad;
@Nullable
private Predicate<? super T> expireIf;
@Nullable
private Predicate<? super T> refreshIf;
@Nullable
private BiFunction<? super Throwable, ? super @Nullable T,
? extends @Nullable CompletableFuture<T>> exceptionHandler;

AsyncLoaderBuilder(Function<@Nullable T, CompletableFuture<T>> loader) {
requireNonNull(loader, "loader");
this.loader = loader;
}

/**
* Expires the loaded value after the given duration since it was loaded.
* New value will be loaded by the loader function on next {@link AsyncLoader#load()}.
*/
public AsyncLoaderBuilder<T> expireAfterLoad(Duration expireAfterLoad) {
requireNonNull(expireAfterLoad, "expireAfterLoad");
checkState(!expireAfterLoad.isNegative(), "expireAfterLoad: %s (expected: >= 0)", expireAfterLoad);
this.expireAfterLoad = expireAfterLoad;
return this;
}

/**
* Expires the loaded value after the given milliseconds since it was loaded.
* New value will be loaded by the loader function on next {@link AsyncLoader#load()}.
*/
public AsyncLoaderBuilder<T> expireAfterLoadMillis(long expireAfterLoadMillis) {
checkState(expireAfterLoadMillis >= 0,
"expireAfterLoadMillis: %s (expected: >= 0)", expireAfterLoadMillis);
expireAfterLoad = Duration.ofMillis(expireAfterLoadMillis);
return this;
}

/**
* Expires the loaded value if the predicate matches.
* New value will be loaded by the loader function on next {@link AsyncLoader#load()}.
*/
public AsyncLoaderBuilder<T> expireIf(Predicate<? super T> expireIf) {
requireNonNull(expireIf, "expireIf");
this.expireIf = expireIf;
return this;
}

/**
* Asynchronously refreshes the loaded value which has not yet expired if the predicate matches.
* This pre-fetch strategy can remove an additional loading time on a cache miss.
*/
public AsyncLoaderBuilder<T> refreshIf(Predicate<? super T> refreshIf) {
requireNonNull(refreshIf, "refreshIf");
this.refreshIf = refreshIf;
return this;
}

/**
* Handles the exception thrown by the loader function.
* If the exception handler returns {@code null}, {@link AsyncLoader#load()} completes exceptionally.
*/
public AsyncLoaderBuilder<T> exceptionHandler(BiFunction<? super Throwable, ? super @Nullable T,
? extends @Nullable CompletableFuture<T>> exceptionHandler) {
requireNonNull(exceptionHandler, "exceptionHandler");
this.exceptionHandler = exceptionHandler;
return this;
}

/**
* Returns a newly created {@link AsyncLoader} with the entries in this builder.
*/
public AsyncLoader<T> build() {
return new DefaultAsyncLoader<>(loader, expireAfterLoad, expireIf, refreshIf, exceptionHandler);
}
}
Loading

0 comments on commit 1145d45

Please sign in to comment.