Skip to content

Commit

Permalink
[Extensions] Add DynamicActionRegistry to ActionModule (#6734) (#6829)
Browse files Browse the repository at this point in the history
* Add dynamic action registry to ActionModule



* Update registration of transport actions



* Generate transport actions dynamically



* Refactor to combine registry internals



* Finally figured out the generics (or lack thereof)



* ExtensionProxyAction is dead! Long live ExtensionAction!



* Simplify ExtensionTransportActionHandler, fix compile issues



* Maybe tests will pass with this commit



* I guess you can't use null as a key in a map



* Lazy test setup, but this should finally work



* Add Tests



* Fix TransportActionRequestFromExtension inheritance



* Fix return type for transport actions from extensions



* Fix ParametersInWrongOrderError and add some preemptive null handling



* NPE is not expected result if params are in correct order



* Remove redundant class and string parsing, add success boolean



* Last fix of params out of order. Working test case!



* Code worked, tests didn't. This is finally done (I think)



* Add more detail to comments on immutable vs. dynamic maps



* Add StreamInput getter to ExtensionActionResponse



* Generalize dynamic action registration



* Comment and naming fixes



* Register method renaming



* Add generic type parameters



* Improve/simplify which parameter types get passed



* Revert removal of ProxyAction and changes to transport and requests



* Wrap ExtensionTransportResponse in a class denoting success



* Remove generic types as they are incompatible with Guice injection



* Fix response handling, it works (again)



* Fix up comments and remove debug logging



---------


(cherry picked from commit 9febe10)

Signed-off-by: Daniel Widdis <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 2bb68aa commit ac79a63
Show file tree
Hide file tree
Showing 21 changed files with 645 additions and 158 deletions.
92 changes: 89 additions & 3 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,12 @@
import org.opensearch.common.inject.TypeLiteral;
import org.opensearch.common.inject.multibindings.MapBinder;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.extensions.action.ExtensionProxyAction;
import org.opensearch.extensions.action.ExtensionTransportAction;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.extensions.action.ExtensionProxyAction;
import org.opensearch.extensions.action.ExtensionProxyTransportAction;
import org.opensearch.index.seqno.RetentionLeaseActions;
import org.opensearch.indices.SystemIndices;
import org.opensearch.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -448,13 +448,15 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableMap;
import static java.util.Objects.requireNonNull;

/**
* Builds and binds the generic action map, all {@link TransportAction}s, and {@link ActionFilters}.
Expand All @@ -471,7 +473,17 @@ public class ActionModule extends AbstractModule {
private final ClusterSettings clusterSettings;
private final SettingsFilter settingsFilter;
private final List<ActionPlugin> actionPlugins;
// The unmodifiable map containing OpenSearch and Plugin actions
// This is initialized at node bootstrap and contains same-JVM actions
// It will be wrapped in the Dynamic Action Registry but otherwise
// remains unchanged from its prior purpose, and registered actions
// will remain accessible.
private final Map<String, ActionHandler<?, ?>> actions;
// A dynamic action registry which includes the above immutable actions
// and also registers dynamic actions which may be unregistered. Usually
// associated with remote action execution on extensions, possibly in
// a different JVM and possibly on a different server.
private final DynamicActionRegistry dynamicActionRegistry;
private final ActionFilters actionFilters;
private final AutoCreateIndex autoCreateIndex;
private final DestructiveOperations destructiveOperations;
Expand Down Expand Up @@ -502,6 +514,7 @@ public ActionModule(
this.threadPool = threadPool;
actions = setupActions(actionPlugins);
actionFilters = setupActionFilters(actionPlugins);
dynamicActionRegistry = new DynamicActionRegistry();
autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
Expand Down Expand Up @@ -711,7 +724,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) {
// ExtensionProxyAction
actions.register(ExtensionProxyAction.INSTANCE, ExtensionTransportAction.class);
actions.register(ExtensionProxyAction.INSTANCE, ExtensionProxyTransportAction.class);
}

// Decommission actions
Expand Down Expand Up @@ -953,13 +966,86 @@ protected void configure() {
bind(supportAction).asEagerSingleton();
}
}

// register dynamic ActionType -> transportAction Map used by NodeClient
bind(DynamicActionRegistry.class).toInstance(dynamicActionRegistry);
}

public ActionFilters getActionFilters() {
return actionFilters;
}

public DynamicActionRegistry getDynamicActionRegistry() {
return dynamicActionRegistry;
}

public RestController getRestController() {
return restController;
}

/**
* The DynamicActionRegistry maintains a registry mapping {@link ActionType} instances to {@link TransportAction} instances.
* <p>
* This class is modeled after {@link NamedRegistry} but provides both register and unregister capabilities.
*
* @opensearch.internal
*/
public static class DynamicActionRegistry {
// This is the unmodifiable actions map created during node bootstrap, which
// will continue to link ActionType and TransportAction pairs from core and plugin
// action handler registration.
private Map<ActionType, TransportAction> actions = Collections.emptyMap();
// A dynamic registry to add or remove ActionType / TransportAction pairs
// at times other than node bootstrap.
private final Map<ActionType<?>, TransportAction<?, ?>> registry = new ConcurrentHashMap<>();

/**
* Register the immutable actions in the registry.
*
* @param actions The injected map of {@link ActionType} to {@link TransportAction}
*/
public void registerUnmodifiableActionMap(Map<ActionType, TransportAction> actions) {
this.actions = actions;
}

/**
* Add a dynamic action to the registry.
*
* @param action The action instance to add
* @param transportAction The corresponding instance of transportAction to execute
*/
public void registerDynamicAction(ActionType<?> action, TransportAction<?, ?> transportAction) {
requireNonNull(action, "action is required");
requireNonNull(transportAction, "transportAction is required");
if (actions.containsKey(action) || registry.putIfAbsent(action, transportAction) != null) {
throw new IllegalArgumentException("action [" + action.name() + "] already registered");
}
}

/**
* Remove a dynamic action from the registry.
*
* @param action The action to remove
*/
public void unregisterDynamicAction(ActionType<?> action) {
requireNonNull(action, "action is required");
if (registry.remove(action) == null) {
throw new IllegalArgumentException("action [" + action.name() + "] was not registered");
}
}

/**
* Gets the {@link TransportAction} instance corresponding to the {@link ActionType} instance.
*
* @param action The {@link ActionType}.
* @return the corresponding {@link TransportAction} if it is registered, null otherwise.
*/
@SuppressWarnings("unchecked")
public TransportAction<? extends ActionRequest, ? extends ActionResponse> get(ActionType<?> action) {
if (actions.containsKey(action)) {
return actions.get(action);
}
return registry.get(action);
}
}
}
12 changes: 6 additions & 6 deletions server/src/main/java/org/opensearch/client/node/NodeClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.opensearch.action.ActionType;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionModule.DynamicActionRegistry;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.support.TransportAction;
Expand All @@ -47,7 +48,6 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteClusterService;

