Skip to content

Commit

Permalink
Revert "[Extensions] Add DynamicActionRegistry to ActionModule (opens…
Browse files Browse the repository at this point in the history
  • Loading branch information
Rishikesh1159 committed Mar 29, 2023
1 parent 715c4c8 commit 306b8f5
Show file tree
Hide file tree
Showing 21 changed files with 158 additions and 645 deletions.
92 changes: 3 additions & 89 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,12 @@
import org.opensearch.common.inject.TypeLiteral;
import org.opensearch.common.inject.multibindings.MapBinder;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.extensions.action.ExtensionProxyAction;
import org.opensearch.extensions.action.ExtensionTransportAction;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.extensions.action.ExtensionProxyAction;
import org.opensearch.extensions.action.ExtensionProxyTransportAction;
import org.opensearch.index.seqno.RetentionLeaseActions;
import org.opensearch.indices.SystemIndices;
import org.opensearch.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -448,15 +448,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableMap;
import static java.util.Objects.requireNonNull;

/**
* Builds and binds the generic action map, all {@link TransportAction}s, and {@link ActionFilters}.
Expand All @@ -473,17 +471,7 @@ public class ActionModule extends AbstractModule {
private final ClusterSettings clusterSettings;
private final SettingsFilter settingsFilter;
private final List<ActionPlugin> actionPlugins;
// The unmodifiable map containing OpenSearch and Plugin actions
// This is initialized at node bootstrap and contains same-JVM actions
// It will be wrapped in the Dynamic Action Registry but otherwise
// remains unchanged from its prior purpose, and registered actions
// will remain accessible.
private final Map<String, ActionHandler<?, ?>> actions;
// A dynamic action registry which includes the above immutable actions
// and also registers dynamic actions which may be unregistered. Usually
// associated with remote action execution on extensions, possibly in
// a different JVM and possibly on a different server.
private final DynamicActionRegistry dynamicActionRegistry;
private final ActionFilters actionFilters;
private final AutoCreateIndex autoCreateIndex;
private final DestructiveOperations destructiveOperations;
Expand Down Expand Up @@ -514,7 +502,6 @@ public ActionModule(
this.threadPool = threadPool;
actions = setupActions(actionPlugins);
actionFilters = setupActionFilters(actionPlugins);
dynamicActionRegistry = new DynamicActionRegistry();
autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<RestHeaderDefinition> headers = Stream.concat(
Expand Down Expand Up @@ -724,7 +711,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) {
// ExtensionProxyAction
actions.register(ExtensionProxyAction.INSTANCE, ExtensionProxyTransportAction.class);
actions.register(ExtensionProxyAction.INSTANCE, ExtensionTransportAction.class);
}

// Decommission actions
Expand Down Expand Up @@ -966,86 +953,13 @@ protected void configure() {
bind(supportAction).asEagerSingleton();
}
}

// register dynamic ActionType -> transportAction Map used by NodeClient
bind(DynamicActionRegistry.class).toInstance(dynamicActionRegistry);
}

public ActionFilters getActionFilters() {
return actionFilters;
}

public DynamicActionRegistry getDynamicActionRegistry() {
return dynamicActionRegistry;
}

public RestController getRestController() {
return restController;
}

/**
* The DynamicActionRegistry maintains a registry mapping {@link ActionType} instances to {@link TransportAction} instances.
* <p>
* This class is modeled after {@link NamedRegistry} but provides both register and unregister capabilities.
*
* @opensearch.internal
*/
public static class DynamicActionRegistry {
// This is the unmodifiable actions map created during node bootstrap, which
// will continue to link ActionType and TransportAction pairs from core and plugin
// action handler registration.
private Map<ActionType, TransportAction> actions = Collections.emptyMap();
// A dynamic registry to add or remove ActionType / TransportAction pairs
// at times other than node bootstrap.
private final Map<ActionType<?>, TransportAction<?, ?>> registry = new ConcurrentHashMap<>();

/**
* Register the immutable actions in the registry.
*
* @param actions The injected map of {@link ActionType} to {@link TransportAction}
*/
public void registerUnmodifiableActionMap(Map<ActionType, TransportAction> actions) {
this.actions = actions;
}

/**
* Add a dynamic action to the registry.
*
* @param action The action instance to add
* @param transportAction The corresponding instance of transportAction to execute
*/
public void registerDynamicAction(ActionType<?> action, TransportAction<?, ?> transportAction) {
requireNonNull(action, "action is required");
requireNonNull(transportAction, "transportAction is required");
if (actions.containsKey(action) || registry.putIfAbsent(action, transportAction) != null) {
throw new IllegalArgumentException("action [" + action.name() + "] already registered");
}
}

/**
* Remove a dynamic action from the registry.
*
* @param action The action to remove
*/
public void unregisterDynamicAction(ActionType<?> action) {
requireNonNull(action, "action is required");
if (registry.remove(action) == null) {
throw new IllegalArgumentException("action [" + action.name() + "] was not registered");
}
}

/**
* Gets the {@link TransportAction} instance corresponding to the {@link ActionType} instance.
*
* @param action The {@link ActionType}.
* @return the corresponding {@link TransportAction} if it is registered, null otherwise.
*/
@SuppressWarnings("unchecked")
public TransportAction<? extends ActionRequest, ? extends ActionResponse> get(ActionType<?> action) {
if (actions.containsKey(action)) {
return actions.get(action);
}
return registry.get(action);
}
}
}
12 changes: 6 additions & 6 deletions server/src/main/java/org/opensearch/client/node/NodeClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import org.opensearch.action.ActionType;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionModule.DynamicActionRegistry;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.support.TransportAction;
Expand All @@ -48,6 +47,7 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteClusterService;

