diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 4ffd69342..38220fffc 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -9,6 +9,8 @@ package org.opensearch.flowframework; import com.google.common.collect.ImmutableList; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionRequest; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; @@ -27,6 +29,7 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.flowframework.model.WorkflowValidator; import org.opensearch.flowframework.rest.RestCreateWorkflowAction; import org.opensearch.flowframework.rest.RestDeleteWorkflowAction; import org.opensearch.flowframework.rest.RestDeprovisionWorkflowAction; @@ -52,6 +55,7 @@ import org.opensearch.flowframework.transport.SearchWorkflowStateTransportAction; import org.opensearch.flowframework.transport.SearchWorkflowTransportAction; import org.opensearch.flowframework.util.EncryptorUtils; +import org.opensearch.flowframework.workflow.CreateIndexStep; import org.opensearch.flowframework.workflow.WorkflowProcessSorter; import org.opensearch.flowframework.workflow.WorkflowStepFactory; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -66,6 +70,7 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.function.Supplier; diff --git a/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java b/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java index c9689b975..4a5d17417 100644 --- a/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java @@ -27,21 +27,25 @@ public class WorkflowStepValidator { private static final String OUTPUTS_FIELD = "outputs"; /** Required Plugins field name */ private static final String REQUIRED_PLUGINS = "required_plugins"; + /** Timeout field name */ + private static final String TIMEOUT = "timeout"; private List inputs; private List outputs; private List requiredPlugins; + private String timeout; /** - * Intantiate the object representing a Workflow Step validator + * Instantiate the object representing a Workflow Step validator * @param inputs the workflow step inputs * @param outputs the workflow step outputs * @param requiredPlugins the required plugins for this workflow step */ - public WorkflowStepValidator(List inputs, List outputs, List requiredPlugins) { + public WorkflowStepValidator(List inputs, List outputs, List requiredPlugins, String timeout) { this.inputs = inputs; this.outputs = outputs; this.requiredPlugins = requiredPlugins; + this.timeout = timeout; } /** @@ -54,6 +58,7 @@ public static WorkflowStepValidator parse(XContentParser parser) throws IOExcept List parsedInputs = new ArrayList<>(); List parsedOutputs = new ArrayList<>(); List requiredPlugins = new ArrayList<>(); + String timeout = null; ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); while (parser.nextToken() != XContentParser.Token.END_OBJECT) { @@ -78,11 +83,14 @@ public static WorkflowStepValidator parse(XContentParser parser) throws IOExcept requiredPlugins.add(parser.text()); } break; + case TIMEOUT: + timeout = parser.text(); + break; default: throw new IOException("Unable to parse field [" + fieldName + "] in a WorkflowStepValidator object."); } } - return new WorkflowStepValidator(parsedInputs, parsedOutputs, requiredPlugins); + return new WorkflowStepValidator(parsedInputs, parsedOutputs, requiredPlugins, timeout); } /** @@ -103,9 +111,17 @@ public List getOutputs() { /** * Get the required plugins - * @return the outputs + * @return the required plugins */ public List getRequiredPlugins() { return List.copyOf(requiredPlugins); } + + /** + * Get the timeout + * @return the timeout + */ + public String getTimeout() { + return timeout; + } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index 04f6349a4..f091e3d1a 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -27,6 +27,7 @@ import org.opensearch.plugins.PluginInfo; import org.opensearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -35,6 +36,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Queue; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -141,7 +143,7 @@ public List sortProcessNodes(Workflow workflow, String workflowId) * @throws Exception if validation fails */ public void validate(List processNodes) throws Exception { - WorkflowValidator validator = WorkflowValidator.parse("mappings/workflow-steps.json"); + WorkflowValidator validator = readWorkflowValidator(); validatePluginsInstalled(processNodes, validator); validateGraph(processNodes, validator); } @@ -244,11 +246,17 @@ public void validateGraph(List processNodes, WorkflowValidator vali ); } } + } + private WorkflowValidator readWorkflowValidator() throws IOException { + return WorkflowValidator.parse("mappings/workflow-steps.json"); } - private TimeValue parseTimeout(WorkflowNode node) { - String timeoutValue = (String) node.userInputs().getOrDefault(NODE_TIMEOUT_FIELD, NODE_TIMEOUT_DEFAULT_VALUE); + private TimeValue parseTimeout(WorkflowNode node) throws IOException { + WorkflowValidator validator = readWorkflowValidator(); + String nodeTimeoutValue = Optional.ofNullable(validator.getWorkflowStepValidators().get(node.type()).getTimeout()) + .orElse(NODE_TIMEOUT_DEFAULT_VALUE); + String timeoutValue = (String) node.userInputs().getOrDefault(NODE_TIMEOUT_FIELD, nodeTimeoutValue); String fieldName = String.join(".", node.id(), USER_INPUTS_FIELD, NODE_TIMEOUT_FIELD); TimeValue timeValue = TimeValue.parseTimeValue(timeoutValue, fieldName); if (timeValue.millis() < 0) { diff --git a/src/main/resources/mappings/workflow-steps.json b/src/main/resources/mappings/workflow-steps.json index 989d3c749..0d2c5f260 100644 --- a/src/main/resources/mappings/workflow-steps.json +++ b/src/main/resources/mappings/workflow-steps.json @@ -12,7 +12,8 @@ "outputs":[ "index_name" ], - "required_plugins":[] + "required_plugins":[], + "timeout": "10s" }, "create_ingest_pipeline": { "inputs":[ @@ -26,7 +27,8 @@ "outputs":[ "pipeline_id" ], - "required_plugins":[] + "required_plugins":[], + "timeout": "10s" }, "create_connector": { "inputs":[ @@ -43,7 +45,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "delete_connector": { "inputs": [ @@ -54,7 +57,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "register_local_model": { "inputs":[ @@ -73,7 +77,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "register_remote_model": { "inputs": [ @@ -87,7 +92,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "delete_model": { "inputs": [ @@ -98,7 +104,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "deploy_model": { "inputs":[ @@ -109,7 +116,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "15s" }, "undeploy_model": { "inputs":[ @@ -120,7 +128,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "register_model_group": { "inputs":[ @@ -132,7 +141,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "register_agent": { "inputs":[ @@ -150,7 +160,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "delete_agent": { "inputs": [ @@ -161,7 +172,8 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" }, "create_tool": { "inputs": [ @@ -172,6 +184,7 @@ ], "required_plugins":[ "opensearch-ml" - ] + ], + "timeout": "10s" } }