From 3d31567f47b306d52007f6bc5921c600433ed808 Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Mon, 6 Nov 2023 21:04:04 -0800 Subject: [PATCH] Implemented throttling for max workflows to be created Signed-off-by: Owais Kazi --- .../flowframework/FlowFrameworkPlugin.java | 9 +++- .../FlowFrameworkFeatureEnabledSetting.java | 11 +--- .../common/FlowFrameworkSettings.java | 47 ++++++++++++++++ .../rest/AbstractWorkflowAction.java | 40 ++++++++++++++ .../rest/RestCreateWorkflowAction.java | 18 +++++-- .../rest/RestProvisionWorkflowAction.java | 4 +- .../CreateWorkflowTransportAction.java | 43 +++++++++++++++ .../transport/WorkflowRequest.java | 54 +++++++++++++++++-- .../FlowFrameworkPluginTests.java | 10 ++-- .../opensearch/flowframework/TestHelpers.java | 17 ++++++ ...owFrameworkFeatureEnabledSettingTests.java | 2 +- .../rest/RestCreateWorkflowActionTests.java | 20 ++++++- .../CreateWorkflowTransportActionTests.java | 13 +++-- ...ProvisionWorkflowTransportActionTests.java | 4 +- .../WorkflowRequestResponseTests.java | 6 +-- 15 files changed, 263 insertions(+), 35 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java create mode 100644 src/main/java/org/opensearch/flowframework/rest/AbstractWorkflowAction.java diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index c52bd2fef..f44b72495 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -56,6 +56,9 @@ import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX; import static org.opensearch.flowframework.common.CommonValue.PROVISION_THREAD_POOL; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT; /** * An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch. @@ -63,6 +66,7 @@ public class FlowFrameworkPlugin extends Plugin implements ActionPlugin { private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private ClusterService clusterService; /** * Instantiate this plugin. @@ -84,6 +88,7 @@ public Collection createComponents( Supplier repositoriesServiceSupplier ) { Settings settings = environment.settings(); + this.clusterService = clusterService; flowFrameworkFeatureEnabledSetting = new FlowFrameworkFeatureEnabledSetting(clusterService, settings); MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client); @@ -106,7 +111,7 @@ public List getRestHandlers( Supplier nodesInCluster ) { return ImmutableList.of( - new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting), + new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService), new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting), new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting) ); @@ -123,7 +128,7 @@ public List getRestHandlers( @Override public List> getSettings() { - List> settings = ImmutableList.of(FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED); + List> settings = ImmutableList.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, WORKFLOW_REQUEST_TIMEOUT); return settings; } diff --git a/src/main/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSetting.java b/src/main/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSetting.java index f10068f5b..87f5412a8 100644 --- a/src/main/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSetting.java +++ b/src/main/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSetting.java @@ -9,22 +9,15 @@ package org.opensearch.flowframework.common; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; + /** * Controls enabling or disabling features of this plugin */ public class FlowFrameworkFeatureEnabledSetting { - /** This setting enables/disables the Flow Framework REST API */ - public static final Setting FLOW_FRAMEWORK_ENABLED = Setting.boolSetting( - "plugins.flow_framework.enabled", - false, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - private volatile Boolean isFlowFrameworkEnabled; /** diff --git a/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java b/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java new file mode 100644 index 000000000..5ef506d51 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/common/FlowFrameworkSettings.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.common; + +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; + +/** The common settings of flow framework */ +public class FlowFrameworkSettings { + + private FlowFrameworkSettings() {} + + /** The upper limit of max workflows that can be created */ + public static final int MAX_WORKFLOWS_LIMIT = 34; + + /** This setting sets max workflows that can be created */ + public static final Setting MAX_WORKFLOWS = Setting.intSetting( + "plugins.flow_framework.max_workflows", + 0, + 0, + MAX_WORKFLOWS_LIMIT, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** This setting sets the timeout for the request */ + public static final Setting WORKFLOW_REQUEST_TIMEOUT = Setting.positiveTimeSetting( + "plugins.flow_framework.request_timeout", + TimeValue.timeValueSeconds(10), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** This setting enables/disables the Flow Framework REST API */ + public static final Setting FLOW_FRAMEWORK_ENABLED = Setting.boolSetting( + "plugins.flow_framework.enabled", + false, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); +} diff --git a/src/main/java/org/opensearch/flowframework/rest/AbstractWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/AbstractWorkflowAction.java new file mode 100644 index 000000000..55e35da96 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/rest/AbstractWorkflowAction.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.rest; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.rest.BaseRestHandler; + +import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT; + +/** + * Abstract action for the rest actions + */ +public abstract class AbstractWorkflowAction extends BaseRestHandler { + protected volatile TimeValue requestTimeout; + protected volatile Integer maxWorkflows; + + /** + * Instantiates a new AbstractWorkflowAction + * + * @param settings Environment settings + * @param clusterService clusterService + */ + public AbstractWorkflowAction(Settings settings, ClusterService clusterService) { + this.requestTimeout = WORKFLOW_REQUEST_TIMEOUT.get(settings); + this.maxWorkflows = MAX_WORKFLOWS.get(settings); + + clusterService.getClusterSettings().addSettingsUpdateConsumer(WORKFLOW_REQUEST_TIMEOUT, it -> requestTimeout = it); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_WORKFLOWS, it -> maxWorkflows = it); + } + +} diff --git a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java index bd872b4ff..7bd5fbfbe 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java @@ -12,6 +12,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; @@ -21,7 +23,6 @@ import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.transport.CreateWorkflowAction; import org.opensearch.flowframework.transport.WorkflowRequest; -import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestRequest; @@ -32,12 +33,12 @@ import static org.opensearch.flowframework.common.CommonValue.DRY_RUN; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; -import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; /** * Rest Action to facilitate requests to create and update a use case template */ -public class RestCreateWorkflowAction extends BaseRestHandler { +public class RestCreateWorkflowAction extends AbstractWorkflowAction { private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class); private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action"; @@ -48,8 +49,15 @@ public class RestCreateWorkflowAction extends BaseRestHandler { * Instantiates a new RestCreateWorkflowAction * * @param flowFrameworkFeatureEnabledSetting Whether this API is enabled + * @param settings Environment settings + * @param clusterService clusterService */ - public RestCreateWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) { + public RestCreateWorkflowAction( + FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting, + Settings settings, + ClusterService clusterService + ) { + super(settings, clusterService); this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting; } @@ -85,7 +93,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli Template template = Template.parse(request.content().utf8ToString()); boolean dryRun = request.paramAsBoolean(DRY_RUN, false); - WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, dryRun); + WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, dryRun, requestTimeout, maxWorkflows); return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> { XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS); diff --git a/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java index 81e4fb606..69d853423 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java @@ -30,7 +30,7 @@ import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; -import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; /** * Rest action to facilitate requests to provision a workflow from an inline defined or stored use case template @@ -84,7 +84,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST); } // Create request and provision - WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); + WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, null, null); return channel -> client.execute(ProvisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> { XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS); channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index a6b809fc8..1f80b0e32 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -12,13 +12,18 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.client.Client; import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.commons.authuser.User; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.common.CommonValue; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.model.ProvisioningProgress; @@ -27,6 +32,9 @@ import org.opensearch.flowframework.model.Workflow; import org.opensearch.flowframework.workflow.ProcessNode; import org.opensearch.flowframework.workflow.WorkflowProcessSorter; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; @@ -47,6 +55,7 @@ public class CreateWorkflowTransportAction extends HandledTransportAction onSearchGlobalContext(response, listener, request.getMaxWorkflows()), + exception -> listener.onFailure(exception) + ) + ); + // Create new global context and state index entries flowFrameworkIndicesHandler.putTemplateToGlobalContext(templateWithUser, ActionListener.wrap(globalContextResponse -> { flowFrameworkIndicesHandler.putInitialStateToWorkflowState( @@ -160,6 +188,21 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener= maxWorkflow) { + String errorMessage = "Maximum workflows limit reached" + maxWorkflow; + logger.error(errorMessage); + listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); + } + } + private void validateWorkflows(Template template) throws Exception { for (Workflow workflow : template.workflows().values()) { List sortedNodes = workflowProcessSorter.sortProcessNodes(workflow); diff --git a/src/main/java/org/opensearch/flowframework/transport/WorkflowRequest.java b/src/main/java/org/opensearch/flowframework/transport/WorkflowRequest.java index 2d2046329..92e83282c 100644 --- a/src/main/java/org/opensearch/flowframework/transport/WorkflowRequest.java +++ b/src/main/java/org/opensearch/flowframework/transport/WorkflowRequest.java @@ -11,6 +11,7 @@ import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.common.Nullable; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.flowframework.model.Template; @@ -37,13 +38,30 @@ public class WorkflowRequest extends ActionRequest { */ private boolean dryRun; + /** + * Timeout for request + */ + private TimeValue requestTimeout; + + /** + * Max workflows + */ + private Integer maxWorkflows; + /** * Instantiates a new WorkflowRequest and defaults dry run to false * @param workflowId the documentId of the workflow * @param template the use case template which describes the workflow + * @param requestTimeout timeout of the request + * @param maxWorkflows max number of workflows */ - public WorkflowRequest(@Nullable String workflowId, @Nullable Template template) { - this(workflowId, template, false); + public WorkflowRequest( + @Nullable String workflowId, + @Nullable Template template, + @Nullable TimeValue requestTimeout, + @Nullable Integer maxWorkflows + ) { + this(workflowId, template, false, requestTimeout, maxWorkflows); } /** @@ -51,11 +69,21 @@ public WorkflowRequest(@Nullable String workflowId, @Nullable Template template) * @param workflowId the documentId of the workflow * @param template the use case template which describes the workflow * @param dryRun flag to indicate if validation is necessary + * @param requestTimeout timeout of the request + * @param maxWorkflows max number of workflows */ - public WorkflowRequest(@Nullable String workflowId, @Nullable Template template, boolean dryRun) { + public WorkflowRequest( + @Nullable String workflowId, + @Nullable Template template, + boolean dryRun, + TimeValue requestTimeout, + Integer maxWorkflows + ) { this.workflowId = workflowId; this.template = template; this.dryRun = dryRun; + this.requestTimeout = requestTimeout; + this.maxWorkflows = maxWorkflows; } /** @@ -69,6 +97,8 @@ public WorkflowRequest(StreamInput in) throws IOException { String templateJson = in.readOptionalString(); this.template = templateJson == null ? null : Template.parse(templateJson); this.dryRun = in.readBoolean(); + this.requestTimeout = in.readOptionalTimeValue(); + this.maxWorkflows = in.readOptionalInt(); } /** @@ -97,12 +127,30 @@ public boolean isDryRun() { return this.dryRun; } + /** + * Gets the timeout of the request + * @return the requestTimeout + */ + public TimeValue getRequestTimeout() { + return requestTimeout; + } + + /** + * Gets the max workflows + * @return the maxWorkflows + */ + public Integer getMaxWorkflows() { + return maxWorkflows; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeOptionalString(workflowId); out.writeOptionalString(template == null ? null : template.toJson()); out.writeBoolean(dryRun); + out.writeOptionalTimeValue(requestTimeout); + out.writeOptionalInt(maxWorkflows); } @Override diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java index 53ebc3451..d0b35f9d6 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java @@ -16,7 +16,6 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.env.Environment; -import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -27,6 +26,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -59,7 +61,7 @@ public void setUp() throws Exception { final Set> settingsSet = Stream.concat( ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), - Stream.of(FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED) + Stream.of(FLOW_FRAMEWORK_ENABLED, MAX_WORKFLOWS, WORKFLOW_REQUEST_TIMEOUT) ).collect(Collectors.toSet()); clusterSettings = new ClusterSettings(settings, settingsSet); clusterService = mock(ClusterService.class); @@ -78,10 +80,10 @@ public void testPlugin() throws IOException { 3, ffp.createComponents(client, clusterService, threadPool, null, null, null, environment, null, null, null, null).size() ); - assertEquals(3, ffp.getRestHandlers(null, null, null, null, null, null, null).size()); + assertEquals(3, ffp.getRestHandlers(settings, null, null, null, null, null, null).size()); assertEquals(3, ffp.getActions().size()); assertEquals(1, ffp.getExecutorBuilders(settings).size()); - assertEquals(1, ffp.getSettings().size()); + assertEquals(3, ffp.getSettings().size()); } } } diff --git a/src/test/java/org/opensearch/flowframework/TestHelpers.java b/src/test/java/org/opensearch/flowframework/TestHelpers.java index 002b59458..9c3f8a07e 100644 --- a/src/test/java/org/opensearch/flowframework/TestHelpers.java +++ b/src/test/java/org/opensearch/flowframework/TestHelpers.java @@ -9,8 +9,16 @@ package org.opensearch.flowframework; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; import org.opensearch.commons.authuser.User; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength; public class TestHelpers { @@ -23,4 +31,13 @@ public static User randomUser() { ImmutableList.of("attribute=test") ); } + + public static ClusterSettings clusterSetting(Settings settings, Setting... setting) { + final Set> settingsSet = Stream.concat( + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), + Sets.newHashSet(setting).stream() + ).collect(Collectors.toSet()); + ClusterSettings clusterSettings = new ClusterSettings(settings, settingsSet); + return clusterSettings; + } } diff --git a/src/test/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSettingTests.java b/src/test/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSettingTests.java index 9ac16c6f3..232dd71f2 100644 --- a/src/test/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSettingTests.java +++ b/src/test/java/org/opensearch/flowframework/common/FlowFrameworkFeatureEnabledSettingTests.java @@ -37,7 +37,7 @@ public void setUp() throws Exception { settings = Settings.builder().build(); final Set> settingsSet = Stream.concat( ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(), - Stream.of(FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED) + Stream.of(FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED) ).collect(Collectors.toSet()); clusterSettings = new ClusterSettings(settings, settingsSet); clusterService = mock(ClusterService.class); diff --git a/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java index 8a3564abe..c81498cea 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java @@ -10,6 +10,10 @@ import org.opensearch.Version; import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; @@ -30,7 +34,10 @@ import java.util.Map; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; public class RestCreateWorkflowActionTests extends OpenSearchTestCase { @@ -41,11 +48,21 @@ public class RestCreateWorkflowActionTests extends OpenSearchTestCase { private String updateWorkflowPath; private NodeClient nodeClient; private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private Settings settings; + private ClusterService clusterService; @Override public void setUp() throws Exception { super.setUp(); flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class); + settings = Settings.builder() + .put(WORKFLOW_REQUEST_TIMEOUT.getKey(), TimeValue.timeValueMillis(10)) + .put(MAX_WORKFLOWS.getKey(), 2) + .build(); + + ClusterSettings clusterSettings = TestHelpers.clusterSetting(settings, WORKFLOW_REQUEST_TIMEOUT, MAX_WORKFLOWS); + clusterService = spy(new ClusterService(settings, clusterSettings, null)); + when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true); Version templateVersion = Version.fromString("1.0.0"); @@ -69,7 +86,8 @@ public void setUp() throws Exception { // Invalid template configuration, wrong field name this.invalidTemplate = template.toJson().replace("use_case", "invalid"); - this.createWorkflowRestAction = new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + this.createWorkflowRestAction = new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService); this.createWorkflowPath = String.format(Locale.ROOT, "%s", WORKFLOW_URI); this.updateWorkflowPath = String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, "workflow_id"); this.nodeClient = mock(NodeClient.class); diff --git a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java index fbec8a034..f2034dd05 100644 --- a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java @@ -13,6 +13,7 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.client.Client; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; import org.opensearch.flowframework.TestHelpers; @@ -51,11 +52,16 @@ public class CreateWorkflowTransportActionTests extends OpenSearchTestCase { private ThreadPool threadPool; private ParseUtils parseUtils; private ThreadContext threadContext; + private Settings settings; @Override public void setUp() throws Exception { super.setUp(); threadPool = mock(ThreadPool.class); + settings = Settings.builder() + .put("plugins.flow_framework.max_workflows.", 2) + .put("plugins.anomaly_detection.backoff_initial_delay", TimeValue.timeValueSeconds(10)) + .build(); this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class); this.workflowProcessSorter = new WorkflowProcessSorter(mock(WorkflowStepFactory.class), threadPool); this.createWorkflowTransportAction = new CreateWorkflowTransportAction( @@ -63,6 +69,7 @@ public void setUp() throws Exception { mock(ActionFilters.class), workflowProcessSorter, flowFrameworkIndicesHandler, + settings, client ); // client = mock(Client.class); @@ -146,7 +153,7 @@ public void testFailedDryRunValidation() { @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); - WorkflowRequest createNewWorkflow = new WorkflowRequest(null, cyclicalTemplate, true); + WorkflowRequest createNewWorkflow = new WorkflowRequest(null, cyclicalTemplate, true, null, null); createWorkflowTransportAction.doExecute(mock(Task.class), createNewWorkflow, listener); ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); @@ -157,7 +164,7 @@ public void testFailedDryRunValidation() { public void testFailedToCreateNewWorkflow() { @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); - WorkflowRequest createNewWorkflow = new WorkflowRequest(null, template); + WorkflowRequest createNewWorkflow = new WorkflowRequest(null, template, null, null); doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(1); @@ -174,7 +181,7 @@ public void testFailedToCreateNewWorkflow() { public void testFailedToUpdateWorkflow() { @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); - WorkflowRequest updateWorkflow = new WorkflowRequest("1", template); + WorkflowRequest updateWorkflow = new WorkflowRequest("1", template, null, null); doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(2); diff --git a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java index d3f6fb6fd..9e22bff01 100644 --- a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java @@ -102,7 +102,7 @@ public void testProvisionWorkflow() { String workflowId = "1"; @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); - WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); + WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, null, null); doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(1); @@ -124,7 +124,7 @@ public void testProvisionWorkflow() { public void testFailedToRetrieveTemplateFromGlobalContext() { @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); - WorkflowRequest request = new WorkflowRequest("1", null); + WorkflowRequest request = new WorkflowRequest("1", null, null, null); doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(1); responseListener.onFailure(new Exception("Failed to retrieve template from global context.")); diff --git a/src/test/java/org/opensearch/flowframework/transport/WorkflowRequestResponseTests.java b/src/test/java/org/opensearch/flowframework/transport/WorkflowRequestResponseTests.java index d64249e27..5de21d5ab 100644 --- a/src/test/java/org/opensearch/flowframework/transport/WorkflowRequestResponseTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/WorkflowRequestResponseTests.java @@ -55,7 +55,7 @@ public void setUp() throws Exception { } public void testNullIdWorkflowRequest() throws IOException { - WorkflowRequest nullIdRequest = new WorkflowRequest(null, template); + WorkflowRequest nullIdRequest = new WorkflowRequest(null, template, null, null); assertNull(nullIdRequest.getWorkflowId()); assertEquals(template, nullIdRequest.getTemplate()); assertNull(nullIdRequest.validate()); @@ -71,7 +71,7 @@ public void testNullIdWorkflowRequest() throws IOException { } public void testNullTemplateWorkflowRequest() throws IOException { - WorkflowRequest nullTemplateRequest = new WorkflowRequest("123", null); + WorkflowRequest nullTemplateRequest = new WorkflowRequest("123", null, null, null); assertNotNull(nullTemplateRequest.getWorkflowId()); assertNull(nullTemplateRequest.getTemplate()); assertNull(nullTemplateRequest.validate()); @@ -87,7 +87,7 @@ public void testNullTemplateWorkflowRequest() throws IOException { } public void testWorkflowRequest() throws IOException { - WorkflowRequest workflowRequest = new WorkflowRequest("123", template); + WorkflowRequest workflowRequest = new WorkflowRequest("123", template, null, null); assertNotNull(workflowRequest.getWorkflowId()); assertEquals(template, workflowRequest.getTemplate()); assertNull(workflowRequest.validate());