import java.util.Map;
import java.util.function.Supplier;

/**
Expand All @@ -57,7 +57,7 @@
*/
public class NodeClient extends AbstractClient {

private DynamicActionRegistry actionRegistry;
private Map<ActionType, TransportAction> actions;
/**
* The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by
* {@link #executeLocally(ActionType, ActionRequest, TaskListener)}.
Expand All @@ -71,12 +71,12 @@ public NodeClient(Settings settings, ThreadPool threadPool) {
}

public void initialize(
DynamicActionRegistry actionRegistry,
Map<ActionType, TransportAction> actions,
Supplier<String> localNodeId,
RemoteClusterService remoteClusterService,
NamedWriteableRegistry namedWriteableRegistry
) {
this.actionRegistry = actionRegistry;
this.actions = actions;
this.localNodeId = localNodeId;
this.remoteClusterService = remoteClusterService;
this.namedWriteableRegistry = namedWriteableRegistry;
Expand Down Expand Up @@ -137,10 +137,10 @@ public String getLocalNodeId() {
private <Request extends ActionRequest, Response extends ActionResponse> TransportAction<Request, Response> transportAction(
ActionType<Response> action
) {
if (actionRegistry == null) {
if (actions == null) {
throw new IllegalStateException("NodeClient has not been initialized");
}
TransportAction<Request, Response> transportAction = (TransportAction<Request, Response>) actionRegistry.get(action);
TransportAction<Request, Response> transportAction = actions.get(action);
if (transportAction == null) {
throw new IllegalStateException("failed to find action [" + action + "] to execute");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.ActionModule;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterSettingsResponse;
Expand All @@ -48,8 +47,6 @@
import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.extensions.action.ExtensionActionResponse;
import org.opensearch.extensions.action.ExtensionTransportActionsHandler;
import org.opensearch.extensions.action.RegisterTransportActionsRequest;
import org.opensearch.extensions.action.RemoteExtensionActionResponse;
import org.opensearch.extensions.action.TransportActionRequestFromExtension;
import org.opensearch.extensions.rest.RegisterRestActionsRequest;
import org.opensearch.extensions.rest.RestActionsRequestHandler;
Expand All @@ -61,6 +58,7 @@
import org.opensearch.index.IndicesModuleResponse;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.rest.RestController;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.TransportException;
Expand Down Expand Up @@ -91,7 +89,6 @@ public class ExtensionsManager {
public static final String REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE = "internal:discovery/parsenamedwriteable";
public static final String REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION = "internal:extensions/restexecuteonextensiontaction";
public static final String REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION = "internal:extensions/handle-transportaction";
public static final String REQUEST_EXTENSION_HANDLE_REMOTE_TRANSPORT_ACTION = "internal:extensions/handle-remote-transportaction";
public static final String TRANSPORT_ACTION_REQUEST_FROM_EXTENSION = "internal:extensions/request-transportaction-from-extension";
public static final int EXTENSION_REQUEST_WAIT_TIMEOUT = 10;

Expand Down Expand Up @@ -169,22 +166,22 @@ public ExtensionsManager(Settings settings, Path extensionsPath) throws IOExcept
* Initializes the {@link RestActionsRequestHandler}, {@link TransportService}, {@link ClusterService} and environment settings. This is called during Node bootstrap.
* Lists/maps of extensions have already been initialized but not yet populated.
*
* @param actionModule The ActionModule with the RestController and DynamicActionModule
* @param restController The RestController on which to register Rest Actions.
* @param settingsModule The module that binds the provided settings to interface.
* @param transportService The Node's transport service.
* @param clusterService The Node's cluster service.
* @param initialEnvironmentSettings The finalized view of settings for the Environment
* @param client The client used to make transport requests
*/
public void initializeServicesAndRestHandler(
ActionModule actionModule,
RestController restController,
SettingsModule settingsModule,
TransportService transportService,
ClusterService clusterService,
Settings initialEnvironmentSettings,
NodeClient client
) {
this.restActionsRequestHandler = new RestActionsRequestHandler(actionModule.getRestController(), extensionIdMap, transportService);
this.restActionsRequestHandler = new RestActionsRequestHandler(restController, extensionIdMap, transportService);
this.customSettingsRequestHandler = new CustomSettingsRequestHandler(settingsModule);
this.transportService = transportService;
this.clusterService = clusterService;
Expand All @@ -195,13 +192,7 @@ public void initializeServicesAndRestHandler(
REQUEST_EXTENSION_UPDATE_SETTINGS
);
this.client = client;
this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(
extensionIdMap,
transportService,
client,
actionModule,
this
);
this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(extensionIdMap, transportService, client);
registerRequestHandler();
}

Expand All @@ -210,15 +201,6 @@ public void initializeServicesAndRestHandler(
*
* @param request which was sent by an extension.
*/
public RemoteExtensionActionResponse handleRemoteTransportRequest(ExtensionActionRequest request) throws Exception {
return extensionTransportActionsHandler.sendRemoteTransportRequestToExtension(request);
}

/**
* Handles Transport Request from {@link org.opensearch.extensions.action.ExtensionTransportAction} which was invoked by OpenSearch or a plugin
*
* @param request which was sent by an extension.
*/
public ExtensionActionResponse handleTransportRequest(ExtensionActionRequest request) throws Exception {
return extensionTransportActionsHandler.sendTransportRequestToExtension(request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.extensions.action;
package org.opensearch.extensions;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand Down

This file was deleted.

Loading

0 comments on commit 306b8f5

Please sign in to comment.