Skip to content

Commit

Permalink
Put plugin API behind a feature flag (#142)
Browse files Browse the repository at this point in the history
* Put plugin API behind a feature flag

Signed-off-by: Daniel Widdis <[email protected]>

* Add test for feature flag disabled

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
(cherry picked from commit aad17d0)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Nov 3, 2023
1 parent 80efe24 commit 32f778f
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
Expand All @@ -24,6 +25,7 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
Expand Down Expand Up @@ -57,6 +59,8 @@
*/
public class FlowFrameworkPlugin extends Plugin implements ActionPlugin {

private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Instantiate this plugin.
*/
Expand All @@ -76,6 +80,9 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
Settings settings = environment.settings();
flowFrameworkFeatureEnabledSetting = new FlowFrameworkFeatureEnabledSetting(clusterService, settings);

MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client, mlClient);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);
Expand All @@ -95,7 +102,10 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return ImmutableList.of(new RestCreateWorkflowAction(), new RestProvisionWorkflowAction());
return ImmutableList.of(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting)
);
}

@Override
Expand All @@ -106,6 +116,12 @@ public List<RestHandler> getRestHandlers(
);
}

@Override
public List<Setting<?>> getSettings() {
List<Setting<?>> settings = ImmutableList.of(FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED);
return settings;
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
// TODO : Determine final size/queueSize values for the provision thread pool
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

/**
* Controls enabling or disabling features of this plugin
*/
public class FlowFrameworkFeatureEnabledSetting {

/** This setting enables/disables the Flow Framework REST API */
public static final Setting<Boolean> FLOW_FRAMEWORK_ENABLED = Setting.boolSetting(
"plugins.flow_framework.enabled",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile Boolean isFlowFrameworkEnabled;

/**
* Instantiate this class.
*
* @param clusterService OpenSearch cluster service
* @param settings OpenSearch settings
*/
public FlowFrameworkFeatureEnabledSetting(ClusterService clusterService, Settings settings) {
// Currently this is just an on/off switch for the entire plugin's API.
// If desired more fine-tuned feature settings can be added below.
isFlowFrameworkEnabled = FLOW_FRAMEWORK_ENABLED.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(FLOW_FRAMEWORK_ENABLED, it -> isFlowFrameworkEnabled = it);
}

/**
* Whether the flow framework feature is enabled. If disabled, no REST APIs will be availble.
* @return whether Flow Framework is enabled.
*/
public boolean isFlowFrameworkEnabled() {
return isFlowFrameworkEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
Expand All @@ -31,6 +32,7 @@
import static org.opensearch.flowframework.common.CommonValue.DRY_RUN;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED;

/**
* Rest Action to facilitate requests to create and update a use case template
Expand All @@ -40,10 +42,16 @@ public class RestCreateWorkflowAction extends BaseRestHandler {
private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class);
private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action";

private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Intantiates a new RestCreateWorkflowAction
*
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public RestCreateWorkflowAction() {}
public RestCreateWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

@Override
public String getName() {
Expand All @@ -62,6 +70,15 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"This API is disabled. To enable it, set [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
try {

String workflowId = request.param(WORKFLOW_ID);
Expand Down Expand Up @@ -89,5 +106,4 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
Expand All @@ -29,6 +30,7 @@

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED;

/**
* Rest action to facilitate requests to provision a workflow from an inline defined or stored use case template
Expand All @@ -39,10 +41,16 @@ public class RestProvisionWorkflowAction extends BaseRestHandler {

private static final String PROVISION_WORKFLOW_ACTION = "provision_workflow_action";

private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Instantiates a new RestProvisionWorkflowAction
*
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public RestProvisionWorkflowAction() {}
public RestProvisionWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

@Override
public String getName() {
Expand All @@ -61,6 +69,12 @@ public List<Route> routes() {
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
try {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
}
// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,21 @@
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -31,6 +39,9 @@ public class FlowFrameworkPluginTests extends OpenSearchTestCase {
private ClusterAdminClient clusterAdminClient;
private ThreadPool threadPool;
private Settings settings;
private Environment environment;
private ClusterSettings clusterSettings;
private ClusterService clusterService;

@Override
public void setUp() throws Exception {
Expand All @@ -41,7 +52,18 @@ public void setUp() throws Exception {
when(client.admin()).thenReturn(adminClient);
when(adminClient.cluster()).thenReturn(clusterAdminClient);
threadPool = new TestThreadPool(FlowFrameworkPluginTests.class.getName());
settings = Settings.EMPTY;

environment = mock(Environment.class);
settings = Settings.builder().build();
when(environment.settings()).thenReturn(settings);

final Set<Setting<?>> settingsSet = Stream.concat(
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
Stream.of(FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED)
).collect(Collectors.toSet());
clusterSettings = new ClusterSettings(settings, settingsSet);
clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
}

@Override
Expand All @@ -52,10 +74,14 @@ public void tearDown() throws Exception {

public void testPlugin() throws IOException {
try (FlowFrameworkPlugin ffp = new FlowFrameworkPlugin()) {
assertEquals(3, ffp.createComponents(client, null, threadPool, null, null, null, null, null, null, null, null).size());
assertEquals(
3,
ffp.createComponents(client, clusterService, threadPool, null, null, null, environment, null, null, null, null).size()
);
assertEquals(2, ffp.getRestHandlers(null, null, null, null, null, null, null).size());
assertEquals(2, ffp.getActions().size());
assertEquals(1, ffp.getExecutorBuilders(settings).size());
assertEquals(1, ffp.getSettings().size());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class FlowFrameworkFeatureEnabledSettingTests extends OpenSearchTestCase {

private Settings settings;
private ClusterSettings clusterSettings;
private ClusterService clusterService;

private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

@Override
public void setUp() throws Exception {
super.setUp();

settings = Settings.builder().build();
final Set<Setting<?>> settingsSet = Stream.concat(
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
Stream.of(FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED)
).collect(Collectors.toSet());
clusterSettings = new ClusterSettings(settings, settingsSet);
clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
flowFrameworkFeatureEnabledSetting = new FlowFrameworkFeatureEnabledSetting(clusterService, settings);
}

@Override
public void tearDown() throws Exception {
super.tearDown();
}

public void testSettings() throws IOException {
assertFalse(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowEdge;
Expand All @@ -30,6 +31,7 @@

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RestCreateWorkflowActionTests extends OpenSearchTestCase {

Expand All @@ -38,10 +40,13 @@ public class RestCreateWorkflowActionTests extends OpenSearchTestCase {
private String createWorkflowPath;
private String updateWorkflowPath;
private NodeClient nodeClient;
private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

@Override
public void setUp() throws Exception {
super.setUp();
flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class);
when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true);

Version templateVersion = Version.fromString("1.0.0");
List<Version> compatibilityVersions = List.of(Version.fromString("2.0.0"), Version.fromString("3.0.0"));
Expand All @@ -64,7 +69,7 @@ public void setUp() throws Exception {

// Invalid template configuration, wrong field name
this.invalidTemplate = template.toJson().replace("use_case", "invalid");
this.createWorkflowRestAction = new RestCreateWorkflowAction();
this.createWorkflowRestAction = new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting);
this.createWorkflowPath = String.format(Locale.ROOT, "%s", WORKFLOW_URI);
this.updateWorkflowPath = String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, "workflow_id");
this.nodeClient = mock(NodeClient.class);
Expand Down Expand Up @@ -95,4 +100,15 @@ public void testInvalidCreateWorkflowRequest() throws Exception {
assertEquals(RestStatus.BAD_REQUEST, channel.capturedResponse().status());
assertTrue(channel.capturedResponse().content().utf8ToString().contains("Unable to parse field [invalid] in a template object."));
}

public void testFeatureFlagNotEnabled() throws Exception {
when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(false);
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
.withPath(this.createWorkflowPath)
.build();
FakeRestChannel channel = new FakeRestChannel(request, false, 1);
createWorkflowRestAction.handleRequest(request, channel, nodeClient);
assertEquals(RestStatus.FORBIDDEN, channel.capturedResponse().status());
assertTrue(channel.capturedResponse().content().utf8ToString().contains("This API is disabled."));
}
}
Loading

0 comments on commit 32f778f

Please sign in to comment.