Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Extensions] Add DynamicActionRegistry to ActionModule #6734

Merged
merged 30 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
7f58a03
Add dynamic action registry to ActionModule
dbwiddis Mar 16, 2023
49515ab
Update registration of transport actions
dbwiddis Mar 17, 2023
0fa1666
Generate transport actions dynamically
dbwiddis Mar 17, 2023
cf40500
Refactor to combine registry internals
dbwiddis Mar 17, 2023
6edcc0b
Finally figured out the generics (or lack thereof)
dbwiddis Mar 17, 2023
48b0997
ExtensionProxyAction is dead! Long live ExtensionAction!
dbwiddis Mar 17, 2023
350b847
Simplify ExtensionTransportActionHandler, fix compile issues
dbwiddis Mar 17, 2023
e5eb0d0
Maybe tests will pass with this commit
dbwiddis Mar 18, 2023
51ed329
I guess you can't use null as a key in a map
dbwiddis Mar 18, 2023
3b50c87
Lazy test setup, but this should finally work
dbwiddis Mar 18, 2023
6eec3ab
Add Tests
dbwiddis Mar 18, 2023
52e97b2
Fix TransportActionRequestFromExtension inheritance
dbwiddis Mar 18, 2023
c6921dd
Fix return type for transport actions from extensions
dbwiddis Mar 18, 2023
f434406
Fix ParametersInWrongOrderError and add some preemptive null handling
dbwiddis Mar 19, 2023
1cd8278
NPE is not expected result if params are in correct order
dbwiddis Mar 19, 2023
d9ca024
Remove redundant class and string parsing, add success boolean
dbwiddis Mar 19, 2023
27b6521
Last fix of params out of order. Working test case!
dbwiddis Mar 20, 2023
a979767
Code worked, tests didn't. This is finally done (I think)
dbwiddis Mar 20, 2023
b9afc50
Add more detail to comments on immutable vs. dynamic maps
dbwiddis Mar 20, 2023
46b5a83
Add StreamInput getter to ExtensionActionResponse
dbwiddis Mar 22, 2023
0834375
Generalize dynamic action registration
dbwiddis Mar 22, 2023
b2fe38c
Comment and naming fixes
dbwiddis Mar 22, 2023
032ee0d
Register method renaming
dbwiddis Mar 22, 2023
4a9690f
Add generic type parameters
dbwiddis Mar 22, 2023
c581e55
Improve/simplify which parameter types get passed
dbwiddis Mar 22, 2023
a6f72e5
Revert removal of ProxyAction and changes to transport and requests
dbwiddis Mar 22, 2023
cd1630a
Wrap ExtensionTransportResponse in a class denoting success
dbwiddis Mar 22, 2023
56e37e8
Remove generic types as they are incompatible with Guice injection
dbwiddis Mar 22, 2023
c425be4
Fix response handling, it works (again)
dbwiddis Mar 23, 2023
c9c753c
Fix up comments and remove debug logging
dbwiddis Mar 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 89 additions & 3 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,12 @@
import org.opensearch.common.inject.TypeLiteral;
import org.opensearch.common.inject.multibindings.MapBinder;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.extensions.action.ExtensionProxyAction;
import org.opensearch.extensions.action.ExtensionTransportAction;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.extensions.action.ExtensionProxyAction;
import org.opensearch.extensions.action.ExtensionProxyTransportAction;
import org.opensearch.index.seqno.RetentionLeaseActions;
import org.opensearch.indices.SystemIndices;
import org.opensearch.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -448,13 +448,15 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableMap;
import static java.util.Objects.requireNonNull;

