Skip to content

Commit

Permalink
refactor: use LLCF#peek* method instead of `whenComplete/exceptiona…
Browse files Browse the repository at this point in the history
…lly`, more reliable codes 🦺

always report the potential uncaught exceptions, never hide exceptions and related bugs
  • Loading branch information
oldratlee committed Feb 1, 2025
1 parent 8c3cb18 commit 7226e74
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 34 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@
- 其它大型并发框架(比如[`Akka`](https://akka.io/)[`RxJava`](https://github.com/ReactiveX/RxJava))在使用上需要理解的内容要多很多。当然基本的并发关注方面及其复杂性,与具体使用哪个工具无关,都是要理解与注意的
- **高层抽象**
- 或说 以业务流程的形式表达技术的并发流程
- 可以避免或减少使用繁琐易错的基础并发协调工具[同步器`Synchronizers`](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/package-summary.html#synchronizers-heading)(如[`CountDownLatch`](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/CountDownLatch.html)[`CyclicBarrier`](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/CyclicBarrier.html)[`Phaser`](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/Phaser.html))、[`Locks`](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/locks/package-summary.html)[原子类`atomic`](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/atomic/package-summary.html)
- 可以避免或减少使用繁琐易错的并发协调基础工具[同步器`Synchronizers`](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/package-summary.html#synchronizers-heading)(如[`CountDownLatch`](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/CountDownLatch.html)[`CyclicBarrier`](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/CyclicBarrier.html)[`Phaser`](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/Phaser.html))、[`Locks`](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/locks/package-summary.html)[原子类`atomic`](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/atomic/package-summary.html)
- **`Java`标准库内置**
- 无需额外依赖,几乎总是可用
- 相信有极高的实现质量

和其它并发工具、框架一样,`CompletableFuture`用于
与其它并发工具、框架一样,`CompletableFuture`用于

- 并发执行业务逻辑,或说编排并发处理流程或异步任务
- 多核并行处理,充分利用资源
Expand Down Expand Up @@ -214,7 +214,7 @@
- 或是在传入的`CompletableFuture Action`中设置外部的变量,需要注意多线程读写的线程安全问题 ⚠️
- 多线程读写涉及多线程数据传递的复杂性,遗漏并发逻辑的数据读写的正确处理是业务代码中的常见问题❗️
- 并发深坑勿入,并发逻辑复杂易出Bug 🐞
如果涉及超时则会更复杂,`JDK CompletableFuture`自身在`Java 21`中也有这方面的[Bug修复](https://github.com/foldright/cffu/releases/tag/v1.0.1-Alpha20)
如果涉及超时则会更复杂,`JDK CompletableFuture`自身在`Java 21`中也有这方面的[Bug修复](https://github.com/foldright/cffu/releases/tag/v1.0.0-Alpha20)

`cffu``allResultsFailFastOf` / `allResultsOf` / `mostSuccessResultsOf`等方法提供了返回多个`CF`运行结果的功能。使用这些方法获取多个`CF`的整体运行结果:

Expand Down Expand Up @@ -514,7 +514,7 @@ public class MultipleActionsDemo {

应该只处理当前业务自己清楚明确能恢复的具体异常,由外层处理其它异常;从而避免掩盖Bug或是错误地处理了不能恢复的异常。

`cffu`提供了相应的[`catching*`等方法](https://foldright.io/api-docs/cffu/1.0.1/io/foldright/cffu/CompletableFutureUtils.html#catching(C,java.lang.Class,java.util.function.Function)),支持处理指定异常类型;使用方式与`CF#exceptionally`,不附代码示例。
`cffu`提供了相应的[`catching*`方法](https://foldright.io/api-docs/cffu/1.0.1/io/foldright/cffu/CompletableFutureUtils.html#catching(C,java.lang.Class,java.util.function.Function)),支持处理指定异常类型;相比`CF#exceptionally`方法新加了一个异常类型参数,使用方式类似,不附代码示例。

### 2.6 `Backport`支持`Java 8`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public final class CffuFactoryBuilder {
////////////////////////////////////////////////////////////////////////////////
// region# Internal constructor and fields
////////////////////////////////////////////////////////////////////////////////
/// /////////////////////////////////////////////////////////////////////////////

private final Executor defaultExecutor;

Expand Down Expand Up @@ -80,21 +80,23 @@ private static CffuFactory _poisonObject() {
// endregion
////////////////////////////////////////////////////////////////////////////////
// region# Internal helper methods and fields
////////////////////////////////////////////////////////////////////////////////

/// /////////////////////////////////////////////////////////////////////////////

@Contract(pure = true)
static CffuFactory withDefaultExecutor(CffuFactory fac, Executor defaultExecutor) {
return new CffuFactory(makeExecutor(defaultExecutor), fac.forbidObtrudeMethods());
}

private static Executor makeExecutor(final Executor defaultExecutor) {
requireNonNull(defaultExecutor, "defaultExecutor is null");
// check CffuMadeExecutor interface to avoid re-wrapping.
if (defaultExecutor instanceof CffuMadeExecutor) return defaultExecutor;
requireNonNull(defaultExecutor, "defaultExecutor is null");

// because wraps the input executor below, MUST call `screenExecutor` translation beforehand;
// otherwise the sequent operations can NOT recognize the input executor.
final Executor screenExecutor = LLCF.screenExecutor(defaultExecutor);

final Executor wrapByProviders = wrapExecutorByProviders(screenExecutor);
return wrapMadeInterface(wrapByProviders);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2975,6 +2975,8 @@ C catchingAsync(C cfThis, Class<X> exceptionType, Function<? super X, ? extends
* @param exceptionType the exception type that triggers use of {@code fallback}. The exception type is matched against
* the exception from argument cfThis. To avoid hiding bugs and other unrecoverable errors,
* callers should prefer more specific types, avoiding {@code Throwable.class} in particular.
* @param fallback the Function to be called if cfThis fails with the expected exception type.
* The function's argument is the exception from cfThis.
* @param executor the executor to use for asynchronous execution
* @see Futures#catching the equivalent Guava method catching()
*/
Expand Down Expand Up @@ -3136,7 +3138,7 @@ public static <C extends CompletableFuture<?>> C orTimeout(C cfThis, long timeou
// below code is copied from CompletableFuture#orTimeout with small adoption
if (!cfThis.isDone()) {
ScheduledFuture<?> f = Delayer.delayToTimeoutCf(cfThis, timeout, unit);
cfThis.whenComplete(new FutureCanceller(f));
peek0(cfThis, new FutureCanceller(f), "CFU#orTimeout");
}
}
return cfThis;
Expand Down Expand Up @@ -3226,7 +3228,7 @@ C completeOnTimeout(C cfThis, @Nullable T value, long timeout, TimeUnit unit) {
// below code is copied from CompletableFuture#completeOnTimeout with small adoption
if (!cfThis.isDone()) {
ScheduledFuture<?> f = Delayer.delayToCompleteCf(cfThis, value, timeout, unit);
cfThis.whenComplete(new FutureCanceller(f));
peek0(cfThis, new FutureCanceller(f), "CFU#completeOnTimeout");
}
}
return cfThis;
Expand All @@ -3239,7 +3241,7 @@ private static <C extends CompletableFuture<?>> C hopExecutorIfAtCfDelayerThread
peek0(cf, (v, ex) -> {
if (!atCfDelayerThread()) completeCf0(ret, v, ex);
else screenExecutor(executor).execute(() -> completeCf0(ret, v, ex));
}, "handle of executor hop");
}, "CFU#hopExecutorIfAtCfDelayerThread");

return (C) ret;
}
Expand Down Expand Up @@ -3438,7 +3440,7 @@ C peek(C cfThis, BiConsumer<? super T, ? super Throwable> action) {
requireNonNull(cfThis, "cfThis is null");
requireNonNull(action, "action is null");

return peek0(cfThis, action, "the action of peek");
return peek0(cfThis, action, "CFU#peek");
}

/**
Expand Down Expand Up @@ -3498,7 +3500,7 @@ C peekAsync(C cfThis, BiConsumer<? super T, ? super Throwable> action, Executor
requireNonNull(action, "action is null");
requireNonNull(executor, "executor is null");

return peekAsync0(cfThis, action, "the action of peekAsync", executor);
return peekAsync0(cfThis, action, "CFU#peekAsync", executor);
}

// endregion
Expand Down
2 changes: 2 additions & 0 deletions cffu-core/src/main/java/io/foldright/cffu/LLCF.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public static boolean isMinStageCf(CompletableFuture<?> cf) {
* The uncaught exceptions thrown by the action are reported.
*
* @see CompletableFutureUtils#peek(CompletionStage, BiConsumer)
* @see <a href="https://peps.python.org/pep-0020/">Errors should never pass silently. Unless explicitly silenced.</a>
*/
@Contract("_, _, _ -> param1")
public static <T, C extends CompletionStage<? extends T>>
Expand All @@ -183,6 +184,7 @@ C peek0(C cfThis, BiConsumer<? super T, ? super Throwable> action, String where)
* returns the given stage. The uncaught exceptions thrown by the action are reported.
*
* @see CompletableFutureUtils#peekAsync(CompletionStage, BiConsumer, Executor)
* @see <a href="https://peps.python.org/pep-0020/">Errors should never pass silently. Unless explicitly silenced.</a>
*/
@Contract("_, _, _, _ -> param1")
public static <T, C extends CompletionStage<? extends T>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@

import edu.umd.cs.findbugs.annotations.Nullable;
import io.foldright.cffu.Cffu;
import io.foldright.cffu.LLCF;
import io.foldright.cffu.internal.CommonUtils;
import io.foldright.cffu.internal.ExceptionLogger;
import org.jetbrains.annotations.Contract;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import static io.foldright.cffu.CompletableFutureUtils.unwrapCfException;
import static io.foldright.cffu.LLCF.completeCf0;
import static io.foldright.cffu.LLCF.peek0;
import static io.foldright.cffu.internal.CommonUtils.requireArrayAndEleNonNull;
import static io.foldright.cffu.internal.ExceptionLogger.Level.ERROR;
import static io.foldright.cffu.internal.ExceptionLogger.Level.WARN;
import static io.foldright.cffu.internal.ExceptionLogger.logException;
import static io.foldright.cffu.internal.ExceptionLogger.logUncaughtException;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -66,9 +67,10 @@ public static void handleAllSwallowedExceptions(

for (int i = 0; i < inputs.length; i++) {
final int idx = i;
// argument ex of method `exceptionally` is never null
inputs[idx].exceptionally(ex -> safeHandle(new ExceptionInfo(
where, idx, ex, safeGet(attachments, idx)), exceptionHandler));
peek0(inputs[i], (v, ex) -> {
if (ex == null) return;
safeHandle(new ExceptionInfo(where, idx, ex, safeGet(attachments, idx)), exceptionHandler);
}, "handleAllSwallowedExceptions");
}
}

Expand Down Expand Up @@ -117,18 +119,18 @@ public static void handleSwallowedExceptions(

// whether to swallow exceptions from inputs depends on the output's result,
// so must check when the output CompletionStage completes.
output.whenComplete((v, outputEx) -> { // outputEx may be null
peek0(output, (v, outputEx) -> { // outputEx may be null
Throwable outputBizEx = unwrapCfException(outputEx);
for (int i = 0; i < unreferencedInputs.length; i++) {
final int idx = i;
// argument ex of method `exceptionally` is never null
unreferencedInputs[i].exceptionally(ex -> {
peek0(unreferencedInputs[i], (v1, ex) -> {
if (ex == null) return;
// if ex is returned to output cf(aka. not swallowed ex), do NOTHING
if (unwrapCfException(ex) == outputBizEx) return null;
return safeHandle(new ExceptionInfo(where, idx, ex, safeGet(attachments, idx)), exceptionHandler);
});
if (unwrapCfException(ex) == outputBizEx) return;
safeHandle(new ExceptionInfo(where, idx, ex, safeGet(attachments, idx)), exceptionHandler);
}, "handleSwallowedExceptions(handle the input cf)");
}
});
}, "handleSwallowedExceptions(handle the output cf)");
}

/**
Expand All @@ -141,7 +143,7 @@ public static ExceptionHandler cffuSwallowedExceptionHandler() {

private static final ExceptionHandler CFFU_SWALLOWED_EX_HANDLER = exInfo -> {
String msg = "Swallowed exception of cf" + (exInfo.index + 1) + " at " + exInfo.where;
logException(ExceptionLogger.Level.WARN, msg, exInfo.exception);
logException(WARN, msg, exInfo.exception);
};

/**
Expand All @@ -151,7 +153,7 @@ public static ExceptionHandler cffuSwallowedExceptionHandler() {
private static CompletionStage<Void>[] unreferenced(CompletionStage<?>[] css) {
return CommonUtils.mapArray(css, CompletionStage[]::new, s -> {
CompletableFuture<Void> ret = new CompletableFuture<>();
LLCF.peek0(s, (v, ex) -> LLCF.completeCf0(ret, null, ex), "unreferenced");
peek0(s, (v, ex) -> completeCf0(ret, null, ex), "unreferenced");
return ret;
});
}
Expand All @@ -162,15 +164,12 @@ private static CompletionStage<Void>[] unreferenced(CompletionStage<?>[] css) {
else return null;
}

@Contract("_, _ -> null")
@SuppressWarnings("SameReturnValue")
private static <T> @Nullable T safeHandle(ExceptionInfo info, ExceptionHandler handler) {
private static void safeHandle(ExceptionInfo info, ExceptionHandler handler) {
try {
handler.handle(info);
} catch (Throwable e) {
logUncaughtException(ExceptionLogger.Level.ERROR, "exceptionHandler(" + handler.getClass() + ")", e);
logUncaughtException(ERROR, "exceptionHandler(" + handler.getClass() + ")", e);
}
return null;
}

private SwallowedExceptionHandleUtils() {
Expand Down
2 changes: 1 addition & 1 deletion docs/cf-functions-intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
说明:

- 虽然这2个方法是静态工厂方法,但并不是`CF`链的起点,而是输入多个`CF`,用于编排多路的流程。
- 在功能与使用的上,应该和下面【3. 流程编排】一节的方法归类在一起。
- 在功能与使用的上,应该与下面【3. 流程编排】一节的方法归类在一起。
- 这里列上,只是为了体现出是静态工厂方法这个特点。
- 这2个方法是在组合输入的多个`CF`的结果,本身复杂业务执行逻辑,逻辑简单无阻塞,所以无需`Executor`
- 这2个方法所返回的`CF`,在结果获取上,有不方便的地方: 😔
Expand Down
2 changes: 1 addition & 1 deletion docs/completable-future-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
- 对于「完成」状态,进一步可以分成 成功(`Success`)、失败(`Failed`)2种状态。
- 所以也可以说,任务状态有且只有 运行中、取消、成功、失败 这4种状态。
- 右图是任务的状态及其转变图。
- 在概念上`CF`的状态转变只能是单次单向的,这很简单可靠、也容易理解并和使用直觉一致
- 在概念上`CF`的状态转变只能是单次单向的,这很简单可靠、也容易理解,并与使用直觉一致
- > 注:虽然下文提到的`obtrudeValue()`/`obtrudeException`方法可以突破`CF`概念上的约定,但这2个后门方法在正常设计实现中不应该会用到,尤其在业务使用应该完全忽略;带来的问题也由使用者自己了解清楚并注意。
- 〚2〛 关于「取消」状态:
- 对于`CompletableFuture`,取消的实现方式是设置[`CancellationException`](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/CancellationException.html)异常。
Expand Down

0 comments on commit 7226e74

Please sign in to comment.