Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execute remote actions on another extension #588

Merged
merged 21 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ The `ExtensionsManager` reads a list of extensions present in `extensions.yml`.

(27) The User receives the response.

#### Remote Action Execution on another Extension

Extensions may invoke actions on other extensions using the `ProxyAction` and `ProxyActionRequest`. The code sequence is shown below.

![](Docs/RemoteActionExecution.svg)

#### Extension Point Implementation Walk Through

An example of a more complex extension point, `getNamedXContent()` is shown below. A similar pattern can be followed for most extension points.
Expand Down
1 change: 1 addition & 0 deletions Docs/RemoteActionExecution.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 5 additions & 3 deletions src/main/java/org/opensearch/sdk/BaseExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@

import java.io.IOException;

import com.google.inject.Inject;

/**
* An abstract class that simplifies extension initialization and provides an instance of the runner.
*/
public abstract class BaseExtension implements Extension {
/**
* The {@link ExtensionsRunner} instance running this extension
*/
@Inject
private ExtensionsRunner extensionsRunner;

/**
Expand Down Expand Up @@ -56,6 +53,11 @@ public ExtensionSettings getExtensionSettings() {
return this.settings;
}

@Override
public void setExtensionsRunner(ExtensionsRunner runner) {
this.extensionsRunner = runner;
}

/**
* Gets the {@link ExtensionsRunner} of this extension.
*
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/opensearch/sdk/Extension.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@
*/
public interface Extension {

/**
* Set the instance of {@link ExtensionsRunner} for this extension.
*
* @param runner The ExtensionsRunner instance.
*/
public void setExtensionsRunner(ExtensionsRunner runner);

/**
* Gets the {@link ExtensionSettings} of this extension.
*
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.extensions.DiscoveryExtensionNode;
import org.opensearch.extensions.AddSettingsUpdateConsumerRequest;
import org.opensearch.extensions.UpdateSettingsRequest;
import org.opensearch.extensions.action.ExtensionActionRequest;
import org.opensearch.extensions.ExtensionsManager.RequestType;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsManager;
Expand All @@ -33,6 +34,7 @@
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.EnvironmentSettingsResponseHandler;
import org.opensearch.sdk.handlers.ExtensionActionRequestHandler;
import org.opensearch.sdk.action.SDKActionModule;
import org.opensearch.sdk.handlers.AcknowledgedResponseHandler;
import org.opensearch.sdk.handlers.ExtensionDependencyResponseHandler;
Expand Down Expand Up @@ -132,13 +134,15 @@ public class ExtensionsRunner {
private final SDKNamedXContentRegistry sdkNamedXContentRegistry;
private final SDKClient sdkClient;
private final SDKClusterService sdkClusterService;
private final SDKTransportService sdkTransportService;
private final SDKActionModule sdkActionModule;

private ExtensionsInitRequestHandler extensionsInitRequestHandler = new ExtensionsInitRequestHandler(this);
private ExtensionsIndicesModuleRequestHandler extensionsIndicesModuleRequestHandler = new ExtensionsIndicesModuleRequestHandler();
private ExtensionsIndicesModuleNameRequestHandler extensionsIndicesModuleNameRequestHandler =
new ExtensionsIndicesModuleNameRequestHandler();
private ExtensionsRestRequestHandler extensionsRestRequestHandler = new ExtensionsRestRequestHandler(extensionRestPathRegistry);
private ExtensionActionRequestHandler extensionsActionRequestHandler;
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Instantiates a new update settings request handler
Expand All @@ -152,7 +156,10 @@ public class ExtensionsRunner {
* @throws IOException if the runner failed to read settings or API.
*/
protected ExtensionsRunner(Extension extension) throws IOException {
// Link these classes together
this.extension = extension;
extension.setExtensionsRunner(this);

// Initialize concrete classes needed by extensions
// These must have getters from this class to be accessible via createComponents
// If they require later initialization, create a concrete wrapper class and update the internals
Expand All @@ -175,6 +182,8 @@ protected ExtensionsRunner(Extension extension) throws IOException {
this.sdkClient = new SDKClient(extensionSettings);
// initialize SDKClusterService. Must happen after extension field assigned
this.sdkClusterService = new SDKClusterService(this);
// initialize SDKTransportService. Must happen after extension field assigned
this.sdkTransportService = new SDKTransportService();

// Create Guice modules for injection
List<com.google.inject.Module> modules = new ArrayList<>();
Expand All @@ -189,6 +198,7 @@ protected ExtensionsRunner(Extension extension) throws IOException {

b.bind(SDKClient.class).toInstance(getSdkClient());
b.bind(SDKClusterService.class).toInstance(getSdkClusterService());
b.bind(SDKTransportService.class).toInstance(getSdkTransportService());
});
// Bind the return values from create components
modules.add(this::injectComponents);
Expand All @@ -202,6 +212,8 @@ protected ExtensionsRunner(Extension extension) throws IOException {
// initialize SDKClient action map
initializeSdkClient();

extensionsActionRequestHandler = new ExtensionActionRequestHandler(getSdkClient(), getSdkActionModule());

if (extension instanceof ActionExtension) {
// store REST handlers in the registry
for (ExtensionRestHandler extensionRestHandler : ((ActionExtension) extension).getExtensionRestHandlers()) {
Expand Down Expand Up @@ -391,6 +403,17 @@ public void startTransportService(TransportService transportService) {
((request, channel, task) -> channel.sendResponse(updateSettingsRequestHandler.handleUpdateSettingsRequest(request)))
);

// TODO: This handles a remote extension request sending a RemoteExtensionActionResponse
// For actions sent from OpenSearch or a plugin using ProxyAction need to write a new request handler
// for ExtensionsManager.REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION that sends an ExtensionActionResponse
transportService.registerRequestHandler(
ExtensionsManager.REQUEST_EXTENSION_HANDLE_REMOTE_TRANSPORT_ACTION,
ThreadPool.Names.GENERIC,
false,
false,
ExtensionActionRequest::new,
((request, channel, task) -> channel.sendResponse(extensionsActionRequestHandler.handleExtensionActionRequest(request)))
);
}

/**
Expand Down Expand Up @@ -638,6 +661,10 @@ public TransportService getExtensionTransportService() {
return extensionTransportService;
}

public SDKTransportService getSdkTransportService() {
return sdkTransportService;
}

/**
* Starts an ActionListener.
*
Expand All @@ -660,6 +687,8 @@ public static void run(Extension extension) throws IOException {
// initialize the transport service
NettyTransport nettyTransport = new NettyTransport(runner);
runner.extensionTransportService = nettyTransport.initializeExtensionTransportService(runner.getSettings(), runner.getThreadPool());
// TODO: merge above line with below line when refactoring out extensionTransportService
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
runner.getSdkTransportService().setTransportService(runner.extensionTransportService);
runner.startActionListener(0);
}

Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/opensearch/sdk/SDKClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.DeserializationFeature;
Expand Down Expand Up @@ -100,6 +101,9 @@ public SDKClient(ExtensionSettings extensionSettings) {
// Used by client.execute, populated by initialize method
@SuppressWarnings("rawtypes")
private Map<ActionType, TransportAction> actions = Collections.emptyMap();
// Used by remote client execution where we get a string for the class name
@SuppressWarnings("rawtypes")
private Map<String, ActionType> actionClassToInstanceMap = Collections.emptyMap();

/**
* Initialize this client.
Expand All @@ -109,6 +113,7 @@ public SDKClient(ExtensionSettings extensionSettings) {
@SuppressWarnings("rawtypes")
public void initialize(Map<ActionType, TransportAction> actions) {
this.actions = actions;
this.actionClassToInstanceMap = actions.keySet().stream().collect(Collectors.toMap(a -> a.getClass().getName(), a -> a));
}

/**
Expand Down Expand Up @@ -285,6 +290,17 @@ public void close() throws IOException {
doCloseHighLevelClient();
}

/**
* Gets an instance of {@link ActionType} from its corresponding class name, suitable for using as the first parameter in {@link #execute(ActionType, ActionRequest, ActionListener)}.
*
* @param className The class name of the action type
* @return The instance corresponding to the class name
*/
@SuppressWarnings("unchecked")
public ActionType<? extends ActionResponse> getActionFromClassName(String className) {
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
return actionClassToInstanceMap.get(className);
}

/**
* Executes a generic action, denoted by an {@link ActionType}.
*
Expand Down
132 changes: 132 additions & 0 deletions src/main/java/org/opensearch/sdk/SDKTransportService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sdk;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.extensions.action.RegisterTransportActionsRequest;
import org.opensearch.extensions.action.RemoteExtensionActionResponse;
import org.opensearch.extensions.action.TransportActionRequestFromExtension;
import org.opensearch.sdk.ActionExtension.ActionHandler;
import org.opensearch.sdk.action.RemoteExtensionActionRequest;
import org.opensearch.sdk.action.SDKActionModule;
import org.opensearch.sdk.handlers.AcknowledgedResponseHandler;
import org.opensearch.sdk.handlers.ExtensionActionResponseHandler;
import org.opensearch.transport.TransportService;

/**
* Wrapper class for {@link TransportService} and associated methods.
*
* TODO: Move all the sendFooRequest() methods here
* TODO: Replace usages of getExtensionTransportService with this class
* https://github.com/opensearch-project/opensearch-sdk-java/issues/585
*/
public class SDKTransportService {
private final Logger logger = LogManager.getLogger(SDKTransportService.class);

private TransportService transportService;
private DiscoveryNode opensearchNode;
private String uniqueId;

/**
* Requests that OpenSearch register the Transport Actions for this extension.
*
* @param actions The map of registered actions from {@link SDKActionModule#getActions()}
*/
public void sendRegisterTransportActionsRequest(Map<String, ActionHandler<?, ?>> actions) {
logger.info("Sending Register Transport Actions request to OpenSearch");
Set<String> actionNameSet = actions.values()
.stream()
.filter(h -> !h.getAction().name().startsWith("internal"))
.map(h -> h.getAction().getClass().getName())
.collect(Collectors.toSet());
AcknowledgedResponseHandler registerTransportActionsResponseHandler = new AcknowledgedResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsManager.REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS,
new RegisterTransportActionsRequest(uniqueId, actionNameSet),
registerTransportActionsResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Register Transport Actions request to OpenSearch", e);
}
}

/**
* Requests that OpenSearch execute a Transport Actions on another extension.
*
* @param request The request to send
* @return A buffer serializing the response from the remote action if successful, otherwise null
*/
public RemoteExtensionActionResponse sendRemoteExtensionActionRequest(RemoteExtensionActionRequest request) {
logger.info("Sending Remote Extension Action request to OpenSearch for [" + request.getAction() + "]");
// Combine class name string and request bytes
byte[] requestClassBytes = request.getRequestClass().getBytes(StandardCharsets.UTF_8);
byte[] proxyRequestBytes = ByteBuffer.allocate(requestClassBytes.length + 1 + request.getRequestBytes().length)
.put(requestClassBytes)
.put((byte) 0)
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
.put(request.getRequestBytes())
.array();
ExtensionActionResponseHandler extensionActionResponseHandler = new ExtensionActionResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsManager.TRANSPORT_ACTION_REQUEST_FROM_EXTENSION,
new TransportActionRequestFromExtension(request.getAction(), proxyRequestBytes, uniqueId),
extensionActionResponseHandler
);
// Wait on response
extensionActionResponseHandler.awaitResponse();
} catch (TimeoutException e) {
logger.info("Failed to receive Remote Extension Action response from OpenSearch", e);
} catch (Exception e) {
logger.info("Failed to send Remote Extension Action request to OpenSearch", e);
}
// At this point, response handler has read in the response bytes
return new RemoteExtensionActionResponse(
extensionActionResponseHandler.isSuccess(),
extensionActionResponseHandler.getResponseBytes()
);
}

public TransportService getTransportService() {
return transportService;
}

public DiscoveryNode getOpensearchNode() {
return opensearchNode;
}

public String getUniqueId() {
return uniqueId;
}

public void setTransportService(TransportService transportService) {
this.transportService = transportService;
}

public void setOpensearchNode(DiscoveryNode opensearchNode) {
this.opensearchNode = opensearchNode;
}

public void setUniqueId(String uniqueId) {
this.uniqueId = uniqueId;
}
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sdk.action;

import org.opensearch.action.ActionType;
import org.opensearch.extensions.action.RemoteExtensionActionResponse;

/**
* The {@link ActionType} used as they key for the {@link RemoteExtensionTransportAction}.
*/
public class RemoteExtensionAction extends ActionType<RemoteExtensionActionResponse> {

/**
* The name to look up this action with
*/
public static final String NAME = "internal/extension-proxyaction";
/**
* The singleton instance of this class
*/
public static final RemoteExtensionAction INSTANCE = new RemoteExtensionAction();

private RemoteExtensionAction() {
super(NAME, RemoteExtensionActionResponse::new);
}
}
Loading