Skip to content

Commit

Permalink
More tests, refactoring and javadocs, short-circuit done futures
Browse files Browse the repository at this point in the history
  • Loading branch information
Konstantin Pavlov authored and Konstantin Pavlov committed Jul 15, 2024
1 parent fdf0257 commit f220fc1
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 103 deletions.
34 changes: 0 additions & 34 deletions .github/workflows/maven-publish.yml

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
distribution: 'temurin'
cache: maven
- name: Build with Maven
run: mvn -B package
run: mvn -B verify javadoc:jar source:jar

# Optional: Uploads the full dependency graph to GitHub to improve the quality of Dependabot alerts this repository can receive
- name: Update dependency graph
Expand Down
194 changes: 158 additions & 36 deletions src/main/java/me/kpavlov/await4j/Async.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,40 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**
* The {@code Async} class provides utilities to execute code asynchronously using virtual threads.
* It allows running {@link ThrowingRunnable} and {@link Callable} tasks asynchronously, handling exceptions,
* and returning results in a synchronous manner. It leverages the new virtual threads feature introduced in Java
* to provide lightweight concurrency.
* <p>
* This class offers methods to:
* <ul>
* <li>Run a block of code asynchronously and wait for its completion.</li>
* <li>Handle both checked and unchecked exceptions in asynchronous tasks.</li>
* <li>Retrieve results from {@link Future} and {@link CompletableFuture} objects.</li>
* </ul>
*/
public class Async {

private static final Thread.Builder virtualThreadBuilder = Thread.ofVirtual()
.name("async-virtual-", 0);

public static void await(ThrowingRunnable block) {
private Async() {
// hide public constructor
}

/**
* Executes a block of code asynchronously and waits for its completion.
*
* @param block The code to be executed asynchronously
* @param millis The maximum time to wait for the block to complete, in milliseconds
* @throws CompletionException if the virtual thread is interrupted
* @throws RuntimeException if the block throws an exception
* @throws Error if the block throws an Error
* @throws IllegalStateException if an unexpected Throwable is encountered
*/
@SuppressWarnings("java:S1181")
public static void await(ThrowingRunnable block, long millis) {
Objects.requireNonNull(block, "Block should not be null");
try {
final var failureHolder = new AtomicReference<Throwable>();
Expand All @@ -18,44 +46,71 @@ public static void await(ThrowingRunnable block) {
ThrowingRunnable
.toRunnable(block)
.run();
} catch (RuntimeException | Error e) {
} catch (Error e) {
failureHolder.set(e);
} catch (Exception e) {
failureHolder.set(toRuntimeException(e));
}
})
.join();
.join(millis);
final Throwable throwable = failureHolder.get();
if (throwable instanceof Error e) {
throw e;
} else if (throwable instanceof RuntimeException re) {
throw re;
} else {
throw new IllegalStateException("Unexpected Throwable: " + throwable, throwable);
switch (throwable) {
case null -> {
// success
}
case Error e -> throw e;
case RuntimeException re -> throw re;
default -> throw new IllegalStateException("Unexpected Throwable: " + throwable, throwable);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted virtual thread", e);
throw new CompletionException("Interrupted virtual thread", e);
}
}

public static <T> T await(Callable<T> block) throws RuntimeException {
/**
* Executes a block of code asynchronously and waits indefinitely for its completion.
*
* @param block The code to be executed asynchronously
* @throws CompletionException if the virtual thread is interrupted
* @throws RuntimeException if the block throws an exception
* @throws Error if the block throws an Error
* @throws IllegalStateException if an unexpected Throwable is encountered
*/
public static void await(ThrowingRunnable block) {
await(block, 0);
}

/**
* Executes a callable block asynchronously and returns its result.
*
* @param <T> The type of the result
* @param block The callable block to be executed asynchronously. <strong>Block should not execute code
* that contains synchronized blocks or invokes synchronized methods to avoid scalability issues.</strong>
* @param millis The maximum time to wait for the callable block to complete, in milliseconds
* @return The result of the callable block
* @throws RuntimeException if the virtual thread is interrupted or if the block throws an exception
* @throws Error if the block throws an Error
* @throws IllegalStateException if an unexpected throwable is encountered in the call result
*/
public static <T> T await(Callable<T> block, long millis) {
Objects.requireNonNull(block, "Callable should not be null");
try {
final var resultHolder = new AtomicReference<Result<T>>();
virtualThreadBuilder.start(() -> {
final Result<T> result = callWithErrorHandling(block);
resultHolder.set(result);
}).join();
}).join(millis);
final Result<T> result = resultHolder.get();
if (result.isSuccess()) {
return result.getOrThrow();
return result.getOrNull();
} else {
final Throwable failure = result.failure();
if (failure instanceof RuntimeException re) {
throw re;
} else if (failure instanceof Error e) {
throw e;
} else {
throw new IllegalStateException("Unexpected throwable in call Result:" + failure, failure);
switch (failure) {
case RuntimeException re -> throw re;
case Error e -> throw e;
case null, default ->
throw new IllegalStateException("Unexpected throwable in call Result:" + failure, failure);
}
}
} catch (InterruptedException e) {
Expand All @@ -64,6 +119,62 @@ public static <T> T await(Callable<T> block) throws RuntimeException {
}
}

/**
* Executes a callable block asynchronously and returns its result.
*
* @param <T> The type of the result
* @param block The callable block to be executed asynchronously
* @return The result of the callable block
* @throws RuntimeException if the virtual thread is interrupted or if the block throws an exception
* @throws Error if the block throws an Error
* @throws IllegalStateException if an unexpected throwable is encountered in the call result
*/
public static <T> T await(Callable<T> block) {
return await(block, 0);
}

/**
* Waits for the completion of a Future and returns its result.
*
* @param <T> The type of the result
* @param future The Future to await
* @return The result of the Future
* @throws RuntimeException if the Future completes exceptionally
*/
public static <T> T await(Future<T> future) {
if (shortCircuitDoneFuture(future)) return future.resultNow();
return await(() -> future.get());
}


/**
* Waits for the completion of a Future and returns its result.
*
* @param <T> The type of the result
* @param future The Future to await
* @param millis The maximum time to wait for the future to complete, in milliseconds
* @return The result of the Future
* @throws RuntimeException if the Future completes exceptionally
*/
public static <T> T await(Future<T> future, long millis) {
if (shortCircuitDoneFuture(future)) return future.resultNow();
return await(() -> future.get(millis, TimeUnit.MILLISECONDS));
}

/**
* Waits for the completion of a CompletableFuture and returns its result.
*
* @param <T> The type of the result
* @param completableFuture The CompletableFuture to await
* @return The result of the CompletableFuture
* @throws RuntimeException if the CompletableFuture completes exceptionally
*/
public static <T> T await(CompletableFuture<T> completableFuture) {
if (shortCircuitDoneFuture(completableFuture)) return completableFuture.resultNow();
return await(completableFuture::join);
}


/**
* Executes a block of code with error handling.
* <p>
Expand All @@ -79,38 +190,49 @@ public static <T> T await(Callable<T> block) throws RuntimeException {
* @throws RuntimeException if an {@link InterruptedException},
* {@link ExecutionException}, or any other exception occurs
*/
@SuppressWarnings("java:S1181")
private static <T> Result<T> callWithErrorHandling(Callable<T> block) {
try {
return Result.success(block.call());
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupted status
return Result.failure(
new RuntimeException("Can't execute async task: interrupted", e)
new CompletionException("Can't execute async task: interrupted", e)
);
} catch (ExecutionException | CompletionException e) {
} catch (Error e) {
return Result.failure(e);
} catch (CompletionException | ExecutionException e) {
final Throwable cause = e.getCause();
if (cause instanceof RuntimeException re) {
// re-throw RuntimeException as it is
return Result.failure(re);
} else if (cause instanceof Error error) {
// re-throw Error as it is
return Result.failure(error);
if (cause instanceof Error) {
return Result.failure(cause);
} else {
return Result.failure(new RuntimeException("Can't execute async task: exception", cause));
return Result.failure(toRuntimeException(cause));
}
} catch (RuntimeException | Error e) {
return Result.failure(e);
} catch (Exception e) {
return Result.failure(new RuntimeException("Can't execute async task: exception", e));
} catch (Throwable e) {
return Result.failure(toRuntimeException(e));
}
}

public static <T> T await(Future<T> future) {
return await(() -> future.get());
private static <T> boolean shortCircuitDoneFuture(Future<T> future) {
if (future.state() == Future.State.SUCCESS) {
return true;
} else if (future.state() == Future.State.FAILED) {
Throwable throwable = future.exceptionNow();
if (throwable instanceof Error e) {
throw e;
}
throw toRuntimeException(throwable);
}
return false;
}

public static <T> T await(CompletableFuture<T> completableFuture) {
return await(completableFuture::join);
private static RuntimeException toRuntimeException(Throwable cause) {
if (cause instanceof RuntimeException re) {
// re-throw RuntimeException as it is
return re;
} else {
return new CompletionException("Can't execute async task: exception", cause);
}
}

}
11 changes: 10 additions & 1 deletion src/main/java/me/kpavlov/await4j/Result.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package me.kpavlov.await4j;

import java.util.function.Function;
import java.util.function.UnaryOperator;

/**
* Represents the result of an operation that can either succeed with a value or fail with a throwable.
Expand Down Expand Up @@ -83,7 +84,15 @@ public <R> Result<R> map(Function<T, R> function) {
return success(function.apply(value));
}

public <R> Result<R> mapThrowable(Function<Throwable, Throwable> function) {
/**
* Maps the failure throwable using the provided function.
*
* @param <R> the type parameter for the new Result
* @param function the mapping function for the throwable
* @return a new Result instance with the mapped throwable
* @throws IllegalStateException if this instance represents success
*/
public <R> Result<R> mapThrowable(UnaryOperator<Throwable> function) {
if (throwable == null) {
throw new IllegalStateException("Can't map empty Throwable. Use map if needed.");
}
Expand Down
19 changes: 13 additions & 6 deletions src/test/java/me/kpavlov/await4j/AsyncCallableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,22 @@
import org.junit.jupiter.params.provider.MethodSource;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class AsyncCallableTest extends AbstractAsyncTest {

static Object[][] awaitCallableToCompleteSuccessfully() {
return TestUtils.combine(threadBuilders(), "OK", null);
}

@ParameterizedTest
@MethodSource("threadBuilders")
void awaitCallableToCompleteSuccessfully(Thread.Builder threadBuilder) throws InterruptedException {
@MethodSource("awaitCallableToCompleteSuccessfully")
void awaitCallableToCompleteSuccessfully(final Thread.Builder threadBuilder,
final String expectedResult) throws InterruptedException {
final var completed = new AtomicBoolean();
threadBuilder.start(() -> {
final var originalThread = Thread.currentThread();
Expand All @@ -27,17 +33,18 @@ void awaitCallableToCompleteSuccessfully(Thread.Builder threadBuilder) throws In
checkVirtualThreadInvariants(originalThread, threadLocal);
completed.compareAndSet(false, true);
// return result
return "Supplier Completed";
return expectedResult;
});
// then
assertThat(result).isEqualTo("Supplier Completed");
assertThat(result).isEqualTo(expectedResult);
}).join();
assertThat(completed).isTrue();
}


@ParameterizedTest
@MethodSource("threadBuilders")
void awaitSupplierReThrowsRuntimeException(Thread.Builder threadBuilder) throws InterruptedException {
void awaitSupplierReThrowsRuntimeException(final Thread.Builder threadBuilder) throws InterruptedException {
// given
final var runtimeException = new RuntimeException("Failure");
final Callable<String> callable = () -> {
Expand All @@ -60,7 +67,7 @@ void awaitHandlesThrowable(Exception throwable) {
Assertions.fail("Expected to fail with exception: %s", (Object) throwable);
} catch (Exception e) {
assertThat(e)
.isInstanceOf(RuntimeException.class)
.isInstanceOf(CompletionException.class)
.hasCause(throwable);
}
}
Expand Down
Loading

0 comments on commit f220fc1

Please sign in to comment.