/**
* Builds and binds the generic action map, all {@link TransportAction}s, and {@link ActionFilters}.
Expand All @@ -471,7 +473,17 @@ public class ActionModule extends AbstractModule {
private final ClusterSettings clusterSettings;
private final SettingsFilter settingsFilter;
private final List<ActionPlugin> actionPlugins;
// The unmodifiable map containing OpenSearch and Plugin actions
// This is initialized at node bootstrap and contains same-JVM actions
// It will be wrapped in the Dynamic Action Registry but otherwise
// remains unchanged from its prior purpose, and registered actions
// will remain accessible.
private final Map<String, ActionHandler<?, ?>> actions;
// A dynamic action registry which includes the above immutable actions
// and also registers dynamic actions which may be unregistered. Usually
// associated with remote action execution on extensions, possibly in
// a different JVM and possibly on a different server.
private final DynamicActionRegistry dynamicActionRegistry;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like how you've separated out dynamic actions differently, what concerns do you see of modifying the existing actions ? I see we aren't using them anyway.
I can think of inconsistencies with other nodes which might not have registered actions after bootstrap.

I understand the safeguards make sense for 2.x but for 3.x Im leaning towards having one set of actions which are injected and read by the client.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@saratvemulapalli "injected" only happens once, and the whole ActionType instance -> TransportAction class setup is tightly integrated. The injector already has the necessary instances. Removing them from the map wouldn't take away the singleton that is still bound to that class. In order to truly "unregister" the things loaded in during bootstrap we need to get away from injection.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK..... some follow-up comments:

  1. Conceptually this just boils down to an immutable map and a mutable one, plus some methods to access it
  2. I will do renaming to make it more generic describing the above
  3. There is some point that client.execute() on the NodeClient needs access to both maps. We have the choice of adding a separate map and passing both to NodeClient (my original PR before refactoring in response to @peternied comments) or combining them here inside this module. Or combining them somewhere else in a whole new module. I think here is the simplest approach, particularly since they are all "actions".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense thanks @dbwiddis !

private final ActionFilters actionFilters;
private final AutoCreateIndex autoCreateIndex;
private final DestructiveOperations destructiveOperations;
Expand Down Expand Up @@ -502,6 +514,7 @@ public ActionModule(
this.threadPool = threadPool;
actions = setupActions(actionPlugins);
actionFilters = setupActionFilters(actionPlugins);
dynamicActionRegistry = new DynamicActionRegistry();
autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
Expand Down Expand Up @@ -711,7 +724,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) {
// ExtensionProxyAction
actions.register(ExtensionProxyAction.INSTANCE, ExtensionTransportAction.class);
actions.register(ExtensionProxyAction.INSTANCE, ExtensionProxyTransportAction.class);
}

// Decommission actions
Expand Down Expand Up @@ -954,13 +967,86 @@ protected void configure() {
bind(supportAction).asEagerSingleton();
}
}

// register dynamic ActionType -> transportAction Map used by NodeClient
bind(DynamicActionRegistry.class).toInstance(dynamicActionRegistry);
}

public ActionFilters getActionFilters() {
return actionFilters;
}

public DynamicActionRegistry getDynamicActionRegistry() {
return dynamicActionRegistry;
}

public RestController getRestController() {
return restController;
}

/**
* The DynamicActionRegistry maintains a registry mapping {@link ActionType} instances to {@link TransportAction} instances.
* <p>
* This class is modeled after {@link NamedRegistry} but provides both register and unregister capabilities.
*
* @opensearch.internal
*/
public static class DynamicActionRegistry {
// This is the unmodifiable actions map created during node bootstrap, which
// will continue to link ActionType and TransportAction pairs from core and plugin
// action handler registration.
private Map<ActionType, TransportAction> actions = Collections.emptyMap();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not use raw type for type with generic type arguments:

Suggested change
private Map<ActionType, TransportAction> actions = Collections.emptyMap();
private Map<ActionType<?>, TransportAction<?, ?>> actions = Collections.emptyMap();

Same applies for registry

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can change for registry but the raw types are required for the actions map.

The existing code sends to NodeClient with this line:

client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {

and the NodeClient version of the map (which is now being wrapped inside the registry) is:

private Map<ActionType, TransportAction> actions;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or I guess I can add the <?> to the existing code....

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that would be better, thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to not work:

»  org.opensearch.common.inject.ConfigurationException: Guice configuration errors:
»  
»  1) No implementation for java.util.Map<org.opensearch.action.ActionType<?>, org.opensearch.action.support.TransportAction<?, ?>> was bound.
»    while locating java.util.Map<org.opensearch.action.ActionType<?>, org.opensearch.action.support.TransportAction<?, ?>>

I think with type erasure we have to use raw types with the injection.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:o this is very surprising, will take a look shorlty

// A dynamic registry to add or remove ActionType / TransportAction pairs
// at times other than node bootstrap.
private final Map<ActionType<?>, TransportAction<?, ?>> registry = new ConcurrentHashMap<>();

/**
* Register the immutable actions in the registry.
*
* @param actions The injected map of {@link ActionType} to {@link TransportAction}
*/
public void registerUnmodifiableActionMap(Map<ActionType, TransportAction> actions) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to understand, we could've directly passed in the map L516 when DynamicActionRegistry is created. Did you want to keep the structure of the injected information ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to understand, we could've directly passed in the map L516 when DynamicActionRegistry is created.

Yep but it's a different map.

Given getActions() this was the simplest approach and the first thing I tried.

The bits we need are the ActionType key and TransportAction singleton instance value.

The signature of actions is Map<String, ActionHandler<?, ?>> where the ActionHandler does have the ActionType key, but only the class of the TransportAction. And the only way to fetch that singleton instance that was injected is to pass around the injector (or a map retrieved from the injector, as was done to send the "map we need" to NodeClient and is now inside the DynamicActionRegistry).

Did you want to keep the structure of the injected information ?

My original implementation kept this map injected as-is and handled the two different maps in the NodeClient. However, based on initial review it was requested I keep those details out of the client.

But we still need to provide an ActionType instance key (easy) and fetch a singleton TransportAction value that we can run the doExecute() method on.

We could use the class and get the right instance if we keep a copy of the injector inside the registry, and in theory that might help out with the concern over rawtypes.

But that's also new code/patterns and I really tried to keep this as close to the way it already was as I could.

this.actions = actions;
}

