From 8b2f1790dda70d87b1ce8577d311547e6587d28e Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Thu, 23 Mar 2023 10:02:25 -0700 Subject: [PATCH] Fix response handling, it works (again) Signed-off-by: Daniel Widdis --- .../org/opensearch/action/ActionModule.java | 4 +- .../extensions/ExtensionsManager.java | 2 - .../extensions/action/ExtensionAction.java | 4 +- .../action/ExtensionProxyTransportAction.java | 50 +++++++++++++++++++ .../action/ExtensionTransportAction.java | 21 +++----- .../ExtensionTransportActionsHandler.java | 26 ++++------ .../opensearch/action/ActionModuleTests.java | 8 +-- ...ExtensionTransportActionsHandlerTests.java | 5 -- 8 files changed, 74 insertions(+), 46 deletions(-) create mode 100644 server/src/main/java/org/opensearch/extensions/action/ExtensionProxyTransportAction.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index ceb1cd4823ba5..202defad539c4 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -287,7 +287,7 @@ import org.opensearch.common.settings.SettingsFilter; import org.opensearch.common.util.FeatureFlags; import org.opensearch.extensions.action.ExtensionProxyAction; -import org.opensearch.extensions.action.ExtensionTransportAction; +import org.opensearch.extensions.action.ExtensionProxyTransportAction; import org.opensearch.index.seqno.RetentionLeaseActions; import org.opensearch.indices.SystemIndices; import org.opensearch.indices.breaker.CircuitBreakerService; @@ -724,7 +724,7 @@ public void reg if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { // ExtensionProxyAction - actions.register(ExtensionProxyAction.INSTANCE, ExtensionTransportAction.class); + actions.register(ExtensionProxyAction.INSTANCE, ExtensionProxyTransportAction.class); } // Decommission actions diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index 9bacbe4941ba1..ccc1bdb620f31 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -198,10 +198,8 @@ public void initializeServicesAndRestHandler( this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler( extensionIdMap, transportService, - clusterService, client, actionModule, - settingsModule, this ); registerRequestHandler(); diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionAction.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionAction.java index dcd00a584a8fe..658c114d73c1a 100644 --- a/server/src/main/java/org/opensearch/extensions/action/ExtensionAction.java +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionAction.java @@ -17,7 +17,7 @@ * * @opensearch.internal */ -public class ExtensionAction extends ActionType { +public class ExtensionAction extends ActionType { private final String uniqueId; @@ -28,7 +28,7 @@ public class ExtensionAction extends ActionType { * @param name The fully qualified class name of the extension's action to execute. */ public ExtensionAction(String uniqueId, String name) { - super(name, ExtensionActionResponse::new); + super(name, RemoteExtensionActionResponse::new); this.uniqueId = uniqueId; } diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionProxyTransportAction.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionProxyTransportAction.java new file mode 100644 index 0000000000000..364965dc582e6 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionProxyTransportAction.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.extensions.ExtensionsManager; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +/** + * A proxy transport action used to proxy a transport request from OpenSearch or a plugin to execute on an extension + * + * @opensearch.internal + */ +public class ExtensionProxyTransportAction extends HandledTransportAction { + + private final ExtensionsManager extensionsManager; + + @Inject + public ExtensionProxyTransportAction( + Settings settings, + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + ExtensionsManager extensionsManager + ) { + super(ExtensionProxyAction.NAME, transportService, actionFilters, ExtensionActionRequest::new); + this.extensionsManager = extensionsManager; + } + + @Override + protected void doExecute(Task task, ExtensionActionRequest request, ActionListener listener) { + try { + listener.onResponse(extensionsManager.handleTransportRequest(request)); + } catch (Exception e) { + listener.onFailure(e); + } + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportAction.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportAction.java index ce1bd13271aba..3d19b076812b3 100644 --- a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportAction.java +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportAction.java @@ -10,39 +10,34 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; -import org.opensearch.common.settings.Settings; +import org.opensearch.action.support.TransportAction; import org.opensearch.extensions.ExtensionsManager; import org.opensearch.tasks.Task; -import org.opensearch.transport.TransportService; +import org.opensearch.tasks.TaskManager; /** * A proxy transport action used to proxy a transport request from OpenSearch or a plugin to execute on an extension * * @opensearch.internal */ -public class ExtensionTransportAction extends HandledTransportAction { +public class ExtensionTransportAction extends TransportAction { private final ExtensionsManager extensionsManager; - @Inject public ExtensionTransportAction( - Settings settings, - TransportService transportService, + String actionName, ActionFilters actionFilters, - ClusterService clusterService, + TaskManager taskManager, ExtensionsManager extensionsManager ) { - super(ExtensionProxyAction.NAME, transportService, actionFilters, ExtensionActionRequest::new); + super(actionName, actionFilters, taskManager); this.extensionsManager = extensionsManager; } @Override - protected void doExecute(Task task, ExtensionActionRequest request, ActionListener listener) { + protected void doExecute(Task task, ExtensionActionRequest request, ActionListener listener) { try { - listener.onResponse(extensionsManager.handleTransportRequest(request)); + listener.onResponse(extensionsManager.handleRemoteTransportRequest(request)); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java index 68fa827d8db5c..08f93cf5ce81a 100644 --- a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java @@ -15,9 +15,7 @@ import org.opensearch.action.ActionModule.DynamicActionRegistry; import org.opensearch.action.support.ActionFilters; import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.settings.SettingsModule; import org.opensearch.extensions.DiscoveryExtensionNode; import org.opensearch.extensions.AcknowledgedResponse; import org.opensearch.extensions.ExtensionsManager; @@ -48,29 +46,23 @@ public class ExtensionTransportActionsHandler { // Map of Extension unique ID to Extension Node, populated in Extensions Manager private final Map extensionIdMap; private final TransportService transportService; - private final ClusterService clusterService; private final NodeClient client; private final ActionFilters actionFilters; private final DynamicActionRegistry dynamicActionRegistry; - private final SettingsModule settingsModule; private final ExtensionsManager extensionsManager; public ExtensionTransportActionsHandler( Map extensionIdMap, TransportService transportService, - ClusterService clusterService, NodeClient client, ActionModule actionModule, - SettingsModule settingsModule, ExtensionsManager extensionsManager ) { this.extensionIdMap = extensionIdMap; this.transportService = transportService; - this.clusterService = clusterService; this.client = client; this.actionFilters = actionModule.getActionFilters(); this.dynamicActionRegistry = actionModule.getDynamicActionRegistry(); - this.settingsModule = settingsModule; this.extensionsManager = extensionsManager; } @@ -89,7 +81,7 @@ void registerAction(String action, String uniqueId) throws IllegalArgumentExcept // Register the action in the action module's dynamic actions map dynamicActionRegistry.registerDynamicAction( new ExtensionAction(uniqueId, action), - new ExtensionTransportAction(settingsModule.getSettings(), transportService, actionFilters, clusterService, extensionsManager) + new ExtensionTransportAction(action, actionFilters, transportService.getTaskManager(), extensionsManager) ); } @@ -104,6 +96,7 @@ public DiscoveryExtensionNode getExtension(String action) { if (uniqueId == null) { throw new ActionNotFoundTransportException(action); } + logger.info("Got extension: " + extensionIdMap.get(uniqueId)); return extensionIdMap.get(uniqueId); } @@ -117,10 +110,11 @@ public TransportResponse handleRegisterTransportActionsRequest(RegisterTransport logger.debug("Register Transport Actions request recieved {}", transportActionsRequest); try { for (String action : transportActionsRequest.getTransportActions()) { + logger.info("Registering action " + action + " with id " + transportActionsRequest.getUniqueId()); registerAction(action, transportActionsRequest.getUniqueId()); } } catch (Exception e) { - logger.error("Could not register Transport Action " + e); + logger.error("Could not register Transport Action: " + e.getMessage()); return new AcknowledgedResponse(false); } return new AcknowledgedResponse(true); @@ -160,10 +154,12 @@ public RemoteExtensionActionResponse handleTransportActionRequestFromExtension(T client.execute( extensionAction, new ExtensionActionRequest(request.getAction(), request.getRequestBytes()), - new ActionListener() { + new ActionListener() { @Override - public void onResponse(ExtensionActionResponse actionResponse) { - inProgressFuture.complete(new RemoteExtensionActionResponse(actionResponse)); + public void onResponse(RemoteExtensionActionResponse actionResponse) { + response.setSuccess(actionResponse.isSuccess()); + response.setResponseBytes(actionResponse.getResponseBytes()); + inProgressFuture.complete(actionResponse); } @Override @@ -274,7 +270,7 @@ public RemoteExtensionActionResponse read(StreamInput in) throws IOException { @Override public void handleResponse(RemoteExtensionActionResponse response) { - extensionActionResponse.setSuccess(true); + extensionActionResponse.setSuccess(response.isSuccess()); extensionActionResponse.setResponseBytes(response.getResponseBytes()); inProgressFuture.complete(response); } @@ -294,7 +290,7 @@ public String executor() { try { transportService.sendRequest( extension, - ExtensionsManager.REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION, + ExtensionsManager.REQUEST_EXTENSION_HANDLE_REMOTE_TRANSPORT_ACTION, new ExtensionHandleTransportRequest(request.getAction(), request.getRequestBytes()), extensionActionResponseTransportResponseHandler ); diff --git a/server/src/test/java/org/opensearch/action/ActionModuleTests.java b/server/src/test/java/org/opensearch/action/ActionModuleTests.java index d7fbb3f42132a..94ebf0fcf8816 100644 --- a/server/src/test/java/org/opensearch/action/ActionModuleTests.java +++ b/server/src/test/java/org/opensearch/action/ActionModuleTests.java @@ -284,13 +284,7 @@ public void testDynamicActionRegistry() { // ExtensionsAction not yet registered ExtensionAction testExtensionAction = new ExtensionAction("extensionId", "actionName"); - ExtensionTransportAction testExtensionTransportAction = new ExtensionTransportAction( - Settings.EMPTY, - null, - emptyFilters, - null, - null - ); + ExtensionTransportAction testExtensionTransportAction = new ExtensionTransportAction("test-action", emptyFilters, null, null); assertNull(dynamicActionRegistry.get(testExtensionAction)); // Register an extension action diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java index 1890a79222ffa..3fea207cbb700 100644 --- a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java @@ -19,7 +19,6 @@ import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.Settings; -import org.opensearch.common.settings.SettingsModule; import org.opensearch.common.transport.TransportAddress; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.extensions.DiscoveryExtensionNode; @@ -102,15 +101,11 @@ public void setup() throws Exception { dynamicActionRegistry.registerUnmodifiableActionMap(Collections.emptyMap()); when(mockActionModule.getDynamicActionRegistry()).thenReturn(dynamicActionRegistry); when(mockActionModule.getActionFilters()).thenReturn(EMPTY_FILTERS); - SettingsModule mockSettingsModule = mock(SettingsModule.class); - when(mockSettingsModule.getSettings()).thenReturn(Settings.EMPTY); extensionTransportActionsHandler = new ExtensionTransportActionsHandler( Map.of("uniqueid1", discoveryExtensionNode), transportService, - null, client, mockActionModule, - mockSettingsModule, null ); }