Skip to content

Commit

Permalink
Adding support for dynamically registering Actions
Browse files Browse the repository at this point in the history
Signed-off-by: Sarat Vemulapalli <[email protected]>
  • Loading branch information
saratvemulapalli committed Sep 12, 2022
1 parent 8283ede commit f2ceada
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 28 deletions.
6 changes: 5 additions & 1 deletion .idea/runConfigurations/Debug_OpenSearch.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 38 additions & 19 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableMap;

/**
* Builds and binds the generic action map, all {@link TransportAction}s, and {@link ActionFilters}.
*
Expand All @@ -449,6 +447,7 @@ public class ActionModule extends AbstractModule {
private final SettingsFilter settingsFilter;
private final List<ActionPlugin> actionPlugins;
private final Map<String, ActionHandler<?, ?>> actions;
private ActionRegistry actionRegistryMap;
private final ActionFilters actionFilters;
private final AutoCreateIndex autoCreateIndex;
private final DestructiveOperations destructiveOperations;
Expand Down Expand Up @@ -478,6 +477,7 @@ public ActionModule(
this.actionPlugins = actionPlugins;
this.threadPool = threadPool;
actions = setupActions(actionPlugins);
actionRegistryMap = new ActionRegistry(actions);
actionFilters = setupActionFilters(actionPlugins);
autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Expand Down Expand Up @@ -510,25 +510,31 @@ public ActionModule(
return actions;
}

static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
// Subclass NamedRegistry for easy registration
class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> {
ActionRegistry() {
super("action");
}
public static class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> {
ActionRegistry() {
super("action");
}

public void register(ActionHandler<?, ?> handler) {
register(handler.getAction().name(), handler);
}
ActionRegistry(Map<String, ActionHandler<?, ?>> actions) {
super("action", actions);
}

public <Request extends ActionRequest, Response extends ActionResponse> void register(
ActionType<Response> action,
Class<? extends TransportAction<Request, Response>> transportAction,
Class<?>... supportTransportActions
) {
register(new ActionHandler<>(action, transportAction, supportTransportActions));
}
public void register(ActionHandler<?, ?> handler) {
register(handler.getAction().name(), handler);
}

public <Request extends ActionRequest, Response extends ActionResponse> void register(
ActionType<Response> action,
Class<? extends TransportAction<Request, Response>> transportAction,
Class<?>... supportTransportActions
) {
register(new ActionHandler<>(action, transportAction, supportTransportActions));
}
}

static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
// Subclass NamedRegistry for easy registration

ActionRegistry actions = new ActionRegistry();

actions.register(MainAction.INSTANCE, TransportMainAction.class);
Expand Down Expand Up @@ -683,7 +689,10 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
// Remote Store
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);

return unmodifiableMap(actions.getRegistry());
// Extensions
// actions.register(ExtensionsAction.INSTANCE, TransportExtensionsAction.class);

return actions.getRegistry();
}

private ActionFilters setupActionFilters(List<ActionPlugin> actionPlugins) {
Expand Down Expand Up @@ -892,6 +901,8 @@ protected void configure() {
// Supporting classes
bind(AutoCreateIndex.class).toInstance(autoCreateIndex);
bind(TransportLivenessAction.class).asEagerSingleton();
// binding new actions for NodeClient to consume
bind(ActionRegistry.class).toInstance(actionRegistryMap);

// register ActionType -> transportAction Map used by NodeClient
@SuppressWarnings("rawtypes")
Expand All @@ -917,4 +928,12 @@ public ActionFilters getActionFilters() {
public RestController getRestController() {
return restController;
}

public <Request extends ActionRequest, Response extends ActionResponse> void registerAction(
ActionType<Response> action,
Class<? extends TransportAction<Request, Response>> transportAction,
Class<?>... supportTransportActions
) {
actionRegistryMap.register(new ActionHandler<>(action, transportAction, supportTransportActions));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.client.node;

import org.opensearch.action.ActionModule;
import org.opensearch.action.ActionType;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRequest;
Expand All @@ -57,6 +58,7 @@
*/
public class NodeClient extends AbstractClient {

private ActionModule.ActionRegistry actionsRegistry;
private Map<ActionType, TransportAction> actions;
/**
* The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by
Expand All @@ -71,12 +73,12 @@ public NodeClient(Settings settings, ThreadPool threadPool) {
}

public void initialize(
Map<ActionType, TransportAction> actions,
ActionModule.ActionRegistry actionsRegistry,
Supplier<String> localNodeId,
RemoteClusterService remoteClusterService,
NamedWriteableRegistry namedWriteableRegistry
) {
this.actions = actions;
this.actionsRegistry = actionsRegistry;
this.localNodeId = localNodeId;
this.remoteClusterService = remoteClusterService;
this.namedWriteableRegistry = namedWriteableRegistry;
Expand Down Expand Up @@ -137,6 +139,7 @@ public String getLocalNodeId() {
private <Request extends ActionRequest, Response extends ActionResponse> TransportAction<Request, Response> transportAction(
ActionType<Response> action
) {
/* TODO add support to read actionsRegistry along with actions */
if (actions == null) {
throw new IllegalStateException("NodeClient has not been initialized");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,17 @@
* @opensearch.internal
*/
public class NamedRegistry<T> {
private final Map<String, T> registry = new HashMap<>();
private final Map<String, T> registry;
private final String targetName;

public NamedRegistry(String targetName) {
this.targetName = targetName;
this.registry = new HashMap<>();
}

public NamedRegistry(String targetName, Map<String, T> registry) {
this.targetName = targetName;
this.registry = registry;
}

public Map<String, T> getRegistry() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions.action;

import org.opensearch.action.ActionType;

public class ExtensionsAction extends ActionType<ExtensionsActionResponse> {
public static final String NAME = "cluster:monitor/extension";
public static final ExtensionsAction INSTANCE = new ExtensionsAction();

public ExtensionsAction() {
super(NAME, ExtensionsActionResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions.action;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;

public class ExtensionsActionRequest extends ActionRequest {
String firstRequest;

public ExtensionsActionRequest(String firstRequest) {
this.firstRequest = firstRequest;
}

ExtensionsActionRequest(StreamInput in) throws IOException {
super(in);
firstRequest = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(firstRequest);
}

@Override
public ActionRequestValidationException validate() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions.action;

import org.opensearch.action.ActionResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;

public class ExtensionsActionResponse extends ActionResponse implements ToXContentObject {
String myFirstResponse;

ExtensionsActionResponse(String myFirstResponse) {
this.myFirstResponse = myFirstResponse;
}

ExtensionsActionResponse(StreamInput in) throws IOException {
super(in);
myFirstResponse = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(myFirstResponse);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions.action;

import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.Node;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

public class TransportExtensionsAction extends HandledTransportAction<ExtensionsActionRequest, ExtensionsActionResponse> {

private final String nodeName;
private final ClusterService clusterService;

@Inject
public TransportExtensionsAction(
Settings settings,
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService
) {
super(ExtensionsAction.NAME, transportService, actionFilters, ExtensionsActionRequest::new);
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
this.clusterService = clusterService;
}

@Override
protected void doExecute(Task task, ExtensionsActionRequest request, ActionListener<ExtensionsActionResponse> listener) {
listener.onResponse(new ExtensionsActionResponse("HelloWorld"));
}
}
17 changes: 12 additions & 5 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.lucene.util.SetOnce;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.extensions.action.ExtensionsAction;
import org.opensearch.extensions.action.TransportExtensionsAction;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.extensions.ExtensionsOrchestrator;
//import org.opensearch.index.store.RemoteDirectoryFactory;
Expand All @@ -54,12 +56,11 @@
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.Version;
import org.opensearch.action.ActionModule;
import org.opensearch.action.ActionType;
import org.opensearch.action.ActionModule.ActionRegistry;
import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
import org.opensearch.action.search.SearchExecutionStatsCollector;
import org.opensearch.action.search.SearchPhaseController;
import org.opensearch.action.search.SearchTransportService;
import org.opensearch.action.support.TransportAction;
import org.opensearch.action.update.UpdateHelper;
import org.opensearch.bootstrap.BootstrapCheck;
import org.opensearch.bootstrap.BootstrapContext;
Expand Down Expand Up @@ -92,7 +93,6 @@
import org.opensearch.common.component.Lifecycle;
import org.opensearch.common.component.LifecycleComponent;
import org.opensearch.common.inject.Injector;
import org.opensearch.common.inject.Key;
import org.opensearch.common.inject.Module;
import org.opensearch.common.inject.ModulesBuilder;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -919,6 +919,7 @@ protected Node(
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);

modules.add(b -> {
b.bind(ActionModule.class).toInstance(actionModule);
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
Expand Down Expand Up @@ -1017,8 +1018,12 @@ protected Node(
resourcesToClose.addAll(pluginLifecycleComponents);
resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {
}), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService(), namedWriteableRegistry);
client.initialize(
injector.getInstance(ActionRegistry.class),
() -> clusterService.localNode().getId(),
transportService.getRemoteClusterService(),
namedWriteableRegistry
);
this.namedWriteableRegistry = namedWriteableRegistry;

logger.debug("initializing HTTP handlers ...");
Expand Down Expand Up @@ -1101,6 +1106,8 @@ public Node start() throws NodeValidationException {
nodeService.getMonitorService().start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);
final ActionModule actionModule = injector.getInstance(ActionModule.class);
actionModule.registerAction(ExtensionsAction.INSTANCE, TransportExtensionsAction.class);

final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
nodeConnectionsService.start();
Expand Down
Loading

0 comments on commit f2ceada

Please sign in to comment.