Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/extensions] First draft of adding support for dynamically registering actions #4460

50 changes: 34 additions & 16 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ public class ActionModule extends AbstractModule {
private final SettingsFilter settingsFilter;
private final List<ActionPlugin> actionPlugins;
private final Map<String, ActionHandler<?, ?>> actions;
private ActionRegistry actionRegistryMap;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything around this is final. Should this also be?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats a good point. I was debating but it can be a final because the instance would not change.
Will update.

private final ActionFilters actionFilters;
private final AutoCreateIndex autoCreateIndex;
private final DestructiveOperations destructiveOperations;
Expand Down Expand Up @@ -478,6 +479,7 @@ public ActionModule(
this.actionPlugins = actionPlugins;
this.threadPool = threadPool;
actions = setupActions(actionPlugins);
actionRegistryMap = new ActionRegistry(actions);
actionFilters = setupActionFilters(actionPlugins);
autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Expand Down Expand Up @@ -510,25 +512,31 @@ public ActionModule(
return actions;
}

static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
// Subclass NamedRegistry for easy registration
class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> {
ActionRegistry() {
super("action");
}
public static class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> {
public ActionRegistry() {
super("action");
}

public void register(ActionHandler<?, ?> handler) {
register(handler.getAction().name(), handler);
}
public ActionRegistry(Map<String, ActionHandler<?, ?>> actions) {
super("action", actions);
}

public <Request extends ActionRequest, Response extends ActionResponse> void register(
ActionType<Response> action,
Class<? extends TransportAction<Request, Response>> transportAction,
Class<?>... supportTransportActions
) {
register(new ActionHandler<>(action, transportAction, supportTransportActions));
}
public void register(ActionHandler<?, ?> handler) {
register(handler.getAction().name(), handler);
}

public <Request extends ActionRequest, Response extends ActionResponse> void register(
ActionType<Response> action,
Class<? extends TransportAction<Request, Response>> transportAction,
Class<?>... supportTransportActions
) {
register(new ActionHandler<>(action, transportAction, supportTransportActions));
}
}

static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
// Subclass NamedRegistry for easy registration

ActionRegistry actions = new ActionRegistry();

actions.register(MainAction.INSTANCE, TransportMainAction.class);
Expand Down Expand Up @@ -892,6 +900,8 @@ protected void configure() {
// Supporting classes
bind(AutoCreateIndex.class).toInstance(autoCreateIndex);
bind(TransportLivenessAction.class).asEagerSingleton();
// binding new actions for NodeClient to consume
bind(ActionRegistry.class).toInstance(actionRegistryMap);

// register ActionType -> transportAction Map used by NodeClient
@SuppressWarnings("rawtypes")
Expand All @@ -917,4 +927,12 @@ public ActionFilters getActionFilters() {
public RestController getRestController() {
return restController;
}

public <Request extends ActionRequest, Response extends ActionResponse> void registerAction(
ActionType<Response> action,
Class<? extends TransportAction<Request, Response>> transportAction,
Class<?>... supportTransportActions
) {
actionRegistryMap.register(new ActionHandler<>(action, transportAction, supportTransportActions));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.client.node;

import org.opensearch.action.ActionModule;
import org.opensearch.action.ActionType;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRequest;
Expand All @@ -57,6 +58,7 @@
*/
public class NodeClient extends AbstractClient {

private ActionModule.ActionRegistry actionsRegistry;
private Map<ActionType, TransportAction> actions;
/**
* The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by
Expand All @@ -72,11 +74,13 @@ public NodeClient(Settings settings, ThreadPool threadPool) {

public void initialize(
Map<ActionType, TransportAction> actions,
ActionModule.ActionRegistry actionsRegistry,
Supplier<String> localNodeId,
RemoteClusterService remoteClusterService,
NamedWriteableRegistry namedWriteableRegistry
) {
this.actions = actions;
this.actionsRegistry = actionsRegistry;
this.localNodeId = localNodeId;
this.remoteClusterService = remoteClusterService;
this.namedWriteableRegistry = namedWriteableRegistry;
Expand Down Expand Up @@ -137,6 +141,7 @@ public String getLocalNodeId() {
private <Request extends ActionRequest, Response extends ActionResponse> TransportAction<Request, Response> transportAction(
ActionType<Response> action
) {
/* TODO add support to read actionsRegistry along with actions */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have an issue to track this TODO?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha I've taken care of this as part of SDK#106.

if (actions == null) {
throw new IllegalStateException("NodeClient has not been initialized");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,17 @@
* @opensearch.internal
*/
public class NamedRegistry<T> {
private final Map<String, T> registry = new HashMap<>();
private final Map<String, T> registry;
private final String targetName;

public NamedRegistry(String targetName) {
this.targetName = targetName;
this.registry = new HashMap<>();
}

public NamedRegistry(String targetName, Map<String, T> registry) {
this.targetName = targetName;
this.registry = registry;
}

public Map<String, T> getRegistry() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions.action;

import org.opensearch.action.ActionType;

public class ExtensionsAction extends ActionType<ExtensionsActionResponse> {
public static final String NAME = "cluster:monitor/extension";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be keep the action name in the same format as internal...?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure makes sense.

public static final ExtensionsAction INSTANCE = new ExtensionsAction();

public ExtensionsAction() {
super(NAME, ExtensionsActionResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions.action;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

import java.io.IOException;

public class ExtensionsActionRequest extends ActionRequest {
String firstRequest;
Copy link
Member

@owaiskazi19 owaiskazi19 Sep 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leftover from draft PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or is this a draft PR?

Why is the first request so important to preserve?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, looks like I missed a commit during the rebase.
This is moved to action which the extension would like to register.


public ExtensionsActionRequest(String firstRequest) {
this.firstRequest = firstRequest;
}

ExtensionsActionRequest(StreamInput in) throws IOException {
super(in);
firstRequest = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(firstRequest);
}

@Override
public ActionRequestValidationException validate() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions.action;

import org.opensearch.action.ActionResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;

import java.io.IOException;

public class ExtensionsActionResponse extends ActionResponse implements ToXContentObject {
String myFirstResponse;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like leftover from draft PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #4519 I merged multiple similar "string response" classes into an ExtensionStringResponse class. This looks very similar in that it's just a basic "string response" but extending ActionResponse rather than TransportResponse.

I don't think the name ExtensionsActionResponse is meaningful for the simplicity of this object.

I am also thinking we need to figure out a consistent naming convention.

I am also seeing that we are very inconsistent in pluralization of Extension in these class names.

Naming things is hard.


ExtensionsActionResponse(String myFirstResponse) {
this.myFirstResponse = myFirstResponse;
}

ExtensionsActionResponse(StreamInput in) throws IOException {
super(in);
myFirstResponse = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(myFirstResponse);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't dug into the implementation details, but when I see a super(in) I expect to see a matching super(out). I know the super is required for transport requests but not responses, and in the latter case the super call is a noop.

All that to say, either add a super(out) here or delete the super(in) above.

}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will eventually go here? I don't like seeing null returns, but a comment suggesting future usage would make me happier.

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions.action;

import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.Node;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

public class TransportExtensionsAction extends HandledTransportAction<ExtensionsActionRequest, ExtensionsActionResponse> {

private final String nodeName;
private final ClusterService clusterService;

@Inject
public TransportExtensionsAction(
Settings settings,
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService
) {
super(ExtensionsAction.NAME, transportService, actionFilters, ExtensionsActionRequest::new);
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
this.clusterService = clusterService;
}

@Override
protected void doExecute(Task task, ExtensionsActionRequest request, ActionListener<ExtensionsActionResponse> listener) {
listener.onResponse(new ExtensionsActionResponse("HelloWorld"));
}
}
19 changes: 15 additions & 4 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.SetOnce;
import org.opensearch.action.ActionType;
import org.opensearch.action.support.TransportAction;
import org.opensearch.common.inject.Key;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.extensions.action.ExtensionsAction;
import org.opensearch.extensions.action.TransportExtensionsAction;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.extensions.ExtensionsOrchestrator;
//import org.opensearch.index.store.RemoteDirectoryFactory;
Expand All @@ -54,12 +59,11 @@
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.Version;
import org.opensearch.action.ActionModule;
import org.opensearch.action.ActionType;
import org.opensearch.action.ActionModule.ActionRegistry;
import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
import org.opensearch.action.search.SearchExecutionStatsCollector;
import org.opensearch.action.search.SearchPhaseController;
import org.opensearch.action.search.SearchTransportService;
import org.opensearch.action.support.TransportAction;
import org.opensearch.action.update.UpdateHelper;
import org.opensearch.bootstrap.BootstrapCheck;
import org.opensearch.bootstrap.BootstrapContext;
Expand Down Expand Up @@ -92,7 +96,6 @@
import org.opensearch.common.component.Lifecycle;
import org.opensearch.common.component.LifecycleComponent;
import org.opensearch.common.inject.Injector;
import org.opensearch.common.inject.Key;
import org.opensearch.common.inject.Module;
import org.opensearch.common.inject.ModulesBuilder;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -919,6 +922,7 @@ protected Node(
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);

modules.add(b -> {
b.bind(ActionModule.class).toInstance(actionModule);
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
Expand Down Expand Up @@ -1018,7 +1022,12 @@ protected Node(
resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {
}), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService(), namedWriteableRegistry);
}),
injector.getInstance(ActionRegistry.class),
() -> clusterService.localNode().getId(),
transportService.getRemoteClusterService(),
namedWriteableRegistry
);
this.namedWriteableRegistry = namedWriteableRegistry;

logger.debug("initializing HTTP handlers ...");
Expand Down Expand Up @@ -1101,6 +1110,8 @@ public Node start() throws NodeValidationException {
nodeService.getMonitorService().start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);
final ActionModule actionModule = injector.getInstance(ActionModule.class);
actionModule.registerAction(ExtensionsAction.INSTANCE, TransportExtensionsAction.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have tests for registerActions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats a good point.
Sure.


final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
nodeConnectionsService.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected Client buildClient(Settings headersSettings, ActionType[] testedAction
Settings settings = HEADER_SETTINGS;
Actions actions = new Actions(settings, threadPool, testedActions);
NodeClient client = new NodeClient(settings, threadPool);
client.initialize(actions, () -> "test", null, new NamedWriteableRegistry(Collections.emptyList()));
client.initialize(actions, null, () -> "test", null, new NamedWriteableRegistry(Collections.emptyList()));
return client;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions.action;

import org.opensearch.action.ActionFuture;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

import java.util.concurrent.ExecutionException;

public class ExtensionsActionTests extends OpenSearchSingleNodeTestCase {

public void testExtensionAction() throws ExecutionException, InterruptedException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

usually tests contain assertions of some sort. :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup I just realized I missed a commit :D

ExtensionsActionRequest request = new ExtensionsActionRequest("MyFirstTransportRequest");
ActionFuture<ExtensionsActionResponse> execute = client().execute(ExtensionsAction.INSTANCE, request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener listen
final Map<ActionType, TransportAction> actions = new HashMap<>();
actions.put(ValidateQueryAction.INSTANCE, transportAction);

client.initialize(actions, () -> "local", null, new NamedWriteableRegistry(Collections.emptyList()));
client.initialize(actions, null, () -> "local", null, new NamedWriteableRegistry(Collections.emptyList()));
Copy link
Member

@owaiskazi19 owaiskazi19 Sep 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create a dummy obj of actionsRegistry instead of passing null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

controller.registerHandler(action);
}

Expand Down
Loading