diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 74be544123d9f..d7248a31b7b57 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -449,6 +449,7 @@ public class ActionModule extends AbstractModule { private final SettingsFilter settingsFilter; private final List actionPlugins; private final Map> actions; + private ActionRegistry actionRegistryMap; private final ActionFilters actionFilters; private final AutoCreateIndex autoCreateIndex; private final DestructiveOperations destructiveOperations; @@ -478,6 +479,7 @@ public ActionModule( this.actionPlugins = actionPlugins; this.threadPool = threadPool; actions = setupActions(actionPlugins); + actionRegistryMap = new ActionRegistry(actions); actionFilters = setupActionFilters(actionPlugins); autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices); destructiveOperations = new DestructiveOperations(settings, clusterSettings); @@ -510,25 +512,31 @@ public ActionModule( return actions; } - static Map> setupActions(List actionPlugins) { - // Subclass NamedRegistry for easy registration - class ActionRegistry extends NamedRegistry> { - ActionRegistry() { - super("action"); - } + public static class ActionRegistry extends NamedRegistry> { + public ActionRegistry() { + super("action"); + } - public void register(ActionHandler handler) { - register(handler.getAction().name(), handler); - } + public ActionRegistry(Map> actions) { + super("action", actions); + } - public void register( - ActionType action, - Class> transportAction, - Class... supportTransportActions - ) { - register(new ActionHandler<>(action, transportAction, supportTransportActions)); - } + public void register(ActionHandler handler) { + register(handler.getAction().name(), handler); + } + + public void register( + ActionType action, + Class> transportAction, + Class... supportTransportActions + ) { + register(new ActionHandler<>(action, transportAction, supportTransportActions)); } + } + + static Map> setupActions(List actionPlugins) { + // Subclass NamedRegistry for easy registration + ActionRegistry actions = new ActionRegistry(); actions.register(MainAction.INSTANCE, TransportMainAction.class); @@ -892,6 +900,8 @@ protected void configure() { // Supporting classes bind(AutoCreateIndex.class).toInstance(autoCreateIndex); bind(TransportLivenessAction.class).asEagerSingleton(); + // binding new actions for NodeClient to consume + bind(ActionRegistry.class).toInstance(actionRegistryMap); // register ActionType -> transportAction Map used by NodeClient @SuppressWarnings("rawtypes") @@ -917,4 +927,12 @@ public ActionFilters getActionFilters() { public RestController getRestController() { return restController; } + + public void registerAction( + ActionType action, + Class> transportAction, + Class... supportTransportActions + ) { + actionRegistryMap.register(new ActionHandler<>(action, transportAction, supportTransportActions)); + } } diff --git a/server/src/main/java/org/opensearch/client/node/NodeClient.java b/server/src/main/java/org/opensearch/client/node/NodeClient.java index 56cb7c406744a..6142ac74177ad 100644 --- a/server/src/main/java/org/opensearch/client/node/NodeClient.java +++ b/server/src/main/java/org/opensearch/client/node/NodeClient.java @@ -32,6 +32,7 @@ package org.opensearch.client.node; +import org.opensearch.action.ActionModule; import org.opensearch.action.ActionType; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRequest; @@ -57,6 +58,7 @@ */ public class NodeClient extends AbstractClient { + private ActionModule.ActionRegistry actionsRegistry; private Map actions; /** * The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by @@ -72,11 +74,13 @@ public NodeClient(Settings settings, ThreadPool threadPool) { public void initialize( Map actions, + ActionModule.ActionRegistry actionsRegistry, Supplier localNodeId, RemoteClusterService remoteClusterService, NamedWriteableRegistry namedWriteableRegistry ) { this.actions = actions; + this.actionsRegistry = actionsRegistry; this.localNodeId = localNodeId; this.remoteClusterService = remoteClusterService; this.namedWriteableRegistry = namedWriteableRegistry; @@ -137,6 +141,7 @@ public String getLocalNodeId() { private TransportAction transportAction( ActionType action ) { + /* TODO add support to read actionsRegistry along with actions */ if (actions == null) { throw new IllegalStateException("NodeClient has not been initialized"); } diff --git a/server/src/main/java/org/opensearch/common/NamedRegistry.java b/server/src/main/java/org/opensearch/common/NamedRegistry.java index a0e98d9126628..16785e2d97b27 100644 --- a/server/src/main/java/org/opensearch/common/NamedRegistry.java +++ b/server/src/main/java/org/opensearch/common/NamedRegistry.java @@ -45,11 +45,17 @@ * @opensearch.internal */ public class NamedRegistry { - private final Map registry = new HashMap<>(); + private final Map registry; private final String targetName; public NamedRegistry(String targetName) { this.targetName = targetName; + this.registry = new HashMap<>(); + } + + public NamedRegistry(String targetName, Map registry) { + this.targetName = targetName; + this.registry = registry; } public Map getRegistry() { diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionsAction.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionsAction.java new file mode 100644 index 0000000000000..19a70fa276237 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionsAction.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.action.ActionType; + +public class ExtensionsAction extends ActionType { + public static final String NAME = "cluster:monitor/extension"; + public static final ExtensionsAction INSTANCE = new ExtensionsAction(); + + public ExtensionsAction() { + super(NAME, ExtensionsActionResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionsActionRequest.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionsActionRequest.java new file mode 100644 index 0000000000000..0830049c37e6e --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionsActionRequest.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class ExtensionsActionRequest extends ActionRequest { + String firstRequest; + + public ExtensionsActionRequest(String firstRequest) { + this.firstRequest = firstRequest; + } + + ExtensionsActionRequest(StreamInput in) throws IOException { + super(in); + firstRequest = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(firstRequest); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionsActionResponse.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionsActionResponse.java new file mode 100644 index 0000000000000..5cbf5ede32d66 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionsActionResponse.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +public class ExtensionsActionResponse extends ActionResponse implements ToXContentObject { + String myFirstResponse; + + ExtensionsActionResponse(String myFirstResponse) { + this.myFirstResponse = myFirstResponse; + } + + ExtensionsActionResponse(StreamInput in) throws IOException { + super(in); + myFirstResponse = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(myFirstResponse); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return null; + } +} diff --git a/server/src/main/java/org/opensearch/extensions/action/TransportExtensionsAction.java b/server/src/main/java/org/opensearch/extensions/action/TransportExtensionsAction.java new file mode 100644 index 0000000000000..4c07c9c455039 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/action/TransportExtensionsAction.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.node.Node; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +public class TransportExtensionsAction extends HandledTransportAction { + + private final String nodeName; + private final ClusterService clusterService; + + @Inject + public TransportExtensionsAction( + Settings settings, + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService + ) { + super(ExtensionsAction.NAME, transportService, actionFilters, ExtensionsActionRequest::new); + this.nodeName = Node.NODE_NAME_SETTING.get(settings); + this.clusterService = clusterService; + } + + @Override + protected void doExecute(Task task, ExtensionsActionRequest request, ActionListener listener) { + listener.onResponse(new ExtensionsActionResponse("HelloWorld")); + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index ebfe0b8a1fcfa..e1f09f04e537a 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -36,8 +36,13 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.Constants; import org.apache.lucene.util.SetOnce; +import org.opensearch.action.ActionType; +import org.opensearch.action.support.TransportAction; +import org.opensearch.common.inject.Key; import org.opensearch.common.util.FeatureFlags; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; +import org.opensearch.extensions.action.ExtensionsAction; +import org.opensearch.extensions.action.TransportExtensionsAction; import org.opensearch.index.IndexingPressureService; import org.opensearch.extensions.ExtensionsOrchestrator; //import org.opensearch.index.store.RemoteDirectoryFactory; @@ -54,12 +59,11 @@ import org.opensearch.OpenSearchTimeoutException; import org.opensearch.Version; import org.opensearch.action.ActionModule; -import org.opensearch.action.ActionType; +import org.opensearch.action.ActionModule.ActionRegistry; import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; import org.opensearch.action.search.SearchTransportService; -import org.opensearch.action.support.TransportAction; import org.opensearch.action.update.UpdateHelper; import org.opensearch.bootstrap.BootstrapCheck; import org.opensearch.bootstrap.BootstrapContext; @@ -92,7 +96,6 @@ import org.opensearch.common.component.Lifecycle; import org.opensearch.common.component.LifecycleComponent; import org.opensearch.common.inject.Injector; -import org.opensearch.common.inject.Key; import org.opensearch.common.inject.Module; import org.opensearch.common.inject.ModulesBuilder; import org.opensearch.common.io.stream.NamedWriteableRegistry; @@ -919,6 +922,7 @@ protected Node( final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); modules.add(b -> { + b.bind(ActionModule.class).toInstance(actionModule); b.bind(Node.class).toInstance(this); b.bind(NodeService.class).toInstance(nodeService); b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry); @@ -1018,7 +1022,12 @@ protected Node( resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class)); this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents); client.initialize(injector.getInstance(new Key>() { - }), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService(), namedWriteableRegistry); + }), + injector.getInstance(ActionRegistry.class), + () -> clusterService.localNode().getId(), + transportService.getRemoteClusterService(), + namedWriteableRegistry + ); this.namedWriteableRegistry = namedWriteableRegistry; logger.debug("initializing HTTP handlers ..."); @@ -1101,6 +1110,8 @@ public Node start() throws NodeValidationException { nodeService.getMonitorService().start(); final ClusterService clusterService = injector.getInstance(ClusterService.class); + final ActionModule actionModule = injector.getInstance(ActionModule.class); + actionModule.registerAction(ExtensionsAction.INSTANCE, TransportExtensionsAction.class); final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class); nodeConnectionsService.start(); diff --git a/server/src/test/java/org/opensearch/client/node/NodeClientHeadersTests.java b/server/src/test/java/org/opensearch/client/node/NodeClientHeadersTests.java index cb9e3a6a19388..0f04eb5f91d15 100644 --- a/server/src/test/java/org/opensearch/client/node/NodeClientHeadersTests.java +++ b/server/src/test/java/org/opensearch/client/node/NodeClientHeadersTests.java @@ -57,7 +57,7 @@ protected Client buildClient(Settings headersSettings, ActionType[] testedAction Settings settings = HEADER_SETTINGS; Actions actions = new Actions(settings, threadPool, testedActions); NodeClient client = new NodeClient(settings, threadPool); - client.initialize(actions, () -> "test", null, new NamedWriteableRegistry(Collections.emptyList())); + client.initialize(actions, null, () -> "test", null, new NamedWriteableRegistry(Collections.emptyList())); return client; } diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionsActionTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionsActionTests.java new file mode 100644 index 0000000000000..6e50c02e71fe4 --- /dev/null +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionsActionTests.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions.action; + +import org.opensearch.action.ActionFuture; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.util.concurrent.ExecutionException; + +public class ExtensionsActionTests extends OpenSearchSingleNodeTestCase { + + public void testExtensionAction() throws ExecutionException, InterruptedException { + ExtensionsActionRequest request = new ExtensionsActionRequest("MyFirstTransportRequest"); + ActionFuture execute = client().execute(ExtensionsAction.INSTANCE, request); + } +} diff --git a/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java index cc1a9d4fd2e40..64acc510a9b58 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/indices/RestValidateQueryActionTests.java @@ -96,7 +96,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener listen final Map actions = new HashMap<>(); actions.put(ValidateQueryAction.INSTANCE, transportAction); - client.initialize(actions, () -> "local", null, new NamedWriteableRegistry(Collections.emptyList())); + client.initialize(actions, null, () -> "local", null, new NamedWriteableRegistry(Collections.emptyList())); controller.registerHandler(action); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index c3ebbe40de8e2..df7126561fea8 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2148,6 +2148,7 @@ public void onFailure(final Exception e) { ); client.initialize( actions, + null, () -> clusterService.localNode().getId(), transportService.getRemoteClusterService(), new NamedWriteableRegistry(Collections.emptyList()) diff --git a/test/framework/src/main/java/org/opensearch/test/client/NoOpNodeClient.java b/test/framework/src/main/java/org/opensearch/test/client/NoOpNodeClient.java index 413c78ba37026..b92a565fd3cfa 100644 --- a/test/framework/src/main/java/org/opensearch/test/client/NoOpNodeClient.java +++ b/test/framework/src/main/java/org/opensearch/test/client/NoOpNodeClient.java @@ -34,6 +34,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionModule; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionResponse; import org.opensearch.action.ActionType; @@ -87,6 +88,7 @@ public void doE @Override public void initialize( Map actions, + ActionModule.ActionRegistry actionRegistry, Supplier localNodeId, RemoteClusterService remoteClusterService, NamedWriteableRegistry namedWriteableRegistry