import java.util.Map;
import java.util.function.Supplier;

/**
Expand All @@ -57,7 +57,7 @@
*/
public class NodeClient extends AbstractClient {

private Map<ActionType, TransportAction> actions;
private DynamicActionRegistry actionRegistry;
/**
* The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by
* {@link #executeLocally(ActionType, ActionRequest, TaskListener)}.
Expand All @@ -71,12 +71,12 @@ public NodeClient(Settings settings, ThreadPool threadPool) {
}

public void initialize(
Map<ActionType, TransportAction> actions,
DynamicActionRegistry actionRegistry,
Supplier<String> localNodeId,
RemoteClusterService remoteClusterService,
NamedWriteableRegistry namedWriteableRegistry
) {
this.actions = actions;
this.actionRegistry = actionRegistry;
this.localNodeId = localNodeId;
this.remoteClusterService = remoteClusterService;
this.namedWriteableRegistry = namedWriteableRegistry;
Expand Down Expand Up @@ -137,10 +137,10 @@ public String getLocalNodeId() {
private <Request extends ActionRequest, Response extends ActionResponse> TransportAction<Request, Response> transportAction(
ActionType<Response> action
) {
if (actions == null) {
if (actionRegistry == null) {
throw new IllegalStateException("NodeClient has not been initialized");
}
TransportAction<Request, Response> transportAction = actions.get(action);
TransportAction<Request, Response> transportAction = (TransportAction<Request, Response>) actionRegistry.get(action);
if (transportAction == null) {
throw new IllegalStateException("failed to find action [" + action + "] to execute");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.ActionModule;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterSettingsResponse;
Expand All @@ -47,6 +48,8 @@
import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.extensions.action.ExtensionActionResponse;
import org.opensearch.extensions.action.ExtensionTransportActionsHandler;
import org.opensearch.extensions.action.RegisterTransportActionsRequest;
import org.opensearch.extensions.action.RemoteExtensionActionResponse;
import org.opensearch.extensions.action.TransportActionRequestFromExtension;
import org.opensearch.extensions.rest.RegisterRestActionsRequest;
import org.opensearch.extensions.rest.RestActionsRequestHandler;
Expand All @@ -58,7 +61,6 @@
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.rest.RestController;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.TransportException;
Expand Down Expand Up @@ -89,6 +91,7 @@ public class ExtensionsManager {
public static final String REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE = "internal:discovery/parsenamedwriteable";
public static final String REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION = "internal:extensions/restexecuteonextensiontaction";
public static final String REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION = "internal:extensions/handle-transportaction";
public static final String REQUEST_EXTENSION_HANDLE_REMOTE_TRANSPORT_ACTION = "internal:extensions/handle-remote-transportaction";
public static final String TRANSPORT_ACTION_REQUEST_FROM_EXTENSION = "internal:extensions/request-transportaction-from-extension";
public static final int EXTENSION_REQUEST_WAIT_TIMEOUT = 10;

Expand Down Expand Up @@ -166,22 +169,22 @@ public ExtensionsManager(Settings settings, Path extensionsPath) throws IOExcept
* Initializes the {@link RestActionsRequestHandler}, {@link TransportService}, {@link ClusterService} and environment settings. This is called during Node bootstrap.
* Lists/maps of extensions have already been initialized but not yet populated.
*
* @param restController The RestController on which to register Rest Actions.
* @param actionModule The ActionModule with the RestController and DynamicActionModule
* @param settingsModule The module that binds the provided settings to interface.
* @param transportService The Node's transport service.
* @param clusterService The Node's cluster service.
* @param initialEnvironmentSettings The finalized view of settings for the Environment
* @param client The client used to make transport requests
*/
public void initializeServicesAndRestHandler(
RestController restController,
ActionModule actionModule,
SettingsModule settingsModule,
TransportService transportService,
ClusterService clusterService,
Settings initialEnvironmentSettings,
NodeClient client
) {
this.restActionsRequestHandler = new RestActionsRequestHandler(restController, extensionIdMap, transportService);
this.restActionsRequestHandler = new RestActionsRequestHandler(actionModule.getRestController(), extensionIdMap, transportService);
this.customSettingsRequestHandler = new CustomSettingsRequestHandler(settingsModule);
this.transportService = transportService;
this.clusterService = clusterService;
Expand All @@ -192,7 +195,13 @@ public void initializeServicesAndRestHandler(
REQUEST_EXTENSION_UPDATE_SETTINGS
);
this.client = client;
this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(extensionIdMap, transportService, client);
this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(
extensionIdMap,
transportService,
client,
actionModule,
this
);
registerRequestHandler();
}

Expand All @@ -201,6 +210,15 @@ public void initializeServicesAndRestHandler(
*
* @param request which was sent by an extension.
*/
public RemoteExtensionActionResponse handleRemoteTransportRequest(ExtensionActionRequest request) throws Exception {
return extensionTransportActionsHandler.sendRemoteTransportRequestToExtension(request);
}

/**
* Handles Transport Request from {@link org.opensearch.extensions.action.ExtensionTransportAction} which was invoked by OpenSearch or a plugin
*
* @param request which was sent by an extension.
*/
public ExtensionActionResponse handleTransportRequest(ExtensionActionRequest request) throws Exception {
return extensionTransportActionsHandler.sendTransportRequestToExtension(request);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions.action;

import org.opensearch.action.ActionType;

import java.util.Objects;

/**
* An {@link ActionType} to be used in extension action transport handling.
*
* @opensearch.internal
*/
public class ExtensionAction extends ActionType<RemoteExtensionActionResponse> {

private final String uniqueId;

/**
* Create an instance of this action to register in the dynamic actions map.
*
* @param uniqueId The uniqueId of the extension which will run this action.
* @param name The fully qualified class name of the extension's action to execute.
*/
public ExtensionAction(String uniqueId, String name) {
super(name, RemoteExtensionActionResponse::new);
this.uniqueId = uniqueId;
}

/**
* Gets the uniqueId of the extension which will run this action.
*
* @return the uniqueId
*/
public String uniqueId() {
return this.uniqueId;
}

@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + Objects.hash(uniqueId);
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (!super.equals(obj)) return false;
if (getClass() != obj.getClass()) return false;
ExtensionAction other = (ExtensionAction) obj;
return Objects.equals(uniqueId, other.uniqueId);
}
}
Loading

0 comments on commit ac79a63

Please sign in to comment.