Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add timeout for node execution #69

Merged
merged 1 commit into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions .codecov.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
codecov:
require_ci_to_pass: yes
require_ci_to_pass: true

# ignore files in demo package
ignore:
Expand All @@ -12,5 +12,8 @@ coverage:
status:
project:
default:
target: 70% # the required coverage value
threshold: 1% # the leniency in hitting the target
target: auto
threshold: 2% # project coverage can drop
patch:
default:
target: 70% # required diff coverage value
18 changes: 11 additions & 7 deletions src/main/java/demo/Demo.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -26,8 +28,7 @@
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
Expand All @@ -37,6 +38,8 @@ public class Demo {

private static final Logger logger = LogManager.getLogger(Demo.class);

private Demo() {}

/**
* Demonstrate parsing a JSON graph.
*
Expand All @@ -54,13 +57,14 @@ public static void main(String[] args) throws IOException {
return;
}
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = WorkflowStepFactory.create(client);
ExecutorService executor = Executors.newFixedThreadPool(10);
WorkflowProcessSorter.create(factory, executor);
WorkflowStepFactory factory = new WorkflowStepFactory(client);

ThreadPool threadPool = new ThreadPool(Settings.EMPTY);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(factory, threadPool);

logger.info("Parsing graph to sequence...");
Template t = Template.parse(json);
List<ProcessNode> processSequence = WorkflowProcessSorter.get().sortProcessNodes(t.workflows().get("demo"));
List<ProcessNode> processSequence = workflowProcessSorter.sortProcessNodes(t.workflows().get("demo"));
List<CompletableFuture<?>> futureList = new ArrayList<>();

for (ProcessNode n : processSequence) {
Expand All @@ -80,6 +84,6 @@ public static void main(String[] args) throws IOException {
}
futureList.forEach(CompletableFuture::join);
logger.info("All done!");
executor.shutdown();
ThreadPool.terminate(threadPool, 500, TimeUnit.MILLISECONDS);
}
}
14 changes: 10 additions & 4 deletions src/main/java/demo/TemplateParseDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* Demo class exercising {@link WorkflowProcessSorter}. This will be moved to a unit test.
Expand All @@ -32,6 +34,8 @@ public class TemplateParseDemo {

private static final Logger logger = LogManager.getLogger(TemplateParseDemo.class);

private TemplateParseDemo() {}

/**
* Demonstrate parsing a JSON graph.
*
Expand All @@ -49,8 +53,9 @@ public static void main(String[] args) throws IOException {
return;
}
Client client = new NodeClient(null, null);
WorkflowStepFactory factory = WorkflowStepFactory.create(client);
WorkflowProcessSorter.create(factory, Executors.newFixedThreadPool(10));
WorkflowStepFactory factory = new WorkflowStepFactory(client);
ThreadPool threadPool = new ThreadPool(Settings.EMPTY);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(factory, threadPool);

Template t = Template.parse(json);

Expand All @@ -59,7 +64,8 @@ public static void main(String[] args) throws IOException {

for (Entry<String, Workflow> e : t.workflows().entrySet()) {
logger.info("Parsing {} workflow.", e.getKey());
WorkflowProcessSorter.get().sortProcessNodes(e.getValue());
workflowProcessSorter.sortProcessNodes(e.getValue());
}
ThreadPool.terminate(threadPool, 500, TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
*/
public class FlowFrameworkPlugin extends Plugin {

/**
* Instantiate this plugin.
*/
public FlowFrameworkPlugin() {}

@Override
public Collection<Object> createComponents(
Client client,
Expand All @@ -46,8 +51,8 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
WorkflowStepFactory workflowStepFactory = WorkflowStepFactory.create(client);
WorkflowProcessSorter workflowProcessSorter = WorkflowProcessSorter.create(workflowStepFactory, threadPool.generic());
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(client);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);

return ImmutableList.of(workflowStepFactory, workflowProcessSorter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public class WorkflowNode implements ToXContentObject {
public static final String INPUTS_FIELD = "inputs";
/** The field defining processors in the inputs for search and ingest pipelines */
public static final String PROCESSORS_FIELD = "processors";
/** The field defining the timeout value for this node */
public static final String NODE_TIMEOUT_FIELD = "node_timeout";
/** The default timeout value if the template doesn't override it */
public static final String NODE_TIMEOUT_DEFAULT_VALUE = "10s";

private final String id; // unique id
private final String type; // maps to a WorkflowStep
Expand Down
115 changes: 68 additions & 47 deletions src/main/java/org/opensearch/flowframework/workflow/ProcessNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.Scheduler.ScheduledCancellable;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/**
* Representation of a process node in a workflow graph. Tracks predecessor nodes which must be completed before it can start execution.
* Representation of a process node in a workflow graph.
* Tracks predecessor nodes which must be completed before it can start execution.
*/
public class ProcessNode {

Expand All @@ -30,7 +32,8 @@ public class ProcessNode {
private final WorkflowStep workflowStep;
private final WorkflowData input;
private final List<ProcessNode> predecessors;
private Executor executor;
private final ThreadPool threadPool;
private final TimeValue nodeTimeout;

private final CompletableFuture<WorkflowData> future = new CompletableFuture<>();

Expand All @@ -41,14 +44,23 @@ public class ProcessNode {
* @param workflowStep A java class implementing {@link WorkflowStep} to be executed when it's this node's turn.
* @param input Input required by the node encoded in a {@link WorkflowData} instance.
* @param predecessors Nodes preceding this one in the workflow
* @param executor The OpenSearch thread pool
* @param threadPool The OpenSearch thread pool
* @param nodeTimeout The timeout value for executing on this node
*/
public ProcessNode(String id, WorkflowStep workflowStep, WorkflowData input, List<ProcessNode> predecessors, Executor executor) {
public ProcessNode(
String id,
WorkflowStep workflowStep,
WorkflowData input,
List<ProcessNode> predecessors,
ThreadPool threadPool,
TimeValue nodeTimeout
) {
this.id = id;
this.workflowStep = workflowStep;
this.input = input;
this.predecessors = predecessors;
this.executor = executor;
this.threadPool = threadPool;
this.nodeTimeout = nodeTimeout;
}

/**
Expand Down Expand Up @@ -90,64 +102,73 @@ public CompletableFuture<WorkflowData> future() {
* Returns the predecessors of this node in the workflow.
* The predecessor's {@link #future()} must complete before execution begins on this node.
*
* @return a set of predecessor nodes, if any. At least one node in the graph must have no predecessors and serve as a start node.
* @return a set of predecessor nodes, if any.
* At least one node in the graph must have no predecessors and serve as a start node.
*/
public List<ProcessNode> predecessors() {
return predecessors;
}

/**
* Execute this node in the sequence. Initializes the node's {@link CompletableFuture} and completes it when the process completes.
* Returns the timeout value of this node in the workflow. A value of {@link TimeValue#ZERO} means no timeout.
* @return The node's timeout value.
*/
public TimeValue nodeTimeout() {
return nodeTimeout;
}

/**
* Execute this node in the sequence.
* Initializes the node's {@link CompletableFuture} and completes it when the process completes.
*
* @return this node's future. This is returned immediately, while process execution continues asynchronously.
* @return this node's future.
* This is returned immediately, while process execution continues asynchronously.
*/
public CompletableFuture<WorkflowData> execute() {
// TODO this class will be instantiated with the OpenSearch thread pool (or one for tests!)
// the generic executor from that pool should be passed to this runAsync call
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/42
if (this.future.isDone()) {
throw new IllegalStateException("Process Node [" + this.id + "] already executed.");
}
CompletableFuture.runAsync(() -> {
List<CompletableFuture<WorkflowData>> predFutures = predecessors.stream().map(p -> p.future()).collect(Collectors.toList());
if (!predecessors.isEmpty()) {
CompletableFuture<Void> waitForPredecessors = CompletableFuture.allOf(predFutures.toArray(new CompletableFuture<?>[0]));
try {
// We need timeouts to be part of the user template or in settings
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/45
waitForPredecessors.orTimeout(30, TimeUnit.SECONDS).get();
} catch (InterruptedException | ExecutionException e) {
handleException(e);
return;
try {
if (!predecessors.isEmpty()) {
CompletableFuture<Void> waitForPredecessors = CompletableFuture.allOf(predFutures.toArray(new CompletableFuture<?>[0]));
waitForPredecessors.join();
}
}
logger.info(">>> Starting {}.", this.id);
// get the input data from predecessor(s)
List<WorkflowData> input = new ArrayList<WorkflowData>();
input.add(this.input);
for (CompletableFuture<WorkflowData> cf : predFutures) {
try {

logger.info("Starting {}.", this.id);
// get the input data from predecessor(s)
List<WorkflowData> input = new ArrayList<WorkflowData>();
input.add(this.input);
for (CompletableFuture<WorkflowData> cf : predFutures) {
input.add(cf.get());
} catch (InterruptedException | ExecutionException e) {
handleException(e);
return;
}
}
CompletableFuture<WorkflowData> stepFuture = this.workflowStep.execute(input);
try {
stepFuture.orTimeout(15, TimeUnit.SECONDS).join();
logger.info(">>> Finished {}.", this.id);

ScheduledCancellable delayExec = null;
if (this.nodeTimeout.compareTo(TimeValue.ZERO) > 0) {
delayExec = threadPool.schedule(() -> {
if (!future.isDone()) {
future.completeExceptionally(new TimeoutException("Execute timed out for " + this.id));
}
}, this.nodeTimeout, ThreadPool.Names.SAME);
}
CompletableFuture<WorkflowData> stepFuture = this.workflowStep.execute(input);
// If completed exceptionally, this is a no-op
future.complete(stepFuture.get());
} catch (InterruptedException | ExecutionException e) {
handleException(e);
if (delayExec != null) {
delayExec.cancel();
}
logger.info("Finished {}.", this.id);
} catch (Throwable e) {
// TODO: better handling of getCause
this.future.completeExceptionally(e);
}
}, executor);
// TODO: improve use of thread pool beyond generic
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/61
}, threadPool.generic());
return this.future;
}

private void handleException(Exception e) {
// TODO: better handling of getCause
this.future.completeExceptionally(e);
logger.debug("<<< Completed Exceptionally {}", this.id, e.getCause());
}

@Override
public String toString() {
return this.id;
Expand Down
Loading
Loading