From 1b64d5a41f6899a1272953baaa456f4604c66aa4 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Thu, 14 Dec 2023 18:10:12 -0800 Subject: [PATCH] Add Delete Workflow API (#294) Signed-off-by: Daniel Widdis --- .../flowframework/FlowFrameworkPlugin.java | 5 + .../rest/RestDeleteWorkflowAction.java | 105 +++++++++++++++ .../transport/DeleteWorkflowAction.java | 28 ++++ .../DeleteWorkflowTransportAction.java | 66 ++++++++++ .../DeprovisionWorkflowTransportAction.java | 2 +- .../FlowFrameworkPluginTests.java | 4 +- .../rest/RestDeleteWorkflowActionTests.java | 96 ++++++++++++++ .../rest/RestGetWorkflowActionTests.java | 8 +- .../DeleteWorkflowTransportActionTests.java | 123 ++++++++++++++++++ 9 files changed, 430 insertions(+), 7 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java create mode 100644 src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowAction.java create mode 100644 src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java create mode 100644 src/test/java/org/opensearch/flowframework/rest/RestDeleteWorkflowActionTests.java create mode 100644 src/test/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportActionTests.java diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index d69c0b588..4ffd69342 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -28,6 +28,7 @@ import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.rest.RestCreateWorkflowAction; +import org.opensearch.flowframework.rest.RestDeleteWorkflowAction; import org.opensearch.flowframework.rest.RestDeprovisionWorkflowAction; import org.opensearch.flowframework.rest.RestGetWorkflowAction; import org.opensearch.flowframework.rest.RestGetWorkflowStateAction; @@ -36,6 +37,8 @@ import org.opensearch.flowframework.rest.RestSearchWorkflowStateAction; import org.opensearch.flowframework.transport.CreateWorkflowAction; import org.opensearch.flowframework.transport.CreateWorkflowTransportAction; +import org.opensearch.flowframework.transport.DeleteWorkflowAction; +import org.opensearch.flowframework.transport.DeleteWorkflowTransportAction; import org.opensearch.flowframework.transport.DeprovisionWorkflowAction; import org.opensearch.flowframework.transport.DeprovisionWorkflowTransportAction; import org.opensearch.flowframework.transport.GetWorkflowAction; @@ -139,6 +142,7 @@ public List getRestHandlers( ) { return ImmutableList.of( new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService), + new RestDeleteWorkflowAction(flowFrameworkFeatureEnabledSetting), new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting), new RestDeprovisionWorkflowAction(flowFrameworkFeatureEnabledSetting), new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting), @@ -152,6 +156,7 @@ public List getRestHandlers( public List> getActions() { return ImmutableList.of( new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class), + new ActionHandler<>(DeleteWorkflowAction.INSTANCE, DeleteWorkflowTransportAction.class), new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class), new ActionHandler<>(DeprovisionWorkflowAction.INSTANCE, DeprovisionWorkflowTransportAction.class), new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class), diff --git a/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java new file mode 100644 index 000000000..e017ee581 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/rest/RestDeleteWorkflowAction.java @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.rest; + +import com.google.common.collect.ImmutableList; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.transport.DeleteWorkflowAction; +import org.opensearch.flowframework.transport.WorkflowRequest; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; + +/** + * Rest Action to facilitate requests to delete a stored template + */ +public class RestDeleteWorkflowAction extends BaseRestHandler { + + private static final String DELETE_WORKFLOW_ACTION = "delete_workflow"; + private static final Logger logger = LogManager.getLogger(RestDeleteWorkflowAction.class); + private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + + /** + * Instantiates a new RestDeleteWorkflowAction + * @param flowFrameworkFeatureEnabledSetting Whether this API is enabled + */ + public RestDeleteWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) { + this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting; + } + + @Override + public String getName() { + return DELETE_WORKFLOW_ACTION; + } + + @Override + public List routes() { + return ImmutableList.of(new Route(RestRequest.Method.DELETE, String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, WORKFLOW_ID))); + } + + @Override + protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String workflowId = request.param(WORKFLOW_ID); + try { + if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) { + throw new FlowFrameworkException( + "This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.", + RestStatus.FORBIDDEN + ); + } + // Validate content + if (request.hasContent()) { + throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST); + } + // Validate params + if (workflowId == null) { + throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST); + } + WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); + return channel -> client.execute(DeleteWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> { + XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS); + channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); + }, exception -> { + try { + FlowFrameworkException ex = exception instanceof FlowFrameworkException + ? (FlowFrameworkException) exception + : new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)); + XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); + channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); + + } catch (IOException e) { + logger.error("Failed to send back delete workflow exception", e); + channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e), e.getMessage())); + } + })); + + } catch (FlowFrameworkException ex) { + return channel -> channel.sendResponse( + new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } + } +} diff --git a/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowAction.java b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowAction.java new file mode 100644 index 000000000..006f1f205 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowAction.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.transport; + +import org.opensearch.action.ActionType; +import org.opensearch.action.delete.DeleteResponse; + +import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX; + +/** + * External Action for public facing RestGetWorkflowAction + */ +public class DeleteWorkflowAction extends ActionType { + /** The name of this action */ + public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflow/delete"; + /** An instance of this action */ + public static final DeleteWorkflowAction INSTANCE = new DeleteWorkflowAction(); + + private DeleteWorkflowAction() { + super(NAME, DeleteResponse::new); + } +} diff --git a/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java new file mode 100644 index 000000000..4bc45da22 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportAction.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.transport; + +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.client.Client; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; + +/** + * Transport action to retrieve a use case template within the Global Context + */ +public class DeleteWorkflowTransportAction extends HandledTransportAction { + + private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; + private final Client client; + + /** + * Instantiates a new DeleteWorkflowTransportAction instance + * @param transportService the transport service + * @param actionFilters action filters + * @param flowFrameworkIndicesHandler The Flow Framework indices handler + * @param client the OpenSearch Client + */ + @Inject + public DeleteWorkflowTransportAction( + TransportService transportService, + ActionFilters actionFilters, + FlowFrameworkIndicesHandler flowFrameworkIndicesHandler, + Client client + ) { + super(DeleteWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new); + this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; + this.client = client; + } + + @Override + protected void doExecute(Task task, WorkflowRequest request, ActionListener listener) { + if (flowFrameworkIndicesHandler.doesIndexExist(GLOBAL_CONTEXT_INDEX)) { + String workflowId = request.getWorkflowId(); + DeleteRequest deleteRequest = new DeleteRequest(GLOBAL_CONTEXT_INDEX, workflowId); + + ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext(); + client.delete(deleteRequest, ActionListener.runBefore(listener, () -> context.restore())); + } else { + listener.onFailure(new FlowFrameworkException("There are no templates in the global context.", RestStatus.NOT_FOUND)); + } + } +} diff --git a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java index 784b67374..1777a2457 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java @@ -131,7 +131,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener provisionProcessSequence = workflowProcessSorter.sortProcessNodes(provisionWorkflow, workflowId); - workflowProcessSorter.validateGraph(provisionProcessSequence); + workflowProcessSorter.validate(provisionProcessSequence); // We have a valid template and sorted nodes, get the created resources getResourcesAndExecute(request.getWorkflowId(), provisionProcessSequence, listener); diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java index 9f9529ca5..b08c27cfb 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java @@ -82,8 +82,8 @@ public void testPlugin() throws IOException { 4, ffp.createComponents(client, clusterService, threadPool, null, null, null, environment, null, null, null, null).size() ); - assertEquals(7, ffp.getRestHandlers(settings, null, null, null, null, null, null).size()); - assertEquals(7, ffp.getActions().size()); + assertEquals(8, ffp.getRestHandlers(settings, null, null, null, null, null, null).size()); + assertEquals(8, ffp.getActions().size()); assertEquals(1, ffp.getExecutorBuilders(settings).size()); assertEquals(5, ffp.getSettings().size()); } diff --git a/src/test/java/org/opensearch/flowframework/rest/RestDeleteWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestDeleteWorkflowActionTests.java new file mode 100644 index 000000000..93fc27623 --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/rest/RestDeleteWorkflowActionTests.java @@ -0,0 +1,96 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.rest; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestChannel; +import org.opensearch.test.rest.FakeRestRequest; + +import java.util.List; +import java.util.Locale; + +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RestDeleteWorkflowActionTests extends OpenSearchTestCase { + private RestDeleteWorkflowAction restDeleteWorkflowAction; + private String getPath; + private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + private NodeClient nodeClient; + + @Override + public void setUp() throws Exception { + super.setUp(); + + this.getPath = String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, "workflow_id"); + flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class); + when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true); + this.restDeleteWorkflowAction = new RestDeleteWorkflowAction(flowFrameworkFeatureEnabledSetting); + this.nodeClient = mock(NodeClient.class); + } + + public void testRestDeleteWorkflowActionName() { + String name = restDeleteWorkflowAction.getName(); + assertEquals("delete_workflow", name); + } + + public void testRestDeleteWorkflowActionRoutes() { + List routes = restDeleteWorkflowAction.routes(); + assertEquals(1, routes.size()); + assertEquals(RestRequest.Method.DELETE, routes.get(0).getMethod()); + assertEquals(this.getPath, routes.get(0).getPath()); + } + + public void testInvalidRequestWithContent() { + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE) + .withPath(this.getPath) + .withContent(new BytesArray("request body"), MediaTypeRegistry.JSON) + .build(); + + FakeRestChannel channel = new FakeRestChannel(request, false, 1); + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> { + restDeleteWorkflowAction.handleRequest(request, channel, nodeClient); + }); + assertEquals("request [DELETE /_plugins/_flow_framework/workflow/{workflow_id}] does not support having a body", ex.getMessage()); + } + + public void testNullWorkflowId() throws Exception { + + // Request with no params + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE) + .withPath(this.getPath) + .build(); + + FakeRestChannel channel = new FakeRestChannel(request, true, 1); + restDeleteWorkflowAction.handleRequest(request, channel, nodeClient); + + assertEquals(1, channel.errors().get()); + assertEquals(RestStatus.BAD_REQUEST, channel.capturedResponse().status()); + assertTrue(channel.capturedResponse().content().utf8ToString().contains("workflow_id cannot be null")); + } + + public void testFeatureFlagNotEnabled() throws Exception { + when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(false); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE) + .withPath(this.getPath) + .build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 1); + restDeleteWorkflowAction.handleRequest(request, channel, nodeClient); + assertEquals(RestStatus.FORBIDDEN, channel.capturedResponse().status()); + assertTrue(channel.capturedResponse().content().utf8ToString().contains("This API is disabled.")); + } +} diff --git a/src/test/java/org/opensearch/flowframework/rest/RestGetWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestGetWorkflowActionTests.java index 3a51f1a9e..31cfc701f 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestGetWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestGetWorkflowActionTests.java @@ -56,7 +56,7 @@ public void testRestGetWorkflowActionRoutes() { } public void testInvalidRequestWithContent() { - RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET) .withPath(this.getPath) .withContent(new BytesArray("request body"), MediaTypeRegistry.JSON) .build(); @@ -65,13 +65,13 @@ public void testInvalidRequestWithContent() { IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> { restGetWorkflowAction.handleRequest(request, channel, nodeClient); }); - assertEquals("request [POST /_plugins/_flow_framework/workflow/{workflow_id}] does not support having a body", ex.getMessage()); + assertEquals("request [GET /_plugins/_flow_framework/workflow/{workflow_id}] does not support having a body", ex.getMessage()); } public void testNullWorkflowId() throws Exception { // Request with no params - RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET) .withPath(this.getPath) .build(); @@ -85,7 +85,7 @@ public void testNullWorkflowId() throws Exception { public void testFeatureFlagNotEnabled() throws Exception { when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(false); - RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET) .withPath(this.getPath) .build(); FakeRestChannel channel = new FakeRestChannel(request, false, 1); diff --git a/src/test/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportActionTests.java new file mode 100644 index 000000000..b0ae61e6d --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/transport/DeleteWorkflowTransportActionTests.java @@ -0,0 +1,123 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.transport; + +import org.opensearch.action.DocWriteResponse.Result; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import org.mockito.ArgumentCaptor; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class DeleteWorkflowTransportActionTests extends OpenSearchTestCase { + + private Client client; + private DeleteWorkflowTransportAction deleteWorkflowTransportAction; + private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; + + @Override + public void setUp() throws Exception { + super.setUp(); + this.client = mock(Client.class); + this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class); + this.deleteWorkflowTransportAction = new DeleteWorkflowTransportAction( + mock(TransportService.class), + mock(ActionFilters.class), + flowFrameworkIndicesHandler, + client + ); + + ThreadPool clientThreadPool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + + when(client.threadPool()).thenReturn(clientThreadPool); + when(clientThreadPool.getThreadContext()).thenReturn(threadContext); + + } + + public void testDeleteWorkflowNoGlobalContext() { + + when(flowFrameworkIndicesHandler.doesIndexExist(anyString())).thenReturn(false); + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + WorkflowRequest workflowRequest = new WorkflowRequest("1", null); + deleteWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(listener, times(1)).onFailure(exceptionCaptor.capture()); + assertTrue(exceptionCaptor.getValue().getMessage().contains("There are no templates in the global context.")); + } + + public void testDeleteWorkflowSuccess() { + String workflowId = "12345"; + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); + + when(flowFrameworkIndicesHandler.doesIndexExist(anyString())).thenReturn(true); + + // Stub client.delete to force on response + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + + ShardId shardId = new ShardId(new Index("indexName", "uuid"), 1); + responseListener.onResponse(new DeleteResponse(shardId, workflowId, 1, 1, 1, true)); + return null; + }).when(client).delete(any(DeleteRequest.class), any()); + + deleteWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener); + + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(DeleteResponse.class); + verify(listener, times(1)).onResponse(responseCaptor.capture()); + assertEquals(Result.DELETED, responseCaptor.getValue().getResult()); + } + + public void testDeleteWorkflowNotFound() { + String workflowId = "12345"; + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); + + when(flowFrameworkIndicesHandler.doesIndexExist(anyString())).thenReturn(true); + + // Stub client.delete to force on response + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + + ShardId shardId = new ShardId(new Index("indexName", "uuid"), 1); + responseListener.onResponse(new DeleteResponse(shardId, workflowId, 1, 1, 1, false)); + return null; + }).when(client).delete(any(DeleteRequest.class), any()); + + deleteWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener); + + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(DeleteResponse.class); + verify(listener, times(1)).onResponse(responseCaptor.capture()); + assertEquals(Result.NOT_FOUND, responseCaptor.getValue().getResult()); + } +}