diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index db59a999a5293..bb13502c7aab9 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -282,12 +282,12 @@ import org.opensearch.common.inject.TypeLiteral; import org.opensearch.common.inject.multibindings.MapBinder; import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.extensions.action.ExtensionProxyAction; +import org.opensearch.extensions.action.ExtensionTransportAction; import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; import org.opensearch.common.util.FeatureFlags; -import org.opensearch.extensions.action.ExtensionProxyAction; -import org.opensearch.extensions.action.ExtensionProxyTransportAction; import org.opensearch.index.seqno.RetentionLeaseActions; import org.opensearch.indices.SystemIndices; import org.opensearch.indices.breaker.CircuitBreakerService; @@ -448,7 +448,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.function.UnaryOperator; @@ -456,7 +455,6 @@ import java.util.stream.Stream; import static java.util.Collections.unmodifiableMap; -import static java.util.Objects.requireNonNull; /** * Builds and binds the generic action map, all {@link TransportAction}s, and {@link ActionFilters}. @@ -473,17 +471,7 @@ public class ActionModule extends AbstractModule { private final ClusterSettings clusterSettings; private final SettingsFilter settingsFilter; private final List actionPlugins; - // The unmodifiable map containing OpenSearch and Plugin actions - // This is initialized at node bootstrap and contains same-JVM actions - // It will be wrapped in the Dynamic Action Registry but otherwise - // remains unchanged from its prior purpose, and registered actions - // will remain accessible. private final Map> actions; - // A dynamic action registry which includes the above immutable actions - // and also registers dynamic actions which may be unregistered. Usually - // associated with remote action execution on extensions, possibly in - // a different JVM and possibly on a different server. - private final DynamicActionRegistry dynamicActionRegistry; private final ActionFilters actionFilters; private final AutoCreateIndex autoCreateIndex; private final DestructiveOperations destructiveOperations; @@ -514,7 +502,6 @@ public ActionModule( this.threadPool = threadPool; actions = setupActions(actionPlugins); actionFilters = setupActionFilters(actionPlugins); - dynamicActionRegistry = new DynamicActionRegistry(); autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices); destructiveOperations = new DestructiveOperations(settings, clusterSettings); Set headers = Stream.concat( @@ -724,7 +711,7 @@ public void reg if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { // ExtensionProxyAction - actions.register(ExtensionProxyAction.INSTANCE, ExtensionProxyTransportAction.class); + actions.register(ExtensionProxyAction.INSTANCE, ExtensionTransportAction.class); } // Decommission actions @@ -966,86 +953,13 @@ protected void configure() { bind(supportAction).asEagerSingleton(); } } - - // register dynamic ActionType -> transportAction Map used by NodeClient - bind(DynamicActionRegistry.class).toInstance(dynamicActionRegistry); } public ActionFilters getActionFilters() { return actionFilters; } - public DynamicActionRegistry getDynamicActionRegistry() { - return dynamicActionRegistry; - } - public RestController getRestController() { return restController; } - - /** - * The DynamicActionRegistry maintains a registry mapping {@link ActionType} instances to {@link TransportAction} instances. - *

- * This class is modeled after {@link NamedRegistry} but provides both register and unregister capabilities. - * - * @opensearch.internal - */ - public static class DynamicActionRegistry { - // This is the unmodifiable actions map created during node bootstrap, which - // will continue to link ActionType and TransportAction pairs from core and plugin - // action handler registration. - private Map actions = Collections.emptyMap(); - // A dynamic registry to add or remove ActionType / TransportAction pairs - // at times other than node bootstrap. - private final Map, TransportAction> registry = new ConcurrentHashMap<>(); - - /** - * Register the immutable actions in the registry. - * - * @param actions The injected map of {@link ActionType} to {@link TransportAction} - */ - public void registerUnmodifiableActionMap(Map actions) { - this.actions = actions; - } - - /** - * Add a dynamic action to the registry. - * - * @param action The action instance to add - * @param transportAction The corresponding instance of transportAction to execute - */ - public void registerDynamicAction(ActionType action, TransportAction transportAction) { - requireNonNull(action, "action is required"); - requireNonNull(transportAction, "transportAction is required"); - if (actions.containsKey(action) || registry.putIfAbsent(action, transportAction) != null) { - throw new IllegalArgumentException("action [" + action.name() + "] already registered"); - } - } - - /** - * Remove a dynamic action from the registry. - * - * @param action The action to remove - */ - public void unregisterDynamicAction(ActionType action) { - requireNonNull(action, "action is required"); - if (registry.remove(action) == null) { - throw new IllegalArgumentException("action [" + action.name() + "] was not registered"); - } - } - - /** - * Gets the {@link TransportAction} instance corresponding to the {@link ActionType} instance. - * - * @param action The {@link ActionType}. - * @return the corresponding {@link TransportAction} if it is registered, null otherwise. - */ - @SuppressWarnings("unchecked") - public TransportAction get(ActionType action) { - if (actions.containsKey(action)) { - return actions.get(action); - } - return registry.get(action); - } - } } diff --git a/server/src/main/java/org/opensearch/client/node/NodeClient.java b/server/src/main/java/org/opensearch/client/node/NodeClient.java index 3341bfe326990..56cb7c406744a 100644 --- a/server/src/main/java/org/opensearch/client/node/NodeClient.java +++ b/server/src/main/java/org/opensearch/client/node/NodeClient.java @@ -34,7 +34,6 @@ import org.opensearch.action.ActionType; import org.opensearch.action.ActionListener; -import org.opensearch.action.ActionModule.DynamicActionRegistry; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionResponse; import org.opensearch.action.support.TransportAction; @@ -48,6 +47,7 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.RemoteClusterService; +import java.util.Map; import java.util.function.Supplier; /** @@ -57,7 +57,7 @@ */ public class NodeClient extends AbstractClient { - private DynamicActionRegistry actionRegistry; + private Map actions; /** * The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by * {@link #executeLocally(ActionType, ActionRequest, TaskListener)}. @@ -71,12 +71,12 @@ public NodeClient(Settings settings, ThreadPool threadPool) { } public void initialize( - DynamicActionRegistry actionRegistry, + Map actions, Supplier localNodeId, RemoteClusterService remoteClusterService, NamedWriteableRegistry namedWriteableRegistry ) { - this.actionRegistry = actionRegistry; + this.actions = actions; this.localNodeId = localNodeId; this.remoteClusterService = remoteClusterService; this.namedWriteableRegistry = namedWriteableRegistry; @@ -137,10 +137,10 @@ public String getLocalNodeId() { private TransportAction transportAction( ActionType action ) { - if (actionRegistry == null) { + if (actions == null) { throw new IllegalStateException("NodeClient has not been initialized"); } - TransportAction transportAction = (TransportAction) actionRegistry.get(action); + TransportAction transportAction = actions.get(action); if (transportAction == null) { throw new IllegalStateException("failed to find action [" + action + "] to execute"); } diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index ccc1bdb620f31..4f852ca944966 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -30,7 +30,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.Version; -import org.opensearch.action.ActionModule; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterSettingsResponse; @@ -48,8 +47,6 @@ import org.opensearch.extensions.action.ExtensionActionRequest; import org.opensearch.extensions.action.ExtensionActionResponse; import org.opensearch.extensions.action.ExtensionTransportActionsHandler; -import org.opensearch.extensions.action.RegisterTransportActionsRequest; -import org.opensearch.extensions.action.RemoteExtensionActionResponse; import org.opensearch.extensions.action.TransportActionRequestFromExtension; import org.opensearch.extensions.rest.RegisterRestActionsRequest; import org.opensearch.extensions.rest.RestActionsRequestHandler; @@ -61,6 +58,7 @@ import org.opensearch.index.IndicesModuleResponse; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.indices.cluster.IndicesClusterStateService; +import org.opensearch.rest.RestController; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.TransportException; @@ -91,7 +89,6 @@ public class ExtensionsManager { public static final String REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE = "internal:discovery/parsenamedwriteable"; public static final String REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION = "internal:extensions/restexecuteonextensiontaction"; public static final String REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION = "internal:extensions/handle-transportaction"; - public static final String REQUEST_EXTENSION_HANDLE_REMOTE_TRANSPORT_ACTION = "internal:extensions/handle-remote-transportaction"; public static final String TRANSPORT_ACTION_REQUEST_FROM_EXTENSION = "internal:extensions/request-transportaction-from-extension"; public static final int EXTENSION_REQUEST_WAIT_TIMEOUT = 10; @@ -169,7 +166,7 @@ public ExtensionsManager(Settings settings, Path extensionsPath) throws IOExcept * Initializes the {@link RestActionsRequestHandler}, {@link TransportService}, {@link ClusterService} and environment settings. This is called during Node bootstrap. * Lists/maps of extensions have already been initialized but not yet populated. * - * @param actionModule The ActionModule with the RestController and DynamicActionModule + * @param restController The RestController on which to register Rest Actions. * @param settingsModule The module that binds the provided settings to interface. * @param transportService The Node's transport service. * @param clusterService The Node's cluster service. @@ -177,14 +174,14 @@ public ExtensionsManager(Settings settings, Path extensionsPath) throws IOExcept * @param client The client used to make transport requests */ public void initializeServicesAndRestHandler( - ActionModule actionModule, + RestController restController, SettingsModule settingsModule, TransportService transportService, ClusterService clusterService, Settings initialEnvironmentSettings, NodeClient client ) { - this.restActionsRequestHandler = new RestActionsRequestHandler(actionModule.getRestController(), extensionIdMap, transportService); + this.restActionsRequestHandler = new RestActionsRequestHandler(restController, extensionIdMap, transportService); this.customSettingsRequestHandler = new CustomSettingsRequestHandler(settingsModule); this.transportService = transportService; this.clusterService = clusterService; @@ -195,13 +192,7 @@ public void initializeServicesAndRestHandler( REQUEST_EXTENSION_UPDATE_SETTINGS ); this.client = client; - this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler( - extensionIdMap, - transportService, - client, - actionModule, - this - ); + this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(extensionIdMap, transportService, client); registerRequestHandler(); } @@ -210,15 +201,6 @@ public void initializeServicesAndRestHandler( * * @param request which was sent by an extension. */ - public RemoteExtensionActionResponse handleRemoteTransportRequest(ExtensionActionRequest request) throws Exception { - return extensionTransportActionsHandler.sendRemoteTransportRequestToExtension(request); - } - - /** - * Handles Transport Request from {@link org.opensearch.extensions.action.ExtensionTransportAction} which was invoked by OpenSearch or a plugin - * - * @param request which was sent by an extension. - */ public ExtensionActionResponse handleTransportRequest(ExtensionActionRequest request) throws Exception { return extensionTransportActionsHandler.sendTransportRequestToExtension(request); } diff --git a/server/src/main/java/org/opensearch/extensions/action/RegisterTransportActionsRequest.java b/server/src/main/java/org/opensearch/extensions/RegisterTransportActionsRequest.java similarity index 98% rename from server/src/main/java/org/opensearch/extensions/action/RegisterTransportActionsRequest.java rename to server/src/main/java/org/opensearch/extensions/RegisterTransportActionsRequest.java index be711ee69dea6..94b15e2192722 100644 --- a/server/src/main/java/org/opensearch/extensions/action/RegisterTransportActionsRequest.java +++ b/server/src/main/java/org/opensearch/extensions/RegisterTransportActionsRequest.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.extensions.action; +package org.opensearch.extensions; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionAction.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionAction.java deleted file mode 100644 index 658c114d73c1a..0000000000000 --- a/server/src/main/java/org/opensearch/extensions/action/ExtensionAction.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.extensions.action; - -import org.opensearch.action.ActionType; - -import java.util.Objects; - -/** - * An {@link ActionType} to be used in extension action transport handling. - * - * @opensearch.internal - */ -public class ExtensionAction extends ActionType { - - private final String uniqueId; - - /** - * Create an instance of this action to register in the dynamic actions map. - * - * @param uniqueId The uniqueId of the extension which will run this action. - * @param name The fully qualified class name of the extension's action to execute. - */ - public ExtensionAction(String uniqueId, String name) { - super(name, RemoteExtensionActionResponse::new); - this.uniqueId = uniqueId; - } - - /** - * Gets the uniqueId of the extension which will run this action. - * - * @return the uniqueId - */ - public String uniqueId() { - return this.uniqueId; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = super.hashCode(); - result = prime * result + Objects.hash(uniqueId); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (!super.equals(obj)) return false; - if (getClass() != obj.getClass()) return false; - ExtensionAction other = (ExtensionAction) obj; - return Objects.equals(uniqueId, other.uniqueId); - } -} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionProxyTransportAction.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionProxyTransportAction.java deleted file mode 100644 index 364965dc582e6..0000000000000 --- a/server/src/main/java/org/opensearch/extensions/action/ExtensionProxyTransportAction.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.extensions.action; - -import org.opensearch.action.ActionListener; -import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; -import org.opensearch.common.settings.Settings; -import org.opensearch.extensions.ExtensionsManager; -import org.opensearch.tasks.Task; -import org.opensearch.transport.TransportService; - -/** - * A proxy transport action used to proxy a transport request from OpenSearch or a plugin to execute on an extension - * - * @opensearch.internal - */ -public class ExtensionProxyTransportAction extends HandledTransportAction { - - private final ExtensionsManager extensionsManager; - - @Inject - public ExtensionProxyTransportAction( - Settings settings, - TransportService transportService, - ActionFilters actionFilters, - ClusterService clusterService, - ExtensionsManager extensionsManager - ) { - super(ExtensionProxyAction.NAME, transportService, actionFilters, ExtensionActionRequest::new); - this.extensionsManager = extensionsManager; - } - - @Override - protected void doExecute(Task task, ExtensionActionRequest request, ActionListener listener) { - try { - listener.onResponse(extensionsManager.handleTransportRequest(request)); - } catch (Exception e) { - listener.onFailure(e); - } - } -} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportAction.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportAction.java index 4b0b9725e50ae..5976db78002eb 100644 --- a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportAction.java +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportAction.java @@ -10,34 +10,44 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.TransportAction; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; import org.opensearch.extensions.ExtensionsManager; +import org.opensearch.node.Node; import org.opensearch.tasks.Task; -import org.opensearch.tasks.TaskManager; +import org.opensearch.transport.TransportService; /** - * A proxy transport action used to proxy a transport request from an extension to execute on another extension + * The main proxy transport action used to proxy a transport request from extension to another extension * * @opensearch.internal */ -public class ExtensionTransportAction extends TransportAction { +public class ExtensionTransportAction extends HandledTransportAction { + private final String nodeName; + private final ClusterService clusterService; private final ExtensionsManager extensionsManager; + @Inject public ExtensionTransportAction( - String actionName, + Settings settings, + TransportService transportService, ActionFilters actionFilters, - TaskManager taskManager, + ClusterService clusterService, ExtensionsManager extensionsManager ) { - super(actionName, actionFilters, taskManager); + super(ExtensionProxyAction.NAME, transportService, actionFilters, ExtensionActionRequest::new); + this.nodeName = Node.NODE_NAME_SETTING.get(settings); + this.clusterService = clusterService; this.extensionsManager = extensionsManager; } @Override - protected void doExecute(Task task, ExtensionActionRequest request, ActionListener listener) { + protected void doExecute(Task task, ExtensionActionRequest request, ActionListener listener) { try { - listener.onResponse(extensionsManager.handleRemoteTransportRequest(request)); + listener.onResponse(extensionsManager.handleTransportRequest(request)); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java index 3fba76b7d3c59..1f2b58c2bd524 100644 --- a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java @@ -11,14 +11,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionListener; -import org.opensearch.action.ActionModule; -import org.opensearch.action.ActionModule.DynamicActionRegistry; -import org.opensearch.action.support.ActionFilters; import org.opensearch.client.node.NodeClient; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.extensions.DiscoveryExtensionNode; import org.opensearch.extensions.AcknowledgedResponse; import org.opensearch.extensions.ExtensionsManager; +import org.opensearch.extensions.RegisterTransportActionsRequest; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.ActionNotFoundTransportException; import org.opensearch.transport.TransportException; @@ -27,10 +25,11 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -41,62 +40,44 @@ */ public class ExtensionTransportActionsHandler { private static final Logger logger = LogManager.getLogger(ExtensionTransportActionsHandler.class); - // Map of action name to Extension unique ID, populated locally - private final Map actionToIdMap = new ConcurrentHashMap<>(); - // Map of Extension unique ID to Extension Node, populated in Extensions Manager + private Map actionsMap; private final Map extensionIdMap; private final TransportService transportService; private final NodeClient client; - private final ActionFilters actionFilters; - private final DynamicActionRegistry dynamicActionRegistry; - private final ExtensionsManager extensionsManager; public ExtensionTransportActionsHandler( Map extensionIdMap, TransportService transportService, - NodeClient client, - ActionModule actionModule, - ExtensionsManager extensionsManager + NodeClient client ) { + this.actionsMap = new HashMap<>(); this.extensionIdMap = extensionIdMap; this.transportService = transportService; this.client = client; - this.actionFilters = actionModule.getActionFilters(); - this.dynamicActionRegistry = actionModule.getDynamicActionRegistry(); - this.extensionsManager = extensionsManager; } /** * Method to register actions for extensions. * * @param action to be registered. - * @param uniqueId id of extension for which action is being registered. + * @param extension for which action is being registered. * @throws IllegalArgumentException when action being registered already is registered. */ - void registerAction(String action, String uniqueId) throws IllegalArgumentException { - // Register the action in this handler so it knows which extension owns it - if (actionToIdMap.putIfAbsent(action, uniqueId) != null) { - throw new IllegalArgumentException("The action [" + action + "] you are trying to register is already registered"); + void registerAction(String action, DiscoveryExtensionNode extension) throws IllegalArgumentException { + if (actionsMap.containsKey(action)) { + throw new IllegalArgumentException("The " + action + " you are trying to register is already registered"); } - // Register the action in the action module's dynamic actions map - dynamicActionRegistry.registerDynamicAction( - new ExtensionAction(uniqueId, action), - new ExtensionTransportAction(action, actionFilters, transportService.getTaskManager(), extensionsManager) - ); + actionsMap.putIfAbsent(action, extension); } /** * Method to get extension for a given action. * * @param action for which to get the registered extension. - * @return the extension or null if not found + * @return the extension. */ public DiscoveryExtensionNode getExtension(String action) { - String uniqueId = actionToIdMap.get(action); - if (uniqueId == null) { - throw new ActionNotFoundTransportException(action); - } - return extensionIdMap.get(uniqueId); + return actionsMap.get(action); } /** @@ -106,12 +87,17 @@ public DiscoveryExtensionNode getExtension(String action) { * @return A {@link AcknowledgedResponse} indicating success. */ public TransportResponse handleRegisterTransportActionsRequest(RegisterTransportActionsRequest transportActionsRequest) { + /* + * We are proxying the transport Actions through ExtensionProxyAction, so we really dont need to register dynamic actions for now. + */ + logger.debug("Register Transport Actions request recieved {}", transportActionsRequest); + DiscoveryExtensionNode extension = extensionIdMap.get(transportActionsRequest.getUniqueId()); try { for (String action : transportActionsRequest.getTransportActions()) { - registerAction(action, transportActionsRequest.getUniqueId()); + registerAction(action, extension); } } catch (Exception e) { - logger.error("Could not register Transport Action: " + e.getMessage()); + logger.error("Could not register Transport Action " + e); return new AcknowledgedResponse(false); } return new AcknowledgedResponse(true); @@ -124,37 +110,16 @@ public TransportResponse handleRegisterTransportActionsRequest(RegisterTransport * @return {@link TransportResponse} which is sent back to the transport action invoker. * @throws InterruptedException when message transport fails. */ - public RemoteExtensionActionResponse handleTransportActionRequestFromExtension(TransportActionRequestFromExtension request) - throws Exception { - String actionName = request.getAction(); - String uniqueId = actionToIdMap.get(actionName); - final RemoteExtensionActionResponse response = new RemoteExtensionActionResponse(false, new byte[0]); - // Fail fast if uniqueId is null - if (uniqueId == null) { - response.setResponseBytesAsString("Request failed: action [" + actionName + "] is not registered for any extension."); - return response; - } - ExtensionAction extensionAction = new ExtensionAction(uniqueId, actionName); - // Validate that this action has been registered - if (dynamicActionRegistry.get(extensionAction) == null) { - response.setResponseBytesAsString( - "Request failed: action [" + actionName + "] is not registered for extension [" + uniqueId + "]." - ); - return response; - } - DiscoveryExtensionNode extension = extensionIdMap.get(uniqueId); - if (extension == null) { - response.setResponseBytesAsString("Request failed: extension [" + uniqueId + "] can not be reached."); - return response; - } - final CompletableFuture inProgressFuture = new CompletableFuture<>(); + public TransportResponse handleTransportActionRequestFromExtension(TransportActionRequestFromExtension request) throws Exception { + DiscoveryExtensionNode extension = extensionIdMap.get(request.getUniqueId()); + final CompletableFuture inProgressFuture = new CompletableFuture<>(); + final TransportActionResponseToExtension response = new TransportActionResponseToExtension(new byte[0]); client.execute( - extensionAction, + ExtensionProxyAction.INSTANCE, new ExtensionActionRequest(request.getAction(), request.getRequestBytes()), - new ActionListener() { + new ActionListener() { @Override - public void onResponse(RemoteExtensionActionResponse actionResponse) { - response.setSuccess(actionResponse.isSuccess()); + public void onResponse(ExtensionActionResponse actionResponse) { response.setResponseBytes(actionResponse.getResponseBytes()); inProgressFuture.complete(actionResponse); } @@ -162,7 +127,8 @@ public void onResponse(RemoteExtensionActionResponse actionResponse) { @Override public void onFailure(Exception exp) { logger.debug("Transport request failed", exp); - response.setResponseBytesAsString("Request failed: " + exp.getMessage()); + byte[] responseBytes = ("Request failed: " + exp.getMessage()).getBytes(StandardCharsets.UTF_8); + response.setResponseBytes(responseBytes); inProgressFuture.completeExceptionally(exp); } } @@ -192,7 +158,10 @@ public void onFailure(Exception exp) { * @throws InterruptedException when message transport fails. */ public ExtensionActionResponse sendTransportRequestToExtension(ExtensionActionRequest request) throws Exception { - DiscoveryExtensionNode extension = getExtension(request.getAction()); + DiscoveryExtensionNode extension = actionsMap.get(request.getAction()); + if (extension == null) { + throw new ActionNotFoundTransportException(request.getAction()); + } final CompletableFuture inProgressFuture = new CompletableFuture<>(); final ExtensionActionResponse extensionActionResponse = new ExtensionActionResponse(new byte[0]); final TransportResponseHandler extensionActionResponseTransportResponseHandler = @@ -212,6 +181,8 @@ public void handleResponse(ExtensionActionResponse response) { @Override public void handleException(TransportException exp) { logger.debug("Transport request failed", exp); + byte[] responseBytes = ("Request failed: " + exp.getMessage()).getBytes(StandardCharsets.UTF_8); + extensionActionResponse.setResponseBytes(responseBytes); inProgressFuture.completeExceptionally(exp); } @@ -246,68 +217,4 @@ public String executor() { } return extensionActionResponse; } - - /** - * Method to send transport action request from a remote extension to another extension to handle. - * - * @param request to extension to handle transport request. - * @return {@link RemoteExtensionActionResponse} which encapsulates the transport response from the extension and its success. - */ - public RemoteExtensionActionResponse sendRemoteTransportRequestToExtension(ExtensionActionRequest request) { - DiscoveryExtensionNode extension = getExtension(request.getAction()); - final CompletableFuture inProgressFuture = new CompletableFuture<>(); - final RemoteExtensionActionResponse extensionActionResponse = new RemoteExtensionActionResponse(false, new byte[0]); - final TransportResponseHandler extensionActionResponseTransportResponseHandler = - new TransportResponseHandler() { - - @Override - public RemoteExtensionActionResponse read(StreamInput in) throws IOException { - return new RemoteExtensionActionResponse(in); - } - - @Override - public void handleResponse(RemoteExtensionActionResponse response) { - extensionActionResponse.setSuccess(response.isSuccess()); - extensionActionResponse.setResponseBytes(response.getResponseBytes()); - inProgressFuture.complete(response); - } - - @Override - public void handleException(TransportException exp) { - logger.debug("Transport request failed", exp); - extensionActionResponse.setResponseBytesAsString("Request failed: " + exp.getMessage()); - inProgressFuture.completeExceptionally(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } - }; - try { - transportService.sendRequest( - extension, - ExtensionsManager.REQUEST_EXTENSION_HANDLE_REMOTE_TRANSPORT_ACTION, - new ExtensionHandleTransportRequest(request.getAction(), request.getRequestBytes()), - extensionActionResponseTransportResponseHandler - ); - } catch (Exception e) { - logger.info("Failed to send transport action to extension " + extension.getName(), e); - } - try { - inProgressFuture.orTimeout(ExtensionsManager.EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS).join(); - } catch (CompletionException e) { - if (e.getCause() instanceof TimeoutException) { - logger.info("No response from extension to request."); - } - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); - } else if (e.getCause() instanceof Error) { - throw (Error) e.getCause(); - } else { - throw new RuntimeException(e.getCause()); - } - } - return extensionActionResponse; - } } diff --git a/server/src/main/java/org/opensearch/extensions/action/RemoteExtensionActionResponse.java b/server/src/main/java/org/opensearch/extensions/action/RemoteExtensionActionResponse.java deleted file mode 100644 index adc269a037231..0000000000000 --- a/server/src/main/java/org/opensearch/extensions/action/RemoteExtensionActionResponse.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.extensions.action; - -import org.opensearch.action.ActionResponse; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; - -/** - * This class encapsulates the {@link ExtensionActionResponse} from an extension, adding a field denoting success - * - * @opensearch.internal - */ -public class RemoteExtensionActionResponse extends ActionResponse { - /** - * Indicates whether the response was successful. If false, responseBytes will include an error message. - */ - private boolean success; - /** - * responseBytes is the raw bytes being transported between extensions. - */ - private ExtensionActionResponse response; - - /** - * RemoteExtensionActionResponse constructor. - * - * @param success Whether the response was successful. - * @param responseBytes is the raw bytes being transported between extensions. - */ - public RemoteExtensionActionResponse(boolean success, byte[] responseBytes) { - this.success = success; - this.response = new ExtensionActionResponse(responseBytes); - } - - /** - * RemoteExtensionActionResponse constructor from an {@link ExtensionActionResponse}. - * - * @param response an ExtensionActionResponse in which the first byte denotes success or failure - */ - public RemoteExtensionActionResponse(ExtensionActionResponse response) { - byte[] combinedBytes = response.getResponseBytes(); - this.success = combinedBytes[0] != 0; - byte[] responseBytes = new byte[combinedBytes.length - 1]; - System.arraycopy(combinedBytes, 1, responseBytes, 0, responseBytes.length); - this.response = new ExtensionActionResponse(responseBytes); - } - - /** - * RemoteExtensionActionResponse constructor from {@link StreamInput}. - * - * @param in bytes stream input used to de-serialize the message. - * @throws IOException when message de-serialization fails. - */ - public RemoteExtensionActionResponse(StreamInput in) throws IOException { - this.success = in.readBoolean(); - this.response = new ExtensionActionResponse(in); - } - - public boolean isSuccess() { - return success; - } - - public void setSuccess(boolean success) { - this.success = success; - } - - public byte[] getResponseBytes() { - return response.getResponseBytes(); - } - - public void setResponseBytes(byte[] responseBytes) { - this.response = new ExtensionActionResponse(responseBytes); - } - - /** - * Gets the Response bytes as a {@link StreamInput} - * - * @return A StreamInput representation of the response bytes - */ - public StreamInput getResponseBytesAsStream() { - return StreamInput.wrap(response.getResponseBytes()); - } - - /** - * Gets the Response bytes as a UTF-8 string - * - * @return A string representation of the response bytes - */ - public String getResponseBytesAsString() { - return new String(response.getResponseBytes(), StandardCharsets.UTF_8); - } - - /** - * Sets the Response bytes from a UTF-8 string - * - * @param response The response to convert to bytes - */ - public void setResponseBytesAsString(String response) { - this.response = new ExtensionActionResponse(response.getBytes(StandardCharsets.UTF_8)); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeBoolean(success); - response.writeTo(out); - } -} diff --git a/server/src/main/java/org/opensearch/extensions/action/TransportActionRequestFromExtension.java b/server/src/main/java/org/opensearch/extensions/action/TransportActionRequestFromExtension.java index 1f90d3224bb82..df494297559b3 100644 --- a/server/src/main/java/org/opensearch/extensions/action/TransportActionRequestFromExtension.java +++ b/server/src/main/java/org/opensearch/extensions/action/TransportActionRequestFromExtension.java @@ -8,10 +8,9 @@ package org.opensearch.extensions.action; -import org.opensearch.action.ActionRequest; -import org.opensearch.action.ActionRequestValidationException; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportRequest; import java.io.IOException; import java.util.Objects; @@ -21,7 +20,7 @@ * * @opensearch.api */ -public class TransportActionRequestFromExtension extends ActionRequest { +public class TransportActionRequestFromExtension extends TransportRequest { /** * action is the transport action intended to be invoked which is registered by an extension via {@link ExtensionTransportActionsHandler}. */ @@ -81,11 +80,6 @@ public String getUniqueId() { return this.uniqueId; } - @Override - public ActionRequestValidationException validate() { - return null; - } - @Override public String toString() { return "TransportActionRequestFromExtension{action=" + action + ", requestBytes=" + requestBytes + ", uniqueId=" + uniqueId + "}"; diff --git a/server/src/main/java/org/opensearch/extensions/action/TransportActionResponseToExtension.java b/server/src/main/java/org/opensearch/extensions/action/TransportActionResponseToExtension.java new file mode 100644 index 0000000000000..2913402bcd5e1 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/TransportActionResponseToExtension.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; + +/** + * This class encapsulates transport response to extension. + * + * @opensearch.api + */ +public class TransportActionResponseToExtension extends TransportResponse { + /** + * responseBytes is the raw bytes being transported between extensions. + */ + private byte[] responseBytes; + + /** + * TransportActionResponseToExtension constructor. + * + * @param responseBytes is the raw bytes being transported between extensions. + */ + public TransportActionResponseToExtension(byte[] responseBytes) { + this.responseBytes = responseBytes; + } + + /** + * TransportActionResponseToExtension constructor from {@link StreamInput} + * @param in bytes stream input used to de-serialize the message. + * @throws IOException when message de-serialization fails. + */ + public TransportActionResponseToExtension(StreamInput in) throws IOException { + this.responseBytes = in.readByteArray(); + } + + public void setResponseBytes(byte[] responseBytes) { + this.responseBytes = responseBytes; + } + + public byte[] getResponseBytes() { + return responseBytes; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeByteArray(responseBytes); + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index a23771d2bc80a..b96ec75f51fce 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -58,7 +58,6 @@ import org.opensearch.OpenSearchTimeoutException; import org.opensearch.Version; import org.opensearch.action.ActionModule; -import org.opensearch.action.ActionModule.DynamicActionRegistry; import org.opensearch.action.ActionType; import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.opensearch.action.search.SearchExecutionStatsCollector; @@ -843,7 +842,7 @@ protected Node( ); if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { this.extensionsManager.initializeServicesAndRestHandler( - actionModule, + restController, settingsModule, transportService, clusterService, @@ -1113,15 +1112,8 @@ protected Node( resourcesToClose.addAll(pluginLifecycleComponents); resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class)); this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents); - DynamicActionRegistry dynamicActionRegistry = actionModule.getDynamicActionRegistry(); - dynamicActionRegistry.registerUnmodifiableActionMap(injector.getInstance(new Key>() { - })); - client.initialize( - dynamicActionRegistry, - () -> clusterService.localNode().getId(), - transportService.getRemoteClusterService(), - namedWriteableRegistry - ); + client.initialize(injector.getInstance(new Key>() { + }), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService(), namedWriteableRegistry); this.namedWriteableRegistry = namedWriteableRegistry; logger.debug("initializing HTTP handlers ..."); diff --git a/server/src/test/java/org/opensearch/action/ActionModuleTests.java b/server/src/test/java/org/opensearch/action/ActionModuleTests.java index 94ebf0fcf8816..3193a8d953763 100644 --- a/server/src/test/java/org/opensearch/action/ActionModuleTests.java +++ b/server/src/test/java/org/opensearch/action/ActionModuleTests.java @@ -32,7 +32,6 @@ package org.opensearch.action; -import org.opensearch.action.ActionModule.DynamicActionRegistry; import org.opensearch.action.main.MainAction; import org.opensearch.action.main.TransportMainAction; import org.opensearch.action.support.ActionFilters; @@ -40,16 +39,12 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.Writeable.Reader; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; import org.opensearch.common.settings.SettingsModule; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.extensions.action.ExtensionAction; -import org.opensearch.extensions.action.ExtensionTransportAction; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.ActionPlugin.ActionHandler; @@ -67,9 +62,7 @@ import org.opensearch.usage.UsageService; import java.io.IOException; -import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.function.Supplier; import static java.util.Collections.emptyList; @@ -269,72 +262,4 @@ public List routes() { threadPool.shutdown(); } } - - public void testDynamicActionRegistry() { - ActionFilters emptyFilters = new ActionFilters(Collections.emptySet()); - Map testMap = Map.of(TestAction.INSTANCE, new TestTransportAction("test-action", emptyFilters, null)); - - DynamicActionRegistry dynamicActionRegistry = new DynamicActionRegistry(); - dynamicActionRegistry.registerUnmodifiableActionMap(testMap); - - // Should contain the immutable map entry - assertNotNull(dynamicActionRegistry.get(TestAction.INSTANCE)); - // Should not contain anything not added - assertNull(dynamicActionRegistry.get(MainAction.INSTANCE)); - - // ExtensionsAction not yet registered - ExtensionAction testExtensionAction = new ExtensionAction("extensionId", "actionName"); - ExtensionTransportAction testExtensionTransportAction = new ExtensionTransportAction("test-action", emptyFilters, null, null); - assertNull(dynamicActionRegistry.get(testExtensionAction)); - - // Register an extension action - // Should insert without problem - try { - dynamicActionRegistry.registerDynamicAction(testExtensionAction, testExtensionTransportAction); - } catch (Exception e) { - fail("Should not have thrown exception registering action: " + e); - } - assertEquals(testExtensionTransportAction, dynamicActionRegistry.get(testExtensionAction)); - - // Should fail inserting twice - IllegalArgumentException ex = assertThrows( - IllegalArgumentException.class, - () -> dynamicActionRegistry.registerDynamicAction(testExtensionAction, testExtensionTransportAction) - ); - assertEquals("action [actionName] already registered", ex.getMessage()); - // Should remove without problem - try { - dynamicActionRegistry.unregisterDynamicAction(testExtensionAction); - } catch (Exception e) { - fail("Should not have thrown exception unregistering action: " + e); - } - // Should have been removed - assertNull(dynamicActionRegistry.get(testExtensionAction)); - - // Should fail removing twice - ex = assertThrows(IllegalArgumentException.class, () -> dynamicActionRegistry.unregisterDynamicAction(testExtensionAction)); - assertEquals("action [actionName] was not registered", ex.getMessage()); - } - - private static final class TestAction extends ActionType { - public static final TestAction INSTANCE = new TestAction(); - - private TestAction() { - super("test-action", new Reader() { - @Override - public ActionResponse read(StreamInput in) throws IOException { - return null; - } - }); - } - }; - - private static final class TestTransportAction extends TransportAction { - protected TestTransportAction(String actionName, ActionFilters actionFilters, TaskManager taskManager) { - super(actionName, actionFilters, taskManager); - } - - @Override - protected void doExecute(Task task, ActionRequest request, ActionListener listener) {} - } } diff --git a/server/src/test/java/org/opensearch/client/node/NodeClientHeadersTests.java b/server/src/test/java/org/opensearch/client/node/NodeClientHeadersTests.java index 1f63dba4457a9..cb9e3a6a19388 100644 --- a/server/src/test/java/org/opensearch/client/node/NodeClientHeadersTests.java +++ b/server/src/test/java/org/opensearch/client/node/NodeClientHeadersTests.java @@ -33,7 +33,6 @@ package org.opensearch.client.node; import org.opensearch.action.ActionType; -import org.opensearch.action.ActionModule.DynamicActionRegistry; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRequest; import org.opensearch.action.support.ActionFilters; @@ -58,16 +57,14 @@ protected Client buildClient(Settings headersSettings, ActionType[] testedAction Settings settings = HEADER_SETTINGS; Actions actions = new Actions(settings, threadPool, testedActions); NodeClient client = new NodeClient(settings, threadPool); - DynamicActionRegistry dynamicActionRegistry = new DynamicActionRegistry(); - dynamicActionRegistry.registerUnmodifiableActionMap(actions); - client.initialize(dynamicActionRegistry, () -> "test", null, new NamedWriteableRegistry(Collections.emptyList())); + client.initialize(actions, () -> "test", null, new NamedWriteableRegistry(Collections.emptyList())); return client; } private static class Actions extends HashMap { - private Actions(Settings settings, ThreadPool threadPool, ActionType[] actions) { - for (ActionType action : actions) { + private Actions(Settings settings, ThreadPool threadPool, ActionType[] actions) { + for (ActionType action : actions) { put(action, new InternalTransportAction(settings, action.name(), threadPool)); } } diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index 53f5d70b2d5eb..7c9a816d38718 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -15,7 +15,6 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import static org.mockito.Mockito.times; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.mock; @@ -41,7 +40,6 @@ import org.junit.After; import org.junit.Before; import org.opensearch.Version; -import org.opensearch.action.ActionModule; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterSettingsResponse; @@ -96,7 +94,6 @@ public class ExtensionsManagerTests extends OpenSearchTestCase { private FeatureFlagSetter featureFlagSetter; private TransportService transportService; - private ActionModule actionModule; private RestController restController; private SettingsModule settingsModule; private ClusterService clusterService; @@ -160,7 +157,6 @@ public void setup() throws Exception { null, Collections.emptySet() ); - actionModule = mock(ActionModule.class); restController = new RestController( emptySet(), null, @@ -168,7 +164,6 @@ public void setup() throws Exception { new NoneCircuitBreakerService(), new UsageService() ); - when(actionModule.getRestController()).thenReturn(restController); settingsModule = new SettingsModule(Settings.EMPTY, emptyList(), emptyList(), emptySet()); clusterService = createClusterService(threadPool); @@ -737,7 +732,7 @@ public void testRegisterHandler() throws Exception { ) ); extensionsManager.initializeServicesAndRestHandler( - actionModule, + restController, settingsModule, mockTransportService, clusterService, @@ -817,7 +812,7 @@ private void initialize(ExtensionsManager extensionsManager) { transportService.start(); transportService.acceptIncomingRequests(); extensionsManager.initializeServicesAndRestHandler( - actionModule, + restController, settingsModule, transportService, clusterService, diff --git a/server/src/test/java/org/opensearch/extensions/RegisterTransportActionsRequestTests.java b/server/src/test/java/org/opensearch/extensions/RegisterTransportActionsRequestTests.java index e819fcd893367..27f1597e5779f 100644 --- a/server/src/test/java/org/opensearch/extensions/RegisterTransportActionsRequestTests.java +++ b/server/src/test/java/org/opensearch/extensions/RegisterTransportActionsRequestTests.java @@ -11,7 +11,6 @@ import org.junit.Before; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.extensions.action.RegisterTransportActionsRequest; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java index 0f3aab3644489..2d0821a0fb7dd 100644 --- a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java @@ -11,9 +11,6 @@ import org.junit.After; import org.junit.Before; import org.opensearch.Version; -import org.opensearch.action.ActionModule; -import org.opensearch.action.ActionModule.DynamicActionRegistry; -import org.opensearch.action.support.ActionFilters; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.io.stream.NamedWriteableRegistry; @@ -23,6 +20,7 @@ import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.extensions.DiscoveryExtensionNode; import org.opensearch.extensions.AcknowledgedResponse; +import org.opensearch.extensions.RegisterTransportActionsRequest; import org.opensearch.extensions.rest.RestSendToExtensionActionTests; import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.test.OpenSearchTestCase; @@ -43,14 +41,10 @@ import java.util.Set; import java.util.concurrent.TimeUnit; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; public class ExtensionTransportActionsHandlerTests extends OpenSearchTestCase { - private static final ActionFilters EMPTY_FILTERS = new ActionFilters(Collections.emptySet()); private TransportService transportService; private MockNioTransport transport; private DiscoveryExtensionNode discoveryExtensionNode; @@ -96,17 +90,10 @@ public void setup() throws Exception { Collections.emptyList() ); client = new NoOpNodeClient(this.getTestName()); - ActionModule mockActionModule = mock(ActionModule.class); - DynamicActionRegistry dynamicActionRegistry = new DynamicActionRegistry(); - dynamicActionRegistry.registerUnmodifiableActionMap(Collections.emptyMap()); - when(mockActionModule.getDynamicActionRegistry()).thenReturn(dynamicActionRegistry); - when(mockActionModule.getActionFilters()).thenReturn(EMPTY_FILTERS); extensionTransportActionsHandler = new ExtensionTransportActionsHandler( Map.of("uniqueid1", discoveryExtensionNode), transportService, - client, - mockActionModule, - null + client ); } @@ -121,14 +108,11 @@ public void tearDown() throws Exception { public void testRegisterAction() { String action = "test-action"; - extensionTransportActionsHandler.registerAction(action, discoveryExtensionNode.getId()); + extensionTransportActionsHandler.registerAction(action, discoveryExtensionNode); assertEquals(discoveryExtensionNode, extensionTransportActionsHandler.getExtension(action)); // Test duplicate action registration - expectThrows( - IllegalArgumentException.class, - () -> extensionTransportActionsHandler.registerAction(action, discoveryExtensionNode.getId()) - ); + expectThrows(IllegalArgumentException.class, () -> extensionTransportActionsHandler.registerAction(action, discoveryExtensionNode)); assertEquals(discoveryExtensionNode, extensionTransportActionsHandler.getExtension(action)); } @@ -146,14 +130,12 @@ public void testRegisterTransportActionsRequest() { assertFalse(response.getStatus()); } - public void testTransportActionRequestFromExtension() throws Exception { + public void testTransportActionRequestFromExtension() throws InterruptedException { String action = "test-action"; byte[] requestBytes = "requestBytes".getBytes(StandardCharsets.UTF_8); TransportActionRequestFromExtension request = new TransportActionRequestFromExtension(action, requestBytes, "uniqueid1"); - RemoteExtensionActionResponse response = extensionTransportActionsHandler.handleTransportActionRequestFromExtension(request); - assertFalse(response.isSuccess()); - String responseString = response.getResponseBytesAsString(); - assertEquals("Request failed: action [test-action] is not registered for any extension.", responseString); + // NoOpNodeClient returns null as response + expectThrows(NullPointerException.class, () -> extensionTransportActionsHandler.handleTransportActionRequestFromExtension(request)); } public void testSendTransportRequestToExtension() throws InterruptedException { diff --git a/server/src/test/java/org/opensearch/extensions/action/RemoteExtensionActionResponseTests.java b/server/src/test/java/org/opensearch/extensions/action/TransportActionResponseToExtensionTests.java similarity index 55% rename from server/src/test/java/org/opensearch/extensions/action/RemoteExtensionActionResponseTests.java rename to server/src/test/java/org/opensearch/extensions/action/TransportActionResponseToExtensionTests.java index 4ce42450bd577..070feaa240d98 100644 --- a/server/src/test/java/org/opensearch/extensions/action/RemoteExtensionActionResponseTests.java +++ b/server/src/test/java/org/opensearch/extensions/action/TransportActionResponseToExtensionTests.java @@ -13,42 +13,31 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.test.OpenSearchTestCase; +import java.io.IOException; import java.nio.charset.StandardCharsets; -public class RemoteExtensionActionResponseTests extends OpenSearchTestCase { - - public void testExtensionActionResponse() throws Exception { +public class TransportActionResponseToExtensionTests extends OpenSearchTestCase { + public void testTransportActionRequestToExtension() throws IOException { byte[] expectedResponseBytes = "response-bytes".getBytes(StandardCharsets.UTF_8); - RemoteExtensionActionResponse response = new RemoteExtensionActionResponse(true, expectedResponseBytes); + TransportActionResponseToExtension response = new TransportActionResponseToExtension(expectedResponseBytes); - assertTrue(response.isSuccess()); assertEquals(expectedResponseBytes, response.getResponseBytes()); BytesStreamOutput out = new BytesStreamOutput(); response.writeTo(out); BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes())); - response = new RemoteExtensionActionResponse(in); + response = new TransportActionResponseToExtension(in); - assertTrue(response.isSuccess()); assertArrayEquals(expectedResponseBytes, response.getResponseBytes()); } - public void testSetters() { - String expectedResponse = "response-bytes"; - byte[] expectedResponseBytes = expectedResponse.getBytes(StandardCharsets.UTF_8); + public void testSetBytes() { + byte[] expectedResponseBytes = "response-bytes".getBytes(StandardCharsets.UTF_8); byte[] expectedEmptyBytes = new byte[0]; - RemoteExtensionActionResponse response = new RemoteExtensionActionResponse(false, expectedEmptyBytes); + TransportActionResponseToExtension response = new TransportActionResponseToExtension(expectedEmptyBytes); assertArrayEquals(expectedEmptyBytes, response.getResponseBytes()); - assertFalse(response.isSuccess()); - - response.setResponseBytesAsString(expectedResponse); - assertArrayEquals(expectedResponseBytes, response.getResponseBytes()); response.setResponseBytes(expectedResponseBytes); assertArrayEquals(expectedResponseBytes, response.getResponseBytes()); - assertEquals(expectedResponse, response.getResponseBytesAsString()); - - response.setSuccess(true); - assertTrue(response.isSuccess()); } } diff --git a/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java index 6971aa866ccb1..cc1a9d4fd2e40 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java @@ -34,7 +34,6 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionType; -import org.opensearch.action.ActionModule.DynamicActionRegistry; import org.opensearch.action.admin.indices.validate.query.ValidateQueryAction; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.TransportAction; @@ -85,7 +84,7 @@ public class RestValidateQueryActionTests extends AbstractSearchTestCase { public static void stubValidateQueryAction() { final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); - final TransportAction transportAction = new TransportAction<>( + final TransportAction transportAction = new TransportAction( ValidateQueryAction.NAME, new ActionFilters(Collections.emptySet()), taskManager @@ -97,9 +96,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener listen final Map actions = new HashMap<>(); actions.put(ValidateQueryAction.INSTANCE, transportAction); - DynamicActionRegistry dynamicActionRegistry = new DynamicActionRegistry(); - dynamicActionRegistry.registerUnmodifiableActionMap(actions); - client.initialize(dynamicActionRegistry, () -> "local", null, new NamedWriteableRegistry(Collections.emptyList())); + client.initialize(actions, () -> "local", null, new NamedWriteableRegistry(Collections.emptyList())); controller.registerHandler(action); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 4b58ee6a0c4e7..b9087db60c271 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -37,7 +37,6 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.ActionListener; -import org.opensearch.action.ActionModule.DynamicActionRegistry; import org.opensearch.action.ActionType; import org.opensearch.action.RequestValidators; import org.opensearch.action.StepListener; @@ -2192,10 +2191,8 @@ public void onFailure(final Exception e) { indexNameExpressionResolver ) ); - DynamicActionRegistry dynamicActionRegistry = new DynamicActionRegistry(); - dynamicActionRegistry.registerUnmodifiableActionMap(actions); client.initialize( - dynamicActionRegistry, + actions, () -> clusterService.localNode().getId(), transportService.getRemoteClusterService(), new NamedWriteableRegistry(Collections.emptyList()) diff --git a/test/framework/src/main/java/org/opensearch/test/client/NoOpNodeClient.java b/test/framework/src/main/java/org/opensearch/test/client/NoOpNodeClient.java index 1ff7a287b9b30..413c78ba37026 100644 --- a/test/framework/src/main/java/org/opensearch/test/client/NoOpNodeClient.java +++ b/test/framework/src/main/java/org/opensearch/test/client/NoOpNodeClient.java @@ -34,10 +34,10 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; -import org.opensearch.action.ActionModule.DynamicActionRegistry; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionResponse; import org.opensearch.action.ActionType; +import org.opensearch.action.support.TransportAction; import org.opensearch.client.Client; import org.opensearch.client.node.NodeClient; import org.opensearch.common.io.stream.NamedWriteableRegistry; @@ -48,6 +48,7 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.RemoteClusterService; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -85,7 +86,7 @@ public void doE @Override public void initialize( - DynamicActionRegistry dynamicActionRegistry, + Map actions, Supplier localNodeId, RemoteClusterService remoteClusterService, NamedWriteableRegistry namedWriteableRegistry