/**
* Add a dynamic action to the registry.
*
* @param action The action instance to add
* @param transportAction The corresponding instance of transportAction to execute
*/
public void registerDynamicAction(ActionType<?> action, TransportAction<?, ?> transportAction) {
requireNonNull(action, "action is required");
requireNonNull(transportAction, "transportAction is required");
if (actions.containsKey(action) || registry.putIfAbsent(action, transportAction) != null) {
throw new IllegalArgumentException("action [" + action.name() + "] already registered");
}
}

/**
* Remove a dynamic action from the registry.
*
* @param action The action to remove
*/
public void unregisterDynamicAction(ActionType<?> action) {
requireNonNull(action, "action is required");
if (registry.remove(action) == null) {
throw new IllegalArgumentException("action [" + action.name() + "] was not registered");
}
}

/**
* Gets the {@link TransportAction} instance corresponding to the {@link ActionType} instance.
*
* @param action The {@link ActionType}.
* @return the corresponding {@link TransportAction} if it is registered, null otherwise.
*/
@SuppressWarnings("unchecked")
public TransportAction<? extends ActionRequest, ? extends ActionResponse> get(ActionType<?> action) {
if (actions.containsKey(action)) {
return actions.get(action);
}
return registry.get(action);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.action.ActionType;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionModule.DynamicActionRegistry;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.support.TransportAction;
Expand All @@ -47,7 +48,6 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteClusterService;

import java.util.Map;
import java.util.function.Supplier;

/**
Expand All @@ -57,7 +57,7 @@
*/
public class NodeClient extends AbstractClient {

private Map<ActionType, TransportAction> actions;
private DynamicActionRegistry actionRegistry;
/**
* The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by
* {@link #executeLocally(ActionType, ActionRequest, TaskListener)}.
Expand All @@ -71,12 +71,12 @@ public NodeClient(Settings settings, ThreadPool threadPool) {
}

public void initialize(
Map<ActionType, TransportAction> actions,
DynamicActionRegistry actionRegistry,
Supplier<String> localNodeId,
RemoteClusterService remoteClusterService,
NamedWriteableRegistry namedWriteableRegistry
) {
this.actions = actions;
this.actionRegistry = actionRegistry;
this.localNodeId = localNodeId;
this.remoteClusterService = remoteClusterService;
this.namedWriteableRegistry = namedWriteableRegistry;
Expand Down Expand Up @@ -137,10 +137,10 @@ public String getLocalNodeId() {
private <Request extends ActionRequest, Response extends ActionResponse> TransportAction<Request, Response> transportAction(
ActionType<Response> action
) {
if (actions == null) {
if (actionRegistry == null) {
throw new IllegalStateException("NodeClient has not been initialized");
}
TransportAction<Request, Response> transportAction = actions.get(action);
TransportAction<Request, Response> transportAction = (TransportAction<Request, Response>) actionRegistry.get(action);
if (transportAction == null) {
throw new IllegalStateException("failed to find action [" + action + "] to execute");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.ActionModule;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterSettingsResponse;
Expand All @@ -47,6 +48,8 @@
import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.extensions.action.ExtensionActionResponse;
import org.opensearch.extensions.action.ExtensionTransportActionsHandler;
import org.opensearch.extensions.action.RegisterTransportActionsRequest;
import org.opensearch.extensions.action.RemoteExtensionActionResponse;
import org.opensearch.extensions.action.TransportActionRequestFromExtension;
import org.opensearch.extensions.rest.RegisterRestActionsRequest;
import org.opensearch.extensions.rest.RestActionsRequestHandler;
Expand All @@ -58,7 +61,6 @@
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.rest.RestController;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.TransportException;
Expand Down Expand Up @@ -89,6 +91,7 @@ public class ExtensionsManager {
public static final String REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE = "internal:discovery/parsenamedwriteable";
public static final String REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION = "internal:extensions/restexecuteonextensiontaction";
public static final String REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION = "internal:extensions/handle-transportaction";
public static final String REQUEST_EXTENSION_HANDLE_REMOTE_TRANSPORT_ACTION = "internal:extensions/handle-remote-transportaction";
public static final String TRANSPORT_ACTION_REQUEST_FROM_EXTENSION = "internal:extensions/request-transportaction-from-extension";
public static final int EXTENSION_REQUEST_WAIT_TIMEOUT = 10;

Expand Down Expand Up @@ -166,22 +169,22 @@ public ExtensionsManager(Settings settings, Path extensionsPath) throws IOExcept
* Initializes the {@link RestActionsRequestHandler}, {@link TransportService}, {@link ClusterService} and environment settings. This is called during Node bootstrap.
* Lists/maps of extensions have already been initialized but not yet populated.
*
* @param restController The RestController on which to register Rest Actions.
* @param actionModule The ActionModule with the RestController and DynamicActionModule
* @param settingsModule The module that binds the provided settings to interface.
* @param transportService The Node's transport service.
* @param clusterService The Node's cluster service.
* @param initialEnvironmentSettings The finalized view of settings for the Environment
* @param client The client used to make transport requests
*/
public void initializeServicesAndRestHandler(
RestController restController,
ActionModule actionModule,
SettingsModule settingsModule,
TransportService transportService,
ClusterService clusterService,
Settings initialEnvironmentSettings,
NodeClient client
) {
this.restActionsRequestHandler = new RestActionsRequestHandler(restController, extensionIdMap, transportService);
this.restActionsRequestHandler = new RestActionsRequestHandler(actionModule.getRestController(), extensionIdMap, transportService);
this.customSettingsRequestHandler = new CustomSettingsRequestHandler(settingsModule);
this.transportService = transportService;
this.clusterService = clusterService;
Expand All @@ -192,7 +195,13 @@ public void initializeServicesAndRestHandler(
REQUEST_EXTENSION_UPDATE_SETTINGS
);
this.client = client;
this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(extensionIdMap, transportService, client);
this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(
extensionIdMap,
transportService,
client,
actionModule,
this
);
registerRequestHandler();
}

Expand All @@ -201,6 +210,15 @@ public void initializeServicesAndRestHandler(
*
* @param request which was sent by an extension.
*/
public RemoteExtensionActionResponse handleRemoteTransportRequest(ExtensionActionRequest request) throws Exception {
return extensionTransportActionsHandler.sendRemoteTransportRequestToExtension(request);
}

/**
* Handles Transport Request from {@link org.opensearch.extensions.action.ExtensionTransportAction} which was invoked by OpenSearch or a plugin
*
* @param request which was sent by an extension.
*/
public ExtensionActionResponse handleTransportRequest(ExtensionActionRequest request) throws Exception {
return extensionTransportActionsHandler.sendTransportRequestToExtension(request);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions.action;

import org.opensearch.action.ActionType;

import java.util.Objects;

/**
* An {@link ActionType} to be used in extension action transport handling.
*
* @opensearch.internal
*/
public class ExtensionAction extends ActionType<RemoteExtensionActionResponse> {

private final String uniqueId;

/**
* Create an instance of this action to register in the dynamic actions map.
*
* @param uniqueId The uniqueId of the extension which will run this action.
* @param name The fully qualified class name of the extension's action to execute.
*/
public ExtensionAction(String uniqueId, String name) {
super(name, RemoteExtensionActionResponse::new);
this.uniqueId = uniqueId;
}

/**
* Gets the uniqueId of the extension which will run this action.
*
* @return the uniqueId
*/
public String uniqueId() {
return this.uniqueId;
}

@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + Objects.hash(uniqueId);
return result;
}
Comment on lines +44 to +50
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks interesting, curious who uses hashCode

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the key to the map with the TransportAction value. Having matching hashcodes means I can (and do in this code) re-create the instance to be used as a key using just the strings.


@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (!super.equals(obj)) return false;
if (getClass() != obj.getClass()) return false;
ExtensionAction other = (ExtensionAction) obj;
return Objects.equals(uniqueId, other.uniqueId);
}
}
Loading