Skip to content

Commit

Permalink
read timeout from workflow-steps
Browse files Browse the repository at this point in the history
Signed-off-by: Jackie Han <[email protected]>
  • Loading branch information
jackiehanyang committed Dec 23, 2023
1 parent 2b10621 commit 6353cf2
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.flowframework;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
Expand All @@ -27,6 +29,7 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.model.WorkflowValidator;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestDeleteWorkflowAction;
import org.opensearch.flowframework.rest.RestDeprovisionWorkflowAction;
Expand All @@ -52,6 +55,7 @@
import org.opensearch.flowframework.transport.SearchWorkflowStateTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowTransportAction;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.workflow.CreateIndexStep;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand All @@ -66,6 +70,7 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,25 @@ public class WorkflowStepValidator {
private static final String OUTPUTS_FIELD = "outputs";
/** Required Plugins field name */
private static final String REQUIRED_PLUGINS = "required_plugins";
/** Timeout field name */
private static final String TIMEOUT = "timeout";

private List<String> inputs;
private List<String> outputs;
private List<String> requiredPlugins;
private String timeout;

/**
* Intantiate the object representing a Workflow Step validator
* Instantiate the object representing a Workflow Step validator
* @param inputs the workflow step inputs
* @param outputs the workflow step outputs
* @param requiredPlugins the required plugins for this workflow step
*/
public WorkflowStepValidator(List<String> inputs, List<String> outputs, List<String> requiredPlugins) {
public WorkflowStepValidator(List<String> inputs, List<String> outputs, List<String> requiredPlugins, String timeout) {
this.inputs = inputs;
this.outputs = outputs;
this.requiredPlugins = requiredPlugins;
this.timeout = timeout;
}

/**
Expand All @@ -54,6 +58,7 @@ public static WorkflowStepValidator parse(XContentParser parser) throws IOExcept
List<String> parsedInputs = new ArrayList<>();
List<String> parsedOutputs = new ArrayList<>();
List<String> requiredPlugins = new ArrayList<>();
String timeout = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -78,11 +83,14 @@ public static WorkflowStepValidator parse(XContentParser parser) throws IOExcept
requiredPlugins.add(parser.text());
}
break;
case TIMEOUT:
timeout = parser.text();
break;
default:
throw new IOException("Unable to parse field [" + fieldName + "] in a WorkflowStepValidator object.");
}
}
return new WorkflowStepValidator(parsedInputs, parsedOutputs, requiredPlugins);
return new WorkflowStepValidator(parsedInputs, parsedOutputs, requiredPlugins, timeout);
}

/**
Expand All @@ -103,9 +111,17 @@ public List<String> getOutputs() {

/**
* Get the required plugins
* @return the outputs
* @return the required plugins
*/
public List<String> getRequiredPlugins() {
return List.copyOf(requiredPlugins);
}

/**
* Get the timeout
* @return the timeout
*/
public String getTimeout() {
return timeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.plugins.PluginInfo;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -35,6 +36,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -141,7 +143,7 @@ public List<ProcessNode> sortProcessNodes(Workflow workflow, String workflowId)
* @throws Exception if validation fails
*/
public void validate(List<ProcessNode> processNodes) throws Exception {
WorkflowValidator validator = WorkflowValidator.parse("mappings/workflow-steps.json");
WorkflowValidator validator = readWorkflowValidator();
validatePluginsInstalled(processNodes, validator);
validateGraph(processNodes, validator);
}
Expand Down Expand Up @@ -244,11 +246,17 @@ public void validateGraph(List<ProcessNode> processNodes, WorkflowValidator vali
);
}
}
}

private WorkflowValidator readWorkflowValidator() throws IOException {
return WorkflowValidator.parse("mappings/workflow-steps.json");
}

private TimeValue parseTimeout(WorkflowNode node) {
String timeoutValue = (String) node.userInputs().getOrDefault(NODE_TIMEOUT_FIELD, NODE_TIMEOUT_DEFAULT_VALUE);
private TimeValue parseTimeout(WorkflowNode node) throws IOException {
WorkflowValidator validator = readWorkflowValidator();
String nodeTimeoutValue = Optional.ofNullable(validator.getWorkflowStepValidators().get(node.type()).getTimeout())
.orElse(NODE_TIMEOUT_DEFAULT_VALUE);
String timeoutValue = (String) node.userInputs().getOrDefault(NODE_TIMEOUT_FIELD, nodeTimeoutValue);
String fieldName = String.join(".", node.id(), USER_INPUTS_FIELD, NODE_TIMEOUT_FIELD);
TimeValue timeValue = TimeValue.parseTimeValue(timeoutValue, fieldName);
if (timeValue.millis() < 0) {
Expand Down
39 changes: 26 additions & 13 deletions src/main/resources/mappings/workflow-steps.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"outputs":[
"index_name"
],
"required_plugins":[]
"required_plugins":[],
"timeout": "10s"
},
"create_ingest_pipeline": {
"inputs":[
Expand All @@ -26,7 +27,8 @@
"outputs":[
"pipeline_id"
],
"required_plugins":[]
"required_plugins":[],
"timeout": "10s"
},
"create_connector": {
"inputs":[
Expand All @@ -43,7 +45,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"delete_connector": {
"inputs": [
Expand All @@ -54,7 +57,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"register_local_model": {
"inputs":[
Expand All @@ -73,7 +77,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"register_remote_model": {
"inputs": [
Expand All @@ -87,7 +92,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"delete_model": {
"inputs": [
Expand All @@ -98,7 +104,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"deploy_model": {
"inputs":[
Expand All @@ -109,7 +116,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "15s"
},
"undeploy_model": {
"inputs":[
Expand All @@ -120,7 +128,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"register_model_group": {
"inputs":[
Expand All @@ -132,7 +141,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"register_agent": {
"inputs":[
Expand All @@ -150,7 +160,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"delete_agent": {
"inputs": [
Expand All @@ -161,7 +172,8 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
},
"create_tool": {
"inputs": [
Expand All @@ -172,6 +184,7 @@
],
"required_plugins":[
"opensearch-ml"
]
],
"timeout": "10s"
}
}

0 comments on commit 6353cf2

Please sign in to comment.