From e875f15ccefa4c2cfb2352eca1786c49168f3f2a Mon Sep 17 00:00:00 2001 From: Attila Kelemen Date: Sat, 7 Oct 2017 20:19:30 +0200 Subject: [PATCH] Added the possibility to do something else in case of a dependency failure during graph execution. --- .../java/org/jtrim2/testutils/TestUtils.java | 6 ++ .../taskgraph/DependencyErrorHandler.java | 40 +++++++++ .../jtrim2/taskgraph/TaskNodeProperties.java | 33 ++++++++ .../basic/RestrictableTaskGraphExecutor.java | 2 +- .../org/jtrim2/taskgraph/basic/TaskNode.java | 62 ++++++++++++++ .../taskgraph/AbstractGraphExecutorTest.java | 84 +++++++++++++++++-- .../jtrim2/taskgraph/basic/TaskNodeTest.java | 82 +++++++++++++++++- 7 files changed, 300 insertions(+), 9 deletions(-) create mode 100644 subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/DependencyErrorHandler.java diff --git a/subprojects-internal/test-jtrim-utils/src/main/java/org/jtrim2/testutils/TestUtils.java b/subprojects-internal/test-jtrim-utils/src/main/java/org/jtrim2/testutils/TestUtils.java index 30b8a0ee..0bb75dd6 100644 --- a/subprojects-internal/test-jtrim-utils/src/main/java/org/jtrim2/testutils/TestUtils.java +++ b/subprojects-internal/test-jtrim-utils/src/main/java/org/jtrim2/testutils/TestUtils.java @@ -3,9 +3,15 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Modifier; +import java.util.function.Consumer; import org.jtrim2.concurrent.AsyncTasks; public final class TestUtils { + public static T build(T obj, Consumer config) { + config.accept(obj); + return obj; + } + public static void testUtilityClass(Class type) { if (!Modifier.isFinal(type.getModifiers())) { throw new AssertionError("Utility class must be final: " + type.getName()); diff --git a/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/DependencyErrorHandler.java b/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/DependencyErrorHandler.java new file mode 100644 index 00000000..e7ec487f --- /dev/null +++ b/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/DependencyErrorHandler.java @@ -0,0 +1,40 @@ +package org.jtrim2.taskgraph; + +import org.jtrim2.cancel.CancellationToken; + +/** + * Defines an error handler to be called instead of the associated node computation when + * the node is not being executed due to a failed dependency. + * + *

Thread safety

+ * Instances must have the same thread-safety property as the computation of the associated + * task node. + * + *

Synchronization transparency

+ * Instances are not expected to be synchronization transparent and are called in the + * same context as the associated computation would have been. + * + * @see TaskNodeProperties#tryGetDependencyErrorHandler() + */ +public interface DependencyErrorHandler { + /** + * Called when the associated node cannot be executed due to a dependency failure. This handler is + * called on the same executor the associated node's computation would have been called on. + *

+ * The graph execution is not considered completed before this method returns. + * + * @param cancelToken the cancellation token signaling the cancellation of the associated + * task graph execution. This argument cannot be {@code null}. + * @param nodeKey the {@code TaskNodeKey} identifying the node associated with the failure. + * This argument cannot be {@code null}. + * @param error the error causing the failure of the node execution. This argument cannot be + * {@code null}. + * + * @throws Exception thrown in case of some serious failure. The thrown exception will be + * suppressed by the error causing this method to be called. + */ + public void handleDependencyError( + CancellationToken cancelToken, + TaskNodeKey nodeKey, + Throwable error) throws Exception; +} diff --git a/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/TaskNodeProperties.java b/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/TaskNodeProperties.java index e5859c16..6862bdd5 100644 --- a/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/TaskNodeProperties.java +++ b/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/TaskNodeProperties.java @@ -25,6 +25,7 @@ */ public class TaskNodeProperties { private final TaskExecutor executor; + private final DependencyErrorHandler dependencyErrorHandler; /** * Sets the properties of the {@code TaskNodeProperties} from the current @@ -35,6 +36,7 @@ public class TaskNodeProperties { */ protected TaskNodeProperties(Builder builder) { this.executor = builder.executor; + this.dependencyErrorHandler = builder.dependencyErrorHandler; } /** @@ -49,6 +51,19 @@ public final TaskExecutor getExecutor() { return executor; } + /** + * Returns the handler to be called when the associated task node cannot due to + * a failure in one of its dependencies. The handler is called in the same context + * as the computation of the task node would have been. + * + * @return the handler to be called when the associated task node cannot due to + * a failure in one of its dependencies, or {@code null} if there is nothing to + * do with the failure. + */ + public DependencyErrorHandler tryGetDependencyErrorHandler() { + return dependencyErrorHandler; + } + /** * The {@code Builder} used to create {@link TaskNodeProperties} instances. * @@ -60,6 +75,7 @@ public final TaskExecutor getExecutor() { */ public static class Builder { private TaskExecutor executor; + private DependencyErrorHandler dependencyErrorHandler; /** * Initializes the {@code Builder} with the default values: @@ -69,6 +85,7 @@ public static class Builder { */ public Builder() { this.executor = SyncTaskExecutor.getSimpleExecutor(); + this.dependencyErrorHandler = null; } /** @@ -81,6 +98,22 @@ public Builder() { */ public Builder(TaskNodeProperties defaults) { this.executor = defaults.getExecutor(); + this.dependencyErrorHandler = defaults.tryGetDependencyErrorHandler(); + } + + /** + * Sets an error handler to be called if the associated node could not be + * executed due to a dependency error. The handler is called in the same context + * as the computation of the task node would have been. + *

+ * Setting this property will override any previously set value for this property. + * + * @param dependencyErrorHandler the error handler to be called if the associated node could not be + * executed due to a dependency error. This argument can be {@code null} if there is nothing + * to do in case of a dependency error. + */ + public void setDependencyErrorHandler(DependencyErrorHandler dependencyErrorHandler) { + this.dependencyErrorHandler = dependencyErrorHandler; } /** diff --git a/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/basic/RestrictableTaskGraphExecutor.java b/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/basic/RestrictableTaskGraphExecutor.java index 01b53720..82e167af 100644 --- a/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/basic/RestrictableTaskGraphExecutor.java +++ b/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/basic/RestrictableTaskGraphExecutor.java @@ -221,7 +221,7 @@ private void finishForwardNodes(TaskNodeKey key, Throwable error) { try { TaskNode child = nodes.get(childKey); if (child != null) { - child.propagateFailure(error); + child.propagateDependencyFailure(getCancelToken(), error); } } catch (Throwable ex) { onError(key, ex); diff --git a/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/basic/TaskNode.java b/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/basic/TaskNode.java index f1a719fe..6005a2b1 100644 --- a/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/basic/TaskNode.java +++ b/subprojects/jtrim-task-graph/src/main/java/org/jtrim2/taskgraph/basic/TaskNode.java @@ -10,6 +10,7 @@ import org.jtrim2.cancel.OperationCanceledException; import org.jtrim2.concurrent.AsyncTasks; import org.jtrim2.executor.TaskExecutor; +import org.jtrim2.taskgraph.DependencyErrorHandler; import org.jtrim2.taskgraph.TaskErrorHandler; import org.jtrim2.taskgraph.TaskNodeKey; import org.jtrim2.utils.ExceptionHelper; @@ -187,6 +188,67 @@ public void cancel() { propagateFailure(OperationCanceledException.withoutStackTrace()); } + /** + * Completes this task node exceptionally but calling the + * {@link org.jtrim2.taskgraph.TaskNodeProperties dependency error handler} first (if there is any). + * If this task node was already scheduled for execution normally, this method does nothing. + *

+ * Note that cancellation affects the dependency error handler as well. That is, if execution + * was canceled, the dependency error handler might not get executed. + *

+ * Calling this method does not count as scheduled for the {@link #wasScheduled() wasScheduled} flag. + * + * @param cancelToken the cancellation token which can signal cancellation for the + * dependency error handler. This argument cannot be {@code null}. + * @param error the error to forward to complete this node with. This argument cannot be + * {@code null}. This argument cannot be {@code null}. + */ + @SuppressWarnings("ThrowableResultIgnored") + public void propagateDependencyFailure(CancellationToken cancelToken, Throwable error) { + Objects.requireNonNull(cancelToken, "cancelToken"); + Objects.requireNonNull(error, "error"); + + NodeTaskRef nodeTaskRef = nodeTaskRefRef.getAndSet(null); + if (nodeTaskRef == null) { + // The task was already scheduled so, we ignore dependency failure notification. + // Also, this should not happen when used reasonably. + return; + } + + DependencyErrorHandler errorHandler = nodeTaskRef.getProperties().tryGetDependencyErrorHandler(); + if (errorHandler == null) { + propagateFailure(error); + return; + } + + try { + if (cancelToken.isCanceled()) { + cancel(); + return; + } + + TaskExecutor executor = nodeTaskRef.getProperties().getExecutor(); + executor.execute(cancelToken, taskCancelToken -> { + errorHandler.handleDependencyError(taskCancelToken, key, error); + }).whenComplete((result, taskError) -> { + propagateSuppressed(error, taskError); + }); + } catch (Throwable ex) { + propagateSuppressed(error, ex); + throw ex; + } + } + + private void propagateSuppressed(Throwable error, Throwable suppressed) { + try { + if (suppressed != null && suppressed != error) { + error.addSuppressed(suppressed); + } + } finally { + propagateFailure(error); + } + } + /** * Completes this task node exceptionally with the given error if it was not * completed yet. If this task node was already completed, this method does nothing. diff --git a/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/AbstractGraphExecutorTest.java b/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/AbstractGraphExecutorTest.java index 752dd0ae..a2b79642 100644 --- a/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/AbstractGraphExecutorTest.java +++ b/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/AbstractGraphExecutorTest.java @@ -6,16 +6,18 @@ import java.util.Objects; import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Supplier; import org.jtrim2.cancel.Cancellation; +import org.jtrim2.cancel.CancellationToken; import org.jtrim2.cancel.OperationCanceledException; import org.jtrim2.concurrent.AsyncTasks; import org.jtrim2.testutils.TestObj; import org.jtrim2.testutils.TestUtils; +import org.jtrim2.testutils.UnsafeConsumer; import org.junit.Test; import static org.junit.Assert.*; +import static org.mockito.Mockito.*; public abstract class AbstractGraphExecutorTest { private final Supplier graphConfigurerFactory; @@ -24,7 +26,7 @@ public AbstractGraphExecutorTest(Supplier graphConfigure this.graphConfigurerFactory = Objects.requireNonNull(graphConfigurerFactory, "graphConfigurerFactory"); } - private void test(Consumer graphConfigurerAction) { + private void test(UnsafeConsumer graphConfigurerAction) throws Exception { graphConfigurerAction.accept(graphConfigurerFactory.get()); } @@ -34,9 +36,11 @@ private static TaskNodeKey nodeKey(Class outputType, Class ar /** * This tests verifies that no deeply nested calls occur when the whole execution gets canceled. + * + * @throws Exception test failure */ @Test(timeout = 60000) - public void testFailureWithLongChainCancel() { + public void testFailureWithLongChainCancel() throws Exception { int rootCount = 10000; test((configurer) -> { @@ -106,7 +110,7 @@ public void testFailureWithLongChainCancel() { } @Test - public void testSingleNodeFails() { + public void testSingleNodeFails() throws Exception { test((configurer) -> { TaskFactoryDefiner factoryGroup1 = configurer.factoryGroupDefiner((properties) -> { }); @@ -159,7 +163,77 @@ public void testSingleNodeFails() { } @Test - public void testDoubleSplitGraph() { + public void testDependencyErrorHandler() throws Exception { + test((configurer) -> { + TaskFactoryDefiner factoryGroup1 = configurer.factoryGroupDefiner((properties) -> { + }); + + TaskFactoryKey leafFactoryKey + = new TaskFactoryKey<>(TestObj.class, String.class, "leaf-node"); + factoryGroup1.defineSimpleFactory(leafFactoryKey, (cancelToken, nodeDef) -> { + String factoryArg = nodeDef.factoryArg(); + return taskCancelToken -> { + throw new TestException(factoryArg); + }; + }); + + DependencyErrorHandler errorHandler = mock(DependencyErrorHandler.class); + + TaskFactoryKey rootFactoryKey + = new TaskFactoryKey<>(TestObj.class, String.class, "root-node"); + factoryGroup1.defineSimpleFactory(rootFactoryKey, (cancelToken, nodeDef) -> { + nodeDef.properties().setDependencyErrorHandler(errorHandler); + + TaskInputRef inputRef = nodeDef.inputs().bindInput(leafFactoryKey, "test-arg"); + return taskCancelToken -> inputRef.consumeInput(); + }); + + + TaskGraphBuilder builder = configurer.build(); + + TaskNodeKey requestedNodeKey = new TaskNodeKey<>(rootFactoryKey, "R"); + builder.addNode(requestedNodeKey); + + CompletionStage buildFuture = builder.buildGraph(Cancellation.UNCANCELABLE_TOKEN); + AtomicReference resultRef = new AtomicReference<>(); + AtomicReference errorRef = new AtomicReference<>(); + + buildFuture + .thenCompose((executor) -> { + executor.properties().setComputeErrorHandler((nodeKey, error) -> { + // Redefine to prevent logs + }); + executor.properties().setStopOnFailure(false); + executor.properties().setDeliverResultOnFailure(true); + + executor.properties().addResultNodeKey(requestedNodeKey); + + return executor.execute(Cancellation.UNCANCELABLE_TOKEN); + }) + .whenComplete((result, error) -> { + resultRef.set(result); + errorRef.set(error); + }); + + verify(errorHandler).handleDependencyError( + any(CancellationToken.class), + eq(requestedNodeKey), + isA(TestException.class)); + + TaskGraphExecutionResult result = resultRef.get(); + Throwable error = errorRef.get(); + + assertNotNull("result", result); + assertNull("error", error); + + assertEquals("resultType", ExecutionResultType.ERRORED, result.getResultType()); + + TestUtils.expectUnwrappedError(TestException.class, () -> result.getResult(requestedNodeKey)); + }); + } + + @Test + public void testDoubleSplitGraph() throws Exception { test((configurer) -> { TaskFactoryDefiner factoryGroup1 = configurer.factoryGroupDefiner((properties) -> { }); diff --git a/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/basic/TaskNodeTest.java b/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/basic/TaskNodeTest.java index 191adb64..80ea4168 100644 --- a/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/basic/TaskNodeTest.java +++ b/subprojects/jtrim-task-graph/src/test/java/org/jtrim2/taskgraph/basic/TaskNodeTest.java @@ -12,9 +12,11 @@ import org.jtrim2.executor.ManualTaskExecutor; import org.jtrim2.executor.SyncTaskExecutor; import org.jtrim2.executor.TaskExecutor; +import org.jtrim2.taskgraph.DependencyErrorHandler; import org.jtrim2.taskgraph.TaskErrorHandler; import org.jtrim2.taskgraph.TaskNodeKey; import org.jtrim2.taskgraph.TaskNodeProperties; +import org.jtrim2.testutils.TestUtils; import org.junit.Test; import static org.jtrim2.taskgraph.basic.TestNodes.*; @@ -94,6 +96,63 @@ public void testComputeAsyncMultipleTimes2() { testNode.verifyRunWithResult(); } + private void testPropagateDependencyError(int propagateCount) throws Exception { + DependencyErrorHandler errorHandler = mock(DependencyErrorHandler.class); + TestTaskNode testNode = new TestTaskNode("node1", "TEST-RESULT", errorHandler); + + TestException error = new TestException("Test-Error"); + for (int i = 0; i < propagateCount; i++) { + testNode.node.propagateDependencyFailure(Cancellation.UNCANCELABLE_TOKEN, error); + } + + verify(errorHandler).handleDependencyError( + notNull(CancellationToken.class), + same(testNode.node.getKey()), + same(error)); + } + + @Test + public void testPropagateDependencyError() throws Exception { + testPropagateDependencyError(1); + } + + @Test + public void testPropagateDependencyErrorMultipleTimes() throws Exception { + testPropagateDependencyError(2); + } + + @Test + public void testPropagateDependencyErrorAfterSchedule() { + DependencyErrorHandler errorHandler = mock(DependencyErrorHandler.class); + TestTaskNode testNode = new TestTaskNode("node1", "TEST-RESULT", errorHandler); + + + testNode.ensureScheduled(); + testNode.verifyRunWithResult(); + + TestException error = new TestException("Test-Error"); + testNode.node.propagateDependencyFailure(Cancellation.UNCANCELABLE_TOKEN, error); + + verifyZeroInteractions(errorHandler); + } + + @Test + public void testScheduleAfterPropagateDependencyError() throws Exception { + DependencyErrorHandler errorHandler = mock(DependencyErrorHandler.class); + TestTaskNode testNode = new TestTaskNode("node1", "TEST-RESULT", errorHandler); + + TestException error = new TestException("Test-Error"); + testNode.node.propagateDependencyFailure(Cancellation.UNCANCELABLE_TOKEN, error); + + testNode.ensureScheduled(); + testNode.verifyNotRun(); + + verify(errorHandler).handleDependencyError( + notNull(CancellationToken.class), + same(testNode.node.getKey()), + same(error)); + } + @Test public void testExternalCancel() { TestTaskNode testNode = new TestTaskNode("node1", "TEST-RESULT"); @@ -217,16 +276,25 @@ public TestTaskNode(Object key, Object result) { } public TestTaskNode(Object key, Object result, TaskExecutor executor) { - TaskNodeProperties.Builder properties = new TaskNodeProperties.Builder(); - properties.setExecutor(executor); + this(key, result, TestUtils.build(new TaskNodeProperties.Builder(), properties -> { + properties.setExecutor(executor); + }).build()); + } + + public TestTaskNode(Object key, Object result, DependencyErrorHandler errorHandler) { + this(key, result, TestUtils.build(new TaskNodeProperties.Builder(), properties -> { + properties.setDependencyErrorHandler(errorHandler); + }).build()); + } + public TestTaskNode(Object key, Object result, TaskNodeProperties properties) { this.result = result; this.errorHandler = mock(TaskErrorHandler.class); this.function = new TestCancelableFunction<>(key, result); this.node = new TaskNode<>( node(key), - new NodeTaskRef<>(properties.build(), function)); + new NodeTaskRef<>(properties, function)); } public void ensureScheduled() { @@ -339,4 +407,12 @@ public CompletionStage executeFunction( throw expectedError; } } + + private static class TestException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public TestException(String message) { + super(message); + } + } }