Skip to content

Commit

Permalink
Fix response handling, it works (again)
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Mar 23, 2023
1 parent c2a57d7 commit 8b2f179
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 46 deletions.
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.extensions.action.ExtensionProxyAction;
import org.opensearch.extensions.action.ExtensionTransportAction;
import org.opensearch.extensions.action.ExtensionProxyTransportAction;
import org.opensearch.index.seqno.RetentionLeaseActions;
import org.opensearch.indices.SystemIndices;
import org.opensearch.indices.breaker.CircuitBreakerService;
Expand Down Expand Up @@ -724,7 +724,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) {
// ExtensionProxyAction
actions.register(ExtensionProxyAction.INSTANCE, ExtensionTransportAction.class);
actions.register(ExtensionProxyAction.INSTANCE, ExtensionProxyTransportAction.class);
}

// Decommission actions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,8 @@ public void initializeServicesAndRestHandler(
this.extensionTransportActionsHandler = new ExtensionTransportActionsHandler(
extensionIdMap,
transportService,
clusterService,
client,
actionModule,
settingsModule,
this
);
registerRequestHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
* @opensearch.internal
*/
public class ExtensionAction extends ActionType<ExtensionActionResponse> {
public class ExtensionAction extends ActionType<RemoteExtensionActionResponse> {

private final String uniqueId;

Expand All @@ -28,7 +28,7 @@ public class ExtensionAction extends ActionType<ExtensionActionResponse> {
* @param name The fully qualified class name of the extension's action to execute.
*/
public ExtensionAction(String uniqueId, String name) {
super(name, ExtensionActionResponse::new);
super(name, RemoteExtensionActionResponse::new);
this.uniqueId = uniqueId;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions.action;

import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

/**
* A proxy transport action used to proxy a transport request from OpenSearch or a plugin to execute on an extension
*
* @opensearch.internal
*/
public class ExtensionProxyTransportAction extends HandledTransportAction<ExtensionActionRequest, ExtensionActionResponse> {

private final ExtensionsManager extensionsManager;

@Inject
public ExtensionProxyTransportAction(
Settings settings,
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
ExtensionsManager extensionsManager
) {
super(ExtensionProxyAction.NAME, transportService, actionFilters, ExtensionActionRequest::new);
this.extensionsManager = extensionsManager;
}

@Override
protected void doExecute(Task task, ExtensionActionRequest request, ActionListener<ExtensionActionResponse> listener) {
try {
listener.onResponse(extensionsManager.handleTransportRequest(request));
} catch (Exception e) {
listener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,34 @@

import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.action.support.TransportAction;
import org.opensearch.extensions.ExtensionsManager;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
import org.opensearch.tasks.TaskManager;

/**
* A proxy transport action used to proxy a transport request from OpenSearch or a plugin to execute on an extension
*
* @opensearch.internal
*/
public class ExtensionTransportAction extends HandledTransportAction<ExtensionActionRequest, ExtensionActionResponse> {
public class ExtensionTransportAction extends TransportAction<ExtensionActionRequest, RemoteExtensionActionResponse> {

private final ExtensionsManager extensionsManager;

@Inject
public ExtensionTransportAction(
Settings settings,
TransportService transportService,
String actionName,
ActionFilters actionFilters,
ClusterService clusterService,
TaskManager taskManager,
ExtensionsManager extensionsManager
) {
super(ExtensionProxyAction.NAME, transportService, actionFilters, ExtensionActionRequest::new);
super(actionName, actionFilters, taskManager);
this.extensionsManager = extensionsManager;
}

@Override
protected void doExecute(Task task, ExtensionActionRequest request, ActionListener<ExtensionActionResponse> listener) {
protected void doExecute(Task task, ExtensionActionRequest request, ActionListener<RemoteExtensionActionResponse> listener) {
try {
listener.onResponse(extensionsManager.handleTransportRequest(request));
listener.onResponse(extensionsManager.handleRemoteTransportRequest(request));
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
import org.opensearch.action.ActionModule.DynamicActionRegistry;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.SettingsModule;
import org.opensearch.extensions.DiscoveryExtensionNode;
import org.opensearch.extensions.AcknowledgedResponse;
import org.opensearch.extensions.ExtensionsManager;
Expand Down Expand Up @@ -48,29 +46,23 @@ public class ExtensionTransportActionsHandler {
// Map of Extension unique ID to Extension Node, populated in Extensions Manager
private final Map<String, DiscoveryExtensionNode> extensionIdMap;
private final TransportService transportService;
private final ClusterService clusterService;
private final NodeClient client;
private final ActionFilters actionFilters;
private final DynamicActionRegistry dynamicActionRegistry;
private final SettingsModule settingsModule;
private final ExtensionsManager extensionsManager;

public ExtensionTransportActionsHandler(
Map<String, DiscoveryExtensionNode> extensionIdMap,
TransportService transportService,
ClusterService clusterService,
NodeClient client,
ActionModule actionModule,
SettingsModule settingsModule,
ExtensionsManager extensionsManager
) {
this.extensionIdMap = extensionIdMap;
this.transportService = transportService;
this.clusterService = clusterService;
this.client = client;
this.actionFilters = actionModule.getActionFilters();
this.dynamicActionRegistry = actionModule.getDynamicActionRegistry();
this.settingsModule = settingsModule;
this.extensionsManager = extensionsManager;
}

Expand All @@ -89,7 +81,7 @@ void registerAction(String action, String uniqueId) throws IllegalArgumentExcept
// Register the action in the action module's dynamic actions map
dynamicActionRegistry.registerDynamicAction(
new ExtensionAction(uniqueId, action),
new ExtensionTransportAction(settingsModule.getSettings(), transportService, actionFilters, clusterService, extensionsManager)
new ExtensionTransportAction(action, actionFilters, transportService.getTaskManager(), extensionsManager)
);
}

Expand All @@ -104,6 +96,7 @@ public DiscoveryExtensionNode getExtension(String action) {
if (uniqueId == null) {
throw new ActionNotFoundTransportException(action);
}
logger.info("Got extension: " + extensionIdMap.get(uniqueId));
return extensionIdMap.get(uniqueId);
}

Expand All @@ -117,10 +110,11 @@ public TransportResponse handleRegisterTransportActionsRequest(RegisterTransport
logger.debug("Register Transport Actions request recieved {}", transportActionsRequest);
try {
for (String action : transportActionsRequest.getTransportActions()) {
logger.info("Registering action " + action + " with id " + transportActionsRequest.getUniqueId());
registerAction(action, transportActionsRequest.getUniqueId());
}
} catch (Exception e) {
logger.error("Could not register Transport Action " + e);
logger.error("Could not register Transport Action: " + e.getMessage());
return new AcknowledgedResponse(false);
}
return new AcknowledgedResponse(true);
Expand Down Expand Up @@ -160,10 +154,12 @@ public RemoteExtensionActionResponse handleTransportActionRequestFromExtension(T
client.execute(
extensionAction,
new ExtensionActionRequest(request.getAction(), request.getRequestBytes()),
new ActionListener<ExtensionActionResponse>() {
new ActionListener<RemoteExtensionActionResponse>() {
@Override
public void onResponse(ExtensionActionResponse actionResponse) {
inProgressFuture.complete(new RemoteExtensionActionResponse(actionResponse));
public void onResponse(RemoteExtensionActionResponse actionResponse) {
response.setSuccess(actionResponse.isSuccess());
response.setResponseBytes(actionResponse.getResponseBytes());
inProgressFuture.complete(actionResponse);
}

@Override
Expand Down Expand Up @@ -274,7 +270,7 @@ public RemoteExtensionActionResponse read(StreamInput in) throws IOException {

@Override
public void handleResponse(RemoteExtensionActionResponse response) {
extensionActionResponse.setSuccess(true);
extensionActionResponse.setSuccess(response.isSuccess());
extensionActionResponse.setResponseBytes(response.getResponseBytes());
inProgressFuture.complete(response);
}
Expand All @@ -294,7 +290,7 @@ public String executor() {
try {
transportService.sendRequest(
extension,
ExtensionsManager.REQUEST_EXTENSION_HANDLE_TRANSPORT_ACTION,
ExtensionsManager.REQUEST_EXTENSION_HANDLE_REMOTE_TRANSPORT_ACTION,
new ExtensionHandleTransportRequest(request.getAction(), request.getRequestBytes()),
extensionActionResponseTransportResponseHandler
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,7 @@ public void testDynamicActionRegistry() {

// ExtensionsAction not yet registered
ExtensionAction testExtensionAction = new ExtensionAction("extensionId", "actionName");
ExtensionTransportAction testExtensionTransportAction = new ExtensionTransportAction(
Settings.EMPTY,
null,
emptyFilters,
null,
null
);
ExtensionTransportAction testExtensionTransportAction = new ExtensionTransportAction("test-action", emptyFilters, null, null);
assertNull(dynamicActionRegistry.get(testExtensionAction));

// Register an extension action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsModule;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.extensions.DiscoveryExtensionNode;
Expand Down Expand Up @@ -102,15 +101,11 @@ public void setup() throws Exception {
dynamicActionRegistry.registerUnmodifiableActionMap(Collections.emptyMap());
when(mockActionModule.getDynamicActionRegistry()).thenReturn(dynamicActionRegistry);
when(mockActionModule.getActionFilters()).thenReturn(EMPTY_FILTERS);
SettingsModule mockSettingsModule = mock(SettingsModule.class);
when(mockSettingsModule.getSettings()).thenReturn(Settings.EMPTY);
extensionTransportActionsHandler = new ExtensionTransportActionsHandler(
Map.of("uniqueid1", discoveryExtensionNode),
transportService,
null,
client,
mockActionModule,
mockSettingsModule,
null
);
}
Expand Down

0 comments on commit 8b2f179

Please sign in